夜莺监控告警通知基于webhook
方式对接的IM
的通知方式都覆盖全面了,但没有内置对接短信和电话通知方式。
由于特殊场景或者某些项目中,必须对接短信而且还是短信猫这种古老的对接方式,没办法只能硬着头皮上,下面简单介绍下如何对接短信猫实现告警消息的通知,下图是夜莺体系告警的简单实现流程:
对接短信猫
本篇通过Ubuntu 20.04
及之后的系统为例,物理服务器上对接短信猫,物理连接方式最开始是通过串口直接插入服务器上,但是服务器上没有识别到串口设备信息,初步观察应该是驱动或者内核没有直接识别串口设备,避免折腾,马上更换为USB
转串口设备,在系统层面会映射到如下所示信息中:
ls /dev/ttyUSB* # USB 转串口设备# 输出串口设备/dev/ttyUSB0# 检查驱动是否加载lsmod | grep pl2303
使用dmesg
命令查看串口设备是否正常识别,特别是在连接串口设备时:
# 查看串口设备是否正常识别dmesg | grep tty
你应该能看到类似以下的输出,表示设备已成功连接:
pl2303 ttyUSB0: pl2303 converter now disconnected from ttyUSB0usb 3-1: pl2303 converter now attached to ttyUSB0
这条信息表明你的系统检测到了一个 PL2303 USB 转串口适配器,并且该适配器已成功连接到/dev/ttyUSB0
设备。
-
设备识别成功
:PL2303 转换器已成功连接并被识别为/dev/ttyUSB0
。这通常意味着系统中的硬件和驱动都正常工作。 -
ttyUSB0
设备:你可以通过访问/dev/ttyUSB0
来与串口通信,测试串口是否工作正常。
安装测试软件,测试串口通信是否正常:
sudo apt-get install minicom
-
配置 minicom
连接到短信猫:
sudo minicom -s
在弹出的菜单中选择Serial port setup,配置正确的串口设备(如/dev/ttyUSB0
)和波特率(通常是115200
)以及8
数据位,1
停止位。
-
启动 minicom
:
sudo minicom# 快捷键Ctrl + A 然后按下 Z# 选择 E 允许界面输入字符E
然后你可以输入AT
指令回车
,界面返回OK,即可代表短信猫正常连接。
短信猫硬件对接服务器成功。退出minicom
: 按下Ctrl + A
,然后按下Z
,选择退出。
接下来我们需要使用Python
调用短信猫发送短信,测试短信猫发送短信是否正常。
# 安装 Pyhton 以及依赖包sudo apt install python3sudo apt install python3-pippip3 install pyserialpip3 install --upgrade pyserial
验证pyserial
是否安装成功:
用Python
写调用案例,确保调用短信猫指令发送短信正常,并且确保发送短信的字符串中英文显示正常,需要注意的点:字符编码,短信长度,以及短信内容加引号,如下是测试代码案例,可完全适配市面上主流的短信猫:
import serialimport loggingimport timeimport pdb# 配置日志记录logging.basicConfig( level=logging.DEBUG, # 设置日志级别为 DEBUG format='%(asctime)s - %(levelname)s - %(message)s')definitialize_serial_connection(port, baudrate):"""初始化串口连接"""try: logging.debug(f"尝试连接串口设备 {port},波特率 {baudrate}") ser = serial.Serial(port, baudrate, timeout=1, parity='N', stopbits=1, bytesize=8)if ser.is_open: logging.info(f"成功连接到 {port}")return serexcept Exception as e: logging.error(f"无法连接到串口 {port}: {e}")returnNonedefsend_at_command(ser, command):"""发送AT指令并获取响应"""try: logging.debug(f"发送 AT 指令: {command}")# 发送指令并加上回车符(r) ser.write((command + 'r').encode('gbk')) time.sleep(2) # 等待串口响应 response = ser.read(ser.in_waiting) # 读取返回的数据if response: logging.info(f"接收到响应: {response.decode('gbk')}")else: logging.warning("没有接收到响应")return response.decode()except Exception as e: logging.error(f"发送指令失败: {e}")returnNonedefsend_sms(ser, phone_number, message):"""发送短信,通过 AT+SMS 指令"""try:# 构造 AT+SMS 指令 command = f'AT+SMS={phone_number} "{message}"' send_at_command(ser, command) logging.info(f"短信发送到 {phone_number}: {message}")returnTrueexcept Exception as e: logging.error(f"发送短信失败: {e}")returnFalsedefmain():# 设置串口端口和波特率 port = '/dev/ttyUSB0'# 根据实际设备修改 baudrate = 115200# 初始化串口连接 ser = initialize_serial_connection(port, baudrate)if ser isNone: logging.error("串口连接失败,退出程序")return# 发送 AT 指令测试 send_at_command(ser, 'AT')# 发送短信测试 phone_number = "176XXXXXXXX"# 替换为实际手机号 message = "Hello, 这是一条测试短信" send_sms(ser, phone_number, message)# 程序完成 logging.debug("程序执行完成")# 关闭串口 ser.close() logging.info("串口已关闭")if __name__ == '__main__':# 启动调试器 pdb.set_trace() main()
测试通过,手机上正常收到测试短信:
市面上不同的短信猫,可能支持的
AT
指令是不一样的,请根据实际情况修改代码中拼接指令。
夜莺配置
如果觉得自定义麻烦,建议您使用FlashDuty
来作为一站式告警OnCall
响应平台。
增加短信通知媒介
菜单位置:告警通知-通知设置-通知媒介。点击下面的添加,增加一个新的通知媒介,名称就叫短信
即可,标识就叫sms
即可,如下图所示:
增加通知媒介之后,在告警规则的配置页面,就可以勾选短信
这个通知媒介了,如下图所示:
编写短信通知模版
告警事件的格式是一大串JSON
数据,如果直接看JSON
数据那肯定头大,告警肯定是需要提取最关键的数据,特别是短信这种,限制文本大小的,更加要简短直接体现告警事件的对象和核心要素。
首先需要添加下短信通知模版:
{{- if .IsRecovered}}【告警恢复】##{{.RuleName}}##{{else}}【告警触发】##{{.RuleName}}##{{- end}}{{- $time_duration := sub now.Unix .FirstTriggerTime -}}{{- if .IsRecovered}}{{$time_duration = sub .LastEvalTime .FirstTriggerTime}}{{- end}} #规则级别: {{- if .IsRecovered}}(S{{.Severity}} 已恢复){{else}}(S{{.Severity}} 已触发){{- end}}{{- if .RuleNote}}{{.RuleNote}}{{- end}}{{- if .TargetIdent}} #监控对象: {{.TargetIdent}}{{- end}}{{- if ne .AnnotationsJSON.Note "1"}} #监控指标: ({{.TagsJSON}}{{- end}}){{- if not .IsRecovered}} #触发时值: ({{.TriggerValue}}) #触发时间: ({{timeformat .TriggerTime}}) #持续时长: ({{humanizeDurationInterface $time_duration}}){{- else}}{{- if .AnnotationsJSON.recovery_value}} #恢复时值: ({{formatDecimal .AnnotationsJSON.recovery_value 2}}){{- end}} #首次触发: {{timeformat .FirstTriggerTime}} #恢复时间: ({{timeformat .LastEvalTime}}) #持续时长: ({{humanizeDurationInterface $time_duration}}){{- end}} #发送时间: ({{- timestamp -}})
上面的告警模版是基于短信的,由于短信的特殊性,没办法排版显示,只能通过空格和特殊字符进行分割进行排版显示。
同时在该模版中,还添加了夜莺的告警规则RuleNote
字段携带显示告警标签数据,同时添加recovery_value
告警恢复时值字段,这两个字段都分别要在告警规则里面自定义配置,看下告警规则的备注和附加信息:
如果需要告警恢复时值,还需要在附加组中添加如下信息:
用户信息添加
在用户管理里面添加手机号,这里以 root 用户为例:
接下来就把团队管理和业务组管理新建一个,把对应的用户加入对于的团队和业务组即可,在编写告警规则的时候,选择对应的告警接收组,保证需要发送的手机号所属用户在这个告警接收组即可。
编写短信通知脚本
夜莺告警的时候,可以自动调用通知脚本,你就在通知脚本里写你的逻辑就可以了。脚本需要先启用,菜单入口:告警通知-通知设置-通知脚本,如下图:
把写好的通知脚本放在夜莺脚本文件中/opt/nightingale/etc/script/notify.py
中,/opt/nightingale
是夜莺部署目录,脚本详细如下:
#!/usr/bin/env python3# -*- coding: UTF-8 -*-import sysimport jsonimport loggingimport timeimport serial# 配置日志记录logging.basicConfig( level=logging.DEBUG, # 设置日志级别为 DEBUG format='%(asctime)s - %(levelname)s - %(message)s')classSender(object): @classmethoddefsend_email(cls, payload):# already done in go codepass @classmethoddefsend_wecom(cls, payload):# already done in go codepass @classmethoddefsend_dingtalk(cls, payload):# already done in go codepass @classmethoddefsend_feishu(cls, payload):# already done in go codepass @classmethoddefsend_mm(cls, payload):# already done in go codepass @classmethoddefsend_sms(cls, payload):""" 处理短信发送逻辑 """try: users = payload.get('event', {}).get('notify_users_obj', [])ifnot users: logging.warning("没有找到通知的用户信息。")return phones = {u.get("phone") for u in users if u.get("phone")}ifnot phones: logging.warning("没有有效的手机号码,无法发送短信。")return# 获取短信内容模版 sms_template = payload.get('tpls', {}).get("sms", "短信模板未找到")if sms_template == "短信模板未找到": logging.warning("短信模板未找到,使用默认模板。") port = '/dev/ttyUSB0'# 根据实际设备配置 baudrate = 115200 ser = initialize_modem_connection(port, baudrate)if ser isNone: logging.error("无法连接到短信猫设备,短信发送失败")returnfor phone in phones:try: logging.info(f"准备通过短信猫发送短信到: {phone}") send_sms_via_modem(ser, phone, sms_template)except Exception as e: logging.error(f"发送短信到 {phone} 时发生错误: {str(e)}") ser.close() logging.info("短信猫串口已关闭")except Exception as e: logging.error(f"发送短信时发生未知错误: {str(e)}") @classmethoddefsend_voice(cls, payload):# 没有电话语音告警passdefinitialize_modem_connection(port, baudrate):"""初始化短信猫的串口连接"""try: logging.debug(f"尝试连接短信猫设备 {port},波特率 {baudrate}") ser = serial.Serial(port, baudrate, timeout=1, parity='N', stopbits=1, bytesize=8)if ser.is_open: logging.info(f"成功连接到短信猫设备 {port}")return serexcept Exception as e: logging.error(f"无法连接到短信猫设备 {port}: {e}")returnNonedefsend_modem_command(ser, command):"""发送AT指令到短信猫并获取响应"""try: logging.debug(f"发送 AT 指令: {command}") ser.write((command + 'r').encode('gbk')) time.sleep(2) response = ser.read(ser.in_waiting) # 读取返回数据if response: logging.info(f"接收到短信猫响应: {response.decode('gbk')}")else: logging.warning("短信猫没有返回响应")return response.decode()except Exception as e: logging.error(f"发送短信猫指令失败: {e}")returnNonedefsend_sms_via_modem(ser, phone_number, message):"""发送短信,通过 AT+SMS 指令"""try:# 构造 AT+SMS 指令 command = f'AT+SMS={phone_number} "{message}"' send_modem_command(ser, command) logging.info(f"短信发送到 {phone_number}: {message}")returnTrueexcept Exception as e: logging.error(f"发送短信失败: {e}")returnFalsedefhello():# 测试使用 print("hello nightingale")defmain():""" 主程序逻辑 """try: payload = None# 检查是否传入了文件路径作为命令行参数if len(sys.argv) > 1: file_path = sys.argv[1]try:with open(file_path, 'r') as f: payload = json.load(f) logging.info(f"成功从文件 {file_path} 读取 payload 数据。")except Exception as e: logging.error(f"从文件 {file_path} 读取数据时发生错误: {str(e)}")returnelse:# 如果没有传入文件路径,继续从标准输入读取 JSON 数据try: payload = json.load(sys.stdin) logging.info("成功从标准输入读取 payload 数据。")except json.JSONDecodeError: logging.error("读取 JSON 数据时发生错误,可能是格式不正确。")returnexcept Exception as e: logging.error(f"程序执行时发生未知错误: {str(e)}")return# json 包含两个属性,其中 tpls 属性就是通知内容,可以在通知模版(系统设置-通知模板)中自行修改具体内容格式# event 属性包含了这条告警的完整信息,可以从 notify_channels 看到要发送的通知媒介,notify_users_obj 看到接收告警的所有用户信息等等# 遍历告警设置的通知媒介名称# 将告警事件数据保存到 .payload 文件,便于调试try:with open(".payload", 'w') as f: f.write(json.dumps(payload, indent=4)) logging.info("成功读取并保存 payload 数据到 .payload 文件。")except Exception as e: logging.warning(f"保存 payload 数据到文件时发生错误: {str(e)}")# json 包含两个属性,其中 tpls 属性就是通知内容,可以在通知模版(系统设置-通知模板)中自行修改具体内容格式# event 属性包含了这条告警的完整信息,可以从 notify_channels 看到要发送的通知媒介,notify_users_obj 看到接收告警的所有用户信息等等 channels = payload.get('event', {}).get('notify_channels', [])ifnot channels: logging.warning("没有发现通知媒介。")returnfor ch in channels: send_func_name = f"send_{ch.strip()}" send_func = getattr(Sender, send_func_name, None)if send_func isNone: logging.warning(f"方法 {send_func_name} 未找到,跳过该通知媒介。")continuetry: send_func(payload)except Exception as e: logging.error(f"调用方法 {send_func_name} 时发生错误: {str(e)}")except Exception as e: logging.error(f"程序执行时发生未知错误: {str(e)}")if __name__ == "__main__":# 调试输出 sys.argv,确保我们获得的参数是什么 logging.info(f"sys.argv: {sys.argv}")# 检查是否传递了 "hello" 参数或者文件路径if len(sys.argv) == 1:# 如果没有传递任何额外的参数,调用 main() main()elif sys.argv[1] == "hello":# 如果参数是 "hello",调用 hello() hello()else:# 如果参数是文件路径,则执行处理文件路径的逻辑 file_path = sys.argv[1]try: main()except Exception as e: logging.error(f"无法读取文件 {file_path}: {str(e)}") print("I am confused")
简单对上面代码做个解释,上面代码主要还是为了方便调试,实现运行先带个参数运行,如下所示:
# 测试python3 notify.py hello# 输出hello nightingale
首次运行不带任何额外参数,从主函数中得到夜莺标准输入的一个告警事件,并把告警事件JSON
写入一个.payload
文件中,该文件默认不指定路径,默认保存在夜莺进程目录下,由于是个隐藏文件,你需要ll
查看到:
得到一个文件后,我们就可以通过这个文件去不断调试短信内容和短信格式,如下所示:
# 带JSON格式的文件作为参数调试python3 notify.py /opt/nightingale/.payload
最终得到告警消息如下:
这篇简单介绍了下关于夜莺告警体系中如何通过Python
获取告警事件,并调用短信猫的方式去发送告警短信。
由于短信猫的局限性,现在更多的人可能还是利用短信通道供应商的服务,也可以用我这个去仿写,无非就是短信猫调用的方法改为用request
请求短信通道供应商的接口去发送短信,逻辑上比这个更简单,短信供应商都有demo
。
好了,简单聊到这里了,需要更深入沟通的话可直接公众号私信我。
📣欢迎朋友们关注公众号📢📢:【网络小斐】!
🙋♂️有想法的朋友也可以加我沟通,朋友🔘做个点赞之交!😂😂 欢迎点赞 👍、收藏 💗、关注 💡 三连支持一下,我们下期见~✨
原文始发于微信公众号(网络小斐):监控告警:夜莺体系中使用Python实现短信告警
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论