监控告警:夜莺体系中使用Python实现短信告警

admin 2024年12月10日17:50:10评论28 views字数 9585阅读31分57秒阅读模式

夜莺监控告警通知基于webhook 方式对接的IM 的通知方式都覆盖全面了,但没有内置对接短信和电话通知方式。

由于特殊场景或者某些项目中,必须对接短信而且还是短信猫这种古老的对接方式,没办法只能硬着头皮上,下面简单介绍下如何对接短信猫实现告警消息的通知,下图是夜莺体系告警的简单实现流程:

监控告警:夜莺体系中使用Python实现短信告警

对接短信猫

本篇通过Ubuntu 20.04 及之后的系统为例,物理服务器上对接短信猫,物理连接方式最开始是通过串口直接插入服务器上,但是服务器上没有识别到串口设备信息,初步观察应该是驱动或者内核没有直接识别串口设备,避免折腾,马上更换为USB 转串口设备,在系统层面会映射到如下所示信息中:

ls /dev/ttyUSB*  # USB 转串口设备# 输出串口设备/dev/ttyUSB0# 检查驱动是否加载lsmod | grep pl2303

使用dmesg 命令查看串口设备是否正常识别,特别是在连接串口设备时:

# 查看串口设备是否正常识别dmesg | grep tty

你应该能看到类似以下的输出,表示设备已成功连接:

pl2303 ttyUSB0: pl2303 converter now disconnected from ttyUSB0usb 3-1: pl2303 converter now attached to ttyUSB0

这条信息表明你的系统检测到了一个 PL2303 USB 转串口适配器,并且该适配器已成功连接到/dev/ttyUSB0 设备。

  • 设备识别成功 :PL2303 转换器已成功连接并被识别为/dev/ttyUSB0。这通常意味着系统中的硬件和驱动都正常工作。
  • ttyUSB0 设备:你可以通过访问/dev/ttyUSB0 来与串口通信,测试串口是否工作正常。

安装测试软件,测试串口通信是否正常:

sudo apt-get install minicom
  • 配置minicom 连接到短信猫:
sudo minicom -s 

在弹出的菜单中选择Serial port setup,配置正确的串口设备(如/dev/ttyUSB0)和波特率(通常是115200)以及8 数据位,1 停止位。

  • 启动minicom
sudo minicom# 快捷键Ctrl + A 然后按下 Z# 选择 E 允许界面输入字符E

然后你可以输入AT 指令回车 ,界面返回OK,即可代表短信猫正常连接。

监控告警:夜莺体系中使用Python实现短信告警

短信猫硬件对接服务器成功。退出minicom: 按下Ctrl + A,然后按下Z,选择退出。

接下来我们需要使用Python 调用短信猫发送短信,测试短信猫发送短信是否正常。

# 安装 Pyhton 以及依赖包sudo apt install python3sudo apt install python3-pippip3 install pyserialpip3 install --upgrade pyserial

验证pyserial 是否安装成功:

监控告警:夜莺体系中使用Python实现短信告警

Python 写调用案例,确保调用短信猫指令发送短信正常,并且确保发送短信的字符串中英文显示正常,需要注意的点:字符编码,短信长度,以及短信内容加引号,如下是测试代码案例,可完全适配市面上主流的短信猫:

import serialimport loggingimport timeimport pdb# 配置日志记录logging.basicConfig(    level=logging.DEBUG,  # 设置日志级别为 DEBUG    format='%(asctime)s - %(levelname)s - %(message)s')definitialize_serial_connection(port, baudrate):"""初始化串口连接"""try:        logging.debug(f"尝试连接串口设备 {port},波特率 {baudrate}")        ser = serial.Serial(port, baudrate, timeout=1, parity='N', stopbits=1, bytesize=8)if ser.is_open:            logging.info(f"成功连接到 {port}")return serexcept Exception as e:        logging.error(f"无法连接到串口 {port}: {e}")returnNonedefsend_at_command(ser, command):"""发送AT指令并获取响应"""try:        logging.debug(f"发送 AT 指令: {command}")# 发送指令并加上回车符(r)        ser.write((command + 'r').encode('gbk'))        time.sleep(2)  # 等待串口响应        response = ser.read(ser.in_waiting)  # 读取返回的数据if response:            logging.info(f"接收到响应: {response.decode('gbk')}")else:            logging.warning("没有接收到响应")return response.decode()except Exception as e:        logging.error(f"发送指令失败: {e}")returnNonedefsend_sms(ser, phone_number, message):"""发送短信,通过 AT+SMS 指令"""try:# 构造 AT+SMS 指令        command = f'AT+SMS={phone_number} "{message}"'        send_at_command(ser, command)        logging.info(f"短信发送到 {phone_number}: {message}")returnTrueexcept Exception as e:        logging.error(f"发送短信失败: {e}")returnFalsedefmain():# 设置串口端口和波特率    port = '/dev/ttyUSB0'# 根据实际设备修改    baudrate = 115200# 初始化串口连接    ser = initialize_serial_connection(port, baudrate)if ser isNone:        logging.error("串口连接失败,退出程序")return# 发送 AT 指令测试    send_at_command(ser, 'AT')# 发送短信测试    phone_number = "176XXXXXXXX"# 替换为实际手机号    message = "Hello, 这是一条测试短信"    send_sms(ser, phone_number, message)# 程序完成    logging.debug("程序执行完成")# 关闭串口    ser.close()    logging.info("串口已关闭")if __name__ == '__main__':# 启动调试器    pdb.set_trace()    main()

