bark通知监控
demo原因
-
需要及时监控:例如价格涨跌通知
-
使用app: 钉钉机器人,飞书机器人,bark通知
-
推荐飞书或者bark,钉钉需要创建公司费劲
#机器人参考:
https://open.feishu.cn/document/client-docs/bot-v3/bot-overview
✨Bark通知demo1
import requests
def send_bark(server_addr, my_key, my_title, my_msg):
"""
发送通知的函数。
参数:
- server_addr (str): 服务器地址。
- my_key (str): 设备识别码。
- my_title (str): 通知标题。
- my_msg (str): 通知内容。
返回:
- response: 请求的响应对象。
"""
url = "{server}/{key}/{title}/{msg}?sound=minuet".format(
server=server_addr, key=my_key, title=my_title, msg=my_msg
)
response = requests.get(url)
return response
# 示例用法
server_addr = "https://api.day.app" # 请用你的服务器地址替换这里
my_key = "xxxxx" # 此处填写自己的设备识别码
my_title = "实时监控" # å请用你的通知标题替换这里
my_msg = "当前水位xxx" # 请用你的通知内容替换这里
response = send_bark(server_addr, my_key, my_title, my_msg)
print("通知已发送。响应内容:", response.text)
✨Bark通知demo2
import requests
from selenium import webdriver
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import logging
def send_bark(server_addr, bark_key, my_title, my_msg):
url = f"{server_addr}/{bark_key}/{my_title}/{my_msg}?sound=minuet"
try:
response = requests.get(url, params={})
response.raise_for_status() # 检查请求是否成功
return response
except requests.exceptions.RequestException as e:
logging.error(f"发送 Bark 通知时出错:{e}")
return None
def get_prices(driver, selector):
try:
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, selector))
)
price = element.text.strip()
return price
except TimeoutException as e:
logging.error(f"等待元素超时:{e}")
return None
def monitor_prices(server_addr, bark_key, selector, threshold, interval_seconds=3600):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
run_monitoring = True
while run_monitoring:
try:
driver = webdriver.Chrome()
driver.get('https://mybank.icbc.com.cn/icbc/newperbank/perbank3/gold/goldaccrual_query_out.jsp')
jiage1 = get_prices(driver, 'td#activeprice_080020000521')
jiage2 = get_prices(driver, 'td#activeprice_080020000501')
if jiage1 and jiage2:
logging.info(f"积存金实时主动积存价格: {jiage1}元")
logging.info(f"如意积存金实时主动积存价格: {jiage2}元")
jiage2_float = float(jiage2)
if jiage2_float < threshold:
send_bark(server_addr, bark_key, "金价监控", f"ryj价格: {jiage2}元njcj价格: {jiage1}元")
else:
logging.warning("未找到标签.")
except Exception as e:
logging.error(f"发生异常:{e}")
finally:
driver.quit()
try:
time.sleep(interval_seconds)
except KeyboardInterrupt:
logging.info("监控程序被中断.")
run_monitoring = False
if __name__ == "__main__":
server_addr = "https://api.day.app" # 请用你的服务器地址替换这里
bark_key = "xxxxxxxxxxxx" # 请用你的 Bark Key 替换这里
monitor_selector = 'td#activeprice_080020000501' # 请用你的选择器替换这里
price_threshold = 500.14 # 设置价格阈值,即触发通知的价格下限
monitor_interval_seconds = 10 # 设置监控间隔时间,例如10秒
try:
monitor_prices(server_addr, bark_key, monitor_selector, price_threshold, monitor_interval_seconds)
except KeyboardInterrupt:
pass
✨飞书通知demo
import requests
import json
from selenium import webdriver
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def get_prices():
driver = webdriver.Chrome()
driver.get('https://mybank.icbc.com.cn/icbc/newperbank/perbank3/gold/goldaccrual_query_out.jsp')
try:
jiage1_tag = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'td#activeprice_080020000521'))
)
jiage2_tag = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'td#activeprice_080020000501'))
)
jiage1 = jiage1_tag.text.strip()
jiage2 = jiage2_tag.text.strip()
return jiage1, jiage2
except Exception as e:
print(f"发生异常:{e}")
return None, None
finally:
driver.quit()
def send_feishu_notification(jiage1, jiage2):
msg = {
"msg_type": "text",
"content": {"text": f"今天的宇宙航nryj价格:{jiage2}元njcj价格:{jiage1}元"}
}
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxxxxxxx"
headers = {
"Content-type": "application/json",
"charset": "utf-8"
}
msg_encode = json.dumps(msg, ensure_ascii=True).encode("utf-8")
response = requests.post(url=webhook_url, data=msg_encode, headers=headers)
if response.status_code == 200:
print("已发送消息到飞书群聊中")
def monitor_prices(interval_seconds=10):
driver = None
try:
while True:
jiage1, jiage2 = get_prices()
if jiage1 and jiage2:
print(f"积存金实时主动积存价格: {jiage1}元")
print(f"如意积存金实时主动积存价格: {jiage2}元")
jiage2_float = float(jiage2)
if jiage2_float > 490: # 修改价格即可
send_feishu_notification(jiage1, jiage2)
else:
print("未找到标签。")
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("用户中断,退出监控。")
finally:
if driver:
driver.quit()
if __name__ == "__main__":
monitor_prices()
原文始发于微信公众号(echoabced):Bark通知监控
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论