测试通过,手机上正常收到测试短信:

监控告警:夜莺体系中使用Python实现短信告警

市面上不同的短信猫,可能支持的 AT 指令是不一样的,请根据实际情况修改代码中拼接指令。

夜莺配置

如果觉得自定义麻烦,建议您使用FlashDuty 来作为一站式告警OnCall 响应平台。

增加短信通知媒介

菜单位置:告警通知-通知设置-通知媒介。点击下面的添加,增加一个新的通知媒介,名称就叫短信 即可,标识就叫sms 即可,如下图所示:

监控告警:夜莺体系中使用Python实现短信告警

增加通知媒介之后,在告警规则的配置页面,就可以勾选短信 这个通知媒介了,如下图所示:

监控告警:夜莺体系中使用Python实现短信告警

编写短信通知模版

告警事件的格式是一大串JSON 数据,如果直接看JSON 数据那肯定头大,告警肯定是需要提取最关键的数据,特别是短信这种,限制文本大小的,更加要简短直接体现告警事件的对象和核心要素。

首先需要添加下短信通知模版:

{{- if .IsRecovered}}【告警恢复】##{{.RuleName}}##{{else}}【告警触发】##{{.RuleName}}##{{- end}}{{- $time_duration := sub now.Unix .FirstTriggerTime -}}{{- if .IsRecovered}}{{$time_duration = sub .LastEvalTime .FirstTriggerTime}}{{- end}} #规则级别: {{- if .IsRecovered}}(S{{.Severity}} 已恢复){{else}}(S{{.Severity}} 已触发){{- end}}{{- if .RuleNote}}{{.RuleNote}}{{- end}}{{- if .TargetIdent}} #监控对象: {{.TargetIdent}}{{- end}}{{- if ne .AnnotationsJSON.Note "1"}} #监控指标: ({{.TagsJSON}}{{- end}}){{- if not .IsRecovered}} #触发时值: ({{.TriggerValue}}) #触发时间: ({{timeformat .TriggerTime}}) #持续时长: ({{humanizeDurationInterface $time_duration}}){{- else}}{{- if .AnnotationsJSON.recovery_value}} #恢复时值: ({{formatDecimal .AnnotationsJSON.recovery_value 2}}){{- end}} #首次触发: {{timeformat .FirstTriggerTime}} #恢复时间: ({{timeformat .LastEvalTime}}) #持续时长: ({{humanizeDurationInterface $time_duration}}){{- end}} #发送时间: ({{- timestamp -}})

上面的告警模版是基于短信的,由于短信的特殊性,没办法排版显示,只能通过空格和特殊字符进行分割进行排版显示。

同时在该模版中,还添加了夜莺的告警规则RuleNote 字段携带显示告警标签数据,同时添加recovery_value 告警恢复时值字段,这两个字段都分别要在告警规则里面自定义配置,看下告警规则的备注和附加信息:

监控告警:夜莺体系中使用Python实现短信告警
监控告警:夜莺体系中使用Python实现短信告警

如果需要告警恢复时值,还需要在附加组中添加如下信息:

监控告警:夜莺体系中使用Python实现短信告警

用户信息添加

在用户管理里面添加手机号,这里以 root 用户为例:

监控告警:夜莺体系中使用Python实现短信告警

接下来就把团队管理和业务组管理新建一个,把对应的用户加入对于的团队和业务组即可,在编写告警规则的时候,选择对应的告警接收组,保证需要发送的手机号所属用户在这个告警接收组即可。

编写短信通知脚本

夜莺告警的时候,可以自动调用通知脚本,你就在通知脚本里写你的逻辑就可以了。脚本需要先启用,菜单入口:告警通知-通知设置-通知脚本,如下图:

监控告警:夜莺体系中使用Python实现短信告警

把写好的通知脚本放在夜莺脚本文件中/opt/nightingale/etc/script/notify.py 中,/opt/nightingale 是夜莺部署目录,脚本详细如下:

#!/usr/bin/env python3# -*- coding: UTF-8 -*-import sysimport jsonimport loggingimport timeimport serial# 配置日志记录logging.basicConfig(    level=logging.DEBUG,  # 设置日志级别为 DEBUG    format='%(asctime)s - %(levelname)s - %(message)s')classSender(object):    @classmethoddefsend_email(cls, payload):# already done in go codepass    @classmethoddefsend_wecom(cls, payload):# already done in go codepass    @classmethoddefsend_dingtalk(cls, payload):# already done in go codepass    @classmethoddefsend_feishu(cls, payload):# already done in go codepass    @classmethoddefsend_mm(cls, payload):# already done in go codepass    @classmethoddefsend_sms(cls, payload):"""        处理短信发送逻辑        """try:            users = payload.get('event', {}).get('notify_users_obj', [])ifnot users:                logging.warning("没有找到通知的用户信息。")return            phones = {u.get("phone") for u in users if u.get("phone")}ifnot phones:                logging.warning("没有有效的手机号码,无法发送短信。")return# 获取短信内容模版            sms_template = payload.get('tpls', {}).get("sms", "短信模板未找到")if sms_template == "短信模板未找到":                logging.warning("短信模板未找到,使用默认模板。")            port = '/dev/ttyUSB0'# 根据实际设备配置            baudrate = 115200            ser = initialize_modem_connection(port, baudrate)if ser isNone:                logging.error("无法连接到短信猫设备,短信发送失败")returnfor phone in phones:try:                    logging.info(f"准备通过短信猫发送短信到: {phone}")                    send_sms_via_modem(ser, phone, sms_template)except Exception as e:                    logging.error(f"发送短信到 {phone} 时发生错误: {str(e)}")            ser.close()            logging.info("短信猫串口已关闭")except Exception as e:            logging.error(f"发送短信时发生未知错误: {str(e)}")    @classmethoddefsend_voice(cls, payload):# 没有电话语音告警passdefinitialize_modem_connection(port, baudrate):"""初始化短信猫的串口连接"""try:        logging.debug(f"尝试连接短信猫设备 {port},波特率 {baudrate}")        ser = serial.Serial(port, baudrate, timeout=1, parity='N', stopbits=1, bytesize=8)if ser.is_open:            logging.info(f"成功连接到短信猫设备 {port}")return serexcept Exception as e:        logging.error(f"无法连接到短信猫设备 {port}: {e}")returnNonedefsend_modem_command(ser, command):"""发送AT指令到短信猫并获取响应"""try:        logging.debug(f"发送 AT 指令: {command}")        ser.write((command + 'r').encode('gbk'))        time.sleep(2)        response = ser.read(ser.in_waiting)  # 读取返回数据if response:            logging.info(f"接收到短信猫响应: {response.decode('gbk')}")else:            logging.warning("短信猫没有返回响应")return response.decode()except Exception as e:        logging.error(f"发送短信猫指令失败: {e}")returnNonedefsend_sms_via_modem(ser, phone_number, message):"""发送短信,通过 AT+SMS 指令"""try:# 构造 AT+SMS 指令        command = f'AT+SMS={phone_number} "{message}"'        send_modem_command(ser, command)        logging.info(f"短信发送到 {phone_number}: {message}")returnTrueexcept Exception as e:        logging.error(f"发送短信失败: {e}")returnFalsedefhello():# 测试使用    print("hello nightingale")defmain():"""    主程序逻辑    """try:        payload = None# 检查是否传入了文件路径作为命令行参数if len(sys.argv) > 1:            file_path = sys.argv[1]try:with open(file_path, 'r') as f:                    payload = json.load(f)                logging.info(f"成功从文件 {file_path} 读取 payload 数据。")except Exception as e:                logging.error(f"从文件 {file_path} 读取数据时发生错误: {str(e)}")returnelse:# 如果没有传入文件路径,继续从标准输入读取 JSON 数据try:                payload = json.load(sys.stdin)                logging.info("成功从标准输入读取 payload 数据。")except json.JSONDecodeError:                logging.error("读取 JSON 数据时发生错误,可能是格式不正确。")returnexcept Exception as e:                logging.error(f"程序执行时发生未知错误: {str(e)}")return# json 包含两个属性,其中 tpls 属性就是通知内容,可以在通知模版(系统设置-通知模板)中自行修改具体内容格式# event 属性包含了这条告警的完整信息,可以从 notify_channels 看到要发送的通知媒介,notify_users_obj 看到接收告警的所有用户信息等等# 遍历告警设置的通知媒介名称# 将告警事件数据保存到 .payload 文件,便于调试try:with open(".payload", 'w') as f:                f.write(json.dumps(payload, indent=4))            logging.info("成功读取并保存 payload 数据到 .payload 文件。")except Exception as e:            logging.warning(f"保存 payload 数据到文件时发生错误: {str(e)}")# json 包含两个属性,其中 tpls 属性就是通知内容,可以在通知模版(系统设置-通知模板)中自行修改具体内容格式# event 属性包含了这条告警的完整信息,可以从 notify_channels 看到要发送的通知媒介,notify_users_obj 看到接收告警的所有用户信息等等        channels = payload.get('event', {}).get('notify_channels', [])ifnot channels:            logging.warning("没有发现通知媒介。")returnfor ch in channels:            send_func_name = f"send_{ch.strip()}"            send_func = getattr(Sender, send_func_name, None)if send_func isNone:                logging.warning(f"方法 {send_func_name} 未找到,跳过该通知媒介。")continuetry:                send_func(payload)except Exception as e:                logging.error(f"调用方法 {send_func_name} 时发生错误: {str(e)}")except Exception as e:        logging.error(f"程序执行时发生未知错误: {str(e)}")if __name__ == "__main__":# 调试输出 sys.argv,确保我们获得的参数是什么    logging.info(f"sys.argv: {sys.argv}")# 检查是否传递了 "hello" 参数或者文件路径if len(sys.argv) == 1:# 如果没有传递任何额外的参数,调用 main()        main()elif sys.argv[1] == "hello":# 如果参数是 "hello",调用 hello()        hello()else:# 如果参数是文件路径,则执行处理文件路径的逻辑        file_path = sys.argv[1]try:            main()except Exception as e:            logging.error(f"无法读取文件 {file_path}: {str(e)}")            print("I am confused")

简单对上面代码做个解释,上面代码主要还是为了方便调试,实现运行先带个参数运行,如下所示:

# 测试python3 notify.py hello# 输出hello nightingale
监控告警:夜莺体系中使用Python实现短信告警

首次运行不带任何额外参数,从主函数中得到夜莺标准输入的一个告警事件,并把告警事件JSON 写入一个.payload 文件中,该文件默认不指定路径,默认保存在夜莺进程目录下,由于是个隐藏文件,你需要ll 查看到:

监控告警:夜莺体系中使用Python实现短信告警

得到一个文件后,我们就可以通过这个文件去不断调试短信内容和短信格式,如下所示:

# 带JSON格式的文件作为参数调试python3 notify.py /opt/nightingale/.payload
监控告警:夜莺体系中使用Python实现短信告警

最终得到告警消息如下:

监控告警:夜莺体系中使用Python实现短信告警

这篇简单介绍了下关于夜莺告警体系中如何通过Python 获取告警事件,并调用短信猫的方式去发送告警短信。

由于短信猫的局限性,现在更多的人可能还是利用短信通道供应商的服务,也可以用我这个去仿写,无非就是短信猫调用的方法改为用request 请求短信通道供应商的接口去发送短信,逻辑上比这个更简单,短信供应商都有demo

好了,简单聊到这里了,需要更深入沟通的话可直接公众号私信我。

📣欢迎朋友们关注公众号📢📢:【网络小斐】!

🙋‍♂️有想法的朋友也可以加我沟通,朋友🔘做个点赞之交!😂😂 欢迎点赞 👍、收藏 💗、关注 💡 三连支持一下,我们下期见~✨

原文始发于微信公众号(网络小斐):监控告警:夜莺体系中使用Python实现短信告警

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2024年12月10日17:50:10
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   监控告警:夜莺体系中使用Python实现短信告警https://cn-sec.com/archives/3491626.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息