gaoyuan 8 months ago
parent 091f1c7e7b
commit 83d8e2be71

@ -9,9 +9,20 @@ import re
import threading
import time
import urllib.parse
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
import requests
import smtplib
from email.mime.text import MIMEText
msg_from = '785085670@qq.com' # 发送方邮箱地址。
password = 'rapagfoymtiwbbac' # 发送方QQ邮箱授权码不是QQ邮箱密码。
msg_to = '785085670@qq.com' # 收件人邮箱地址。
# 原先的 print 函数和主线程的锁
_print = print
mutex = threading.Lock()
@ -35,6 +46,9 @@ push_config = {
'BARK_ARCHIVE': '', # bark 推送是否存档
'BARK_GROUP': '', # bark 推送分组
'BARK_SOUND': '', # bark 推送声音
'BARK_ICON': '', # bark 推送图标
'BARK_LEVEL': '', # bark 推送时效性
'BARK_URL': '', # bark 推送跳转URL
'CONSOLE': True, # 控制台输出
@ -59,12 +73,20 @@ push_config = {
'PUSH_KEY': '', # server 酱的 PUSH_KEY兼容旧版与 Turbo 版
'DEER_KEY': '', # PushDeer 的 PUSHDEER_KEY
'DEER_URL': '', # PushDeer 的 PUSHDEER_URL
'CHAT_URL': '', # synology chat url
'CHAT_TOKEN': '', # synology chat token
'PUSH_PLUS_TOKEN': '', # push+ 微信推送的用户令牌
'PUSH_PLUS_USER': '', # push+ 微信推送的群组编码
'QMSG_KEY': '', # qmsg 酱的 QMSG_KEY
'QMSG_TYPE': '', # qmsg 酱的 QMSG_TYPE
'QYWX_ORIGIN': '', # 企业微信代理地址
'QYWX_AM': '', # 企业微信应用
'QYWX_KEY': '', # 企业微信机器人
@ -75,6 +97,28 @@ push_config = {
'TG_PROXY_AUTH': '', # tg 代理认证参数
'TG_PROXY_HOST': '', # tg 机器人的 TG_PROXY_HOST
'TG_PROXY_PORT': '', # tg 机器人的 TG_PROXY_PORT
'AIBOTK_KEY': '', # 智能微秘书 个人中心的apikey 文档地址http://wechat.aibotk.com/docs/about
'AIBOTK_TYPE': '', # 智能微秘书 发送目标 room 或 contact
'AIBOTK_NAME': '', # 智能微秘书 发送群名 或者好友昵称和type要对应好
'SMTP_SERVER': '', # SMTP 发送邮件服务器,形如 smtp.exmail.qq.com:465
'SMTP_SSL': 'false', # SMTP 发送邮件服务器是否使用 SSL填写 true 或 false
'SMTP_EMAIL': '', # SMTP 收发件邮箱,通知将会由自己发给自己
'SMTP_PASSWORD': '', # SMTP 登录密码,也可能为特殊口令,视具体邮件服务商说明而定
'SMTP_NAME': '', # SMTP 收发件人姓名,可随意填写
'PUSHME_KEY': '', # PushMe 酱的 PUSHME_KEY
'CHRONOCAT_QQ': '', # qq号
'CHRONOCAT_TOKEN': '', # CHRONOCAT 的token
'CHRONOCAT_URL': '', # CHRONOCAT的url地址
'WEBHOOK_URL': '', # 自定义通知 请求地址
'WEBHOOK_BODY': '', # 自定义通知 请求体
'WEBHOOK_HEADERS': '', # 自定义通知 请求头
'WEBHOOK_METHOD': '', # 自定义通知 请求方法
'WEBHOOK_CONTENT_TYPE': '' # 自定义通知 content-type
}
notify_function = []
# fmt: on
@ -104,6 +148,9 @@ def bark(title: str, content: str) -> None:
"BARK_ARCHIVE": "isArchive",
"BARK_GROUP": "group",
"BARK_SOUND": "sound",
"BARK_ICON": "icon",
"BARK_LEVEL": "level",
"BARK_URL": "url",
}
params = ""
for pair in filter(
@ -142,8 +189,7 @@ def dingding_bot(title: str, content: str) -> None:
timestamp = str(round(time.time() * 1000))
secret_enc = push_config.get("DD_BOT_SECRET").encode("utf-8")
string_to_sign = "{}\n{}".format(
timestamp, push_config.get("DD_BOT_SECRET"))
string_to_sign = "{}\n{}".format(timestamp, push_config.get("DD_BOT_SECRET"))
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
@ -209,8 +255,11 @@ def gotify(title: str, content: str) -> None:
print("gotify 服务启动")
url = f'{push_config.get("GOTIFY_URL")}/message?token={push_config.get("GOTIFY_TOKEN")}'
data = {"title": title, "message": content,
"priority": push_config.get("GOTIFY_PRIORITY")}
data = {
"title": title,
"message": content,
"priority": push_config.get("GOTIFY_PRIORITY"),
}
response = requests.post(url, data=data).json()
if response.get("id"):
@ -249,10 +298,10 @@ def serverJ(title: str, content: str) -> None:
print("serverJ 服务启动")
data = {"text": title, "desp": content.replace("\n", "\n\n")}
if push_config.get("PUSH_KEY").index("SCT") != -1:
if push_config.get("PUSH_KEY").find("SCT") != -1:
url = f'https://sctapi.ftqq.com/{push_config.get("PUSH_KEY")}.send'
else:
url = f'https://sc.ftqq.com/${push_config.get("PUSH_KEY")}.send'
url = f'https://sc.ftqq.com/{push_config.get("PUSH_KEY")}.send'
response = requests.post(url, data=data).json()
if response.get("errno") == 0 or response.get("code") == 0:
@ -261,6 +310,50 @@ def serverJ(title: str, content: str) -> None:
print(f'serverJ 推送失败!错误码:{response["message"]}')
def pushdeer(title: str, content: str) -> None:
"""
通过PushDeer 推送消息
"""
if not push_config.get("DEER_KEY"):
print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送")
return
print("PushDeer 服务启动")
data = {
"text": title,
"desp": content,
"type": "markdown",
"pushkey": push_config.get("DEER_KEY"),
}
url = "https://api2.pushdeer.com/message/push"
if push_config.get("DEER_URL"):
url = push_config.get("DEER_URL")
response = requests.post(url, data=data).json()
if len(response.get("content").get("result")) > 0:
print("PushDeer 推送成功!")
else:
print("PushDeer 推送失败!错误信息:", response)
def chat(title: str, content: str) -> None:
"""
通过Chat 推送消息
"""
if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"):
print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送")
return
print("chat 服务启动")
data = "payload=" + json.dumps({"text": title + "\n" + content})
url = push_config.get("CHAT_URL") + push_config.get("CHAT_TOKEN")
response = requests.post(url, data=data)
if response.status_code == 200:
print("Chat 推送成功!")
else:
print("Chat 推送失败!错误信息:", response)
def pushplus_bot(title: str, content: str) -> None:
"""
通过 push+ 推送消息
@ -285,11 +378,9 @@ def pushplus_bot(title: str, content: str) -> None:
print("PUSHPLUS 推送成功!")
else:
url_old = "http://pushplus.hxtrip.com/send"
headers["Accept"] = "application/json"
response = requests.post(
url=url_old, data=body, headers=headers).json()
response = requests.post(url=url_old, data=body, headers=headers).json()
if response["code"] == 200:
print("PUSHPLUS(hxtrip) 推送成功!")
@ -308,8 +399,7 @@ def qmsg_bot(title: str, content: str) -> None:
print("qmsg 服务启动")
url = f'https://qmsg.zendee.cn/{push_config.get("QMSG_TYPE")}/{push_config.get("QMSG_KEY")}'
payload = {
"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
payload = {"msg": f'{title}\n\n{content.replace("----", "-")}'.encode("utf-8")}
response = requests.post(url=url, params=payload).json()
if response["code"] == 0:
@ -358,9 +448,12 @@ class WeCom:
self.CORPID = corpid
self.CORPSECRET = corpsecret
self.AGENTID = agentid
self.ORIGIN = "https://qyapi.weixin.qq.com"
if push_config.get("QYWX_ORIGIN"):
self.ORIGIN = push_config.get("QYWX_ORIGIN")
def get_access_token(self):
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
url = f"{self.ORIGIN}/cgi-bin/gettoken"
values = {
"corpid": self.CORPID,
"corpsecret": self.CORPSECRET,
@ -371,8 +464,7 @@ class WeCom:
def send_text(self, message, touser="@all"):
send_url = (
"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="
+ self.get_access_token()
f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
)
send_values = {
"touser": touser,
@ -388,8 +480,7 @@ class WeCom:
def send_mpnews(self, title, message, media_id, touser="@all"):
send_url = (
"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token="
+ self.get_access_token()
f"{self.ORIGIN}/cgi-bin/message/send?access_token={self.get_access_token()}"
)
send_values = {
"touser": touser,
@ -423,7 +514,11 @@ def wecom_bot(title: str, content: str) -> None:
return
print("企业微信机器人服务启动")
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}"
origin = "https://qyapi.weixin.qq.com"
if push_config.get("QYWX_ORIGIN"):
origin = push_config.get("QYWX_ORIGIN")
url = f"{origin}/cgi-bin/webhook/send?key={push_config.get('QYWX_KEY')}"
headers = {"Content-Type": "application/json;charset=utf-8"}
data = {"msgtype": "text", "text": {"content": f"{title}\n\n{content}"}}
response = requests.post(
@ -481,6 +576,268 @@ def telegram_bot(title: str, content: str) -> None:
print("tg 推送失败!")
def aibotk(title: str, content: str) -> None:
"""
使用 智能微秘书 推送消息
"""
if (
not push_config.get("AIBOTK_KEY")
or not push_config.get("AIBOTK_TYPE")
or not push_config.get("AIBOTK_NAME")
):
print("智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送")
return
print("智能微秘书 服务启动")
if push_config.get("AIBOTK_TYPE") == "room":
url = "https://api-bot.aibotk.com/openapi/v1/chat/room"
data = {
"apiKey": push_config.get("AIBOTK_KEY"),
"roomName": push_config.get("AIBOTK_NAME"),
"message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"},
}
else:
url = "https://api-bot.aibotk.com/openapi/v1/chat/contact"
data = {
"apiKey": push_config.get("AIBOTK_KEY"),
"name": push_config.get("AIBOTK_NAME"),
"message": {"type": 1, "content": f"【青龙快讯】\n\n{title}\n{content}"},
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, data=body, headers=headers).json()
print(response)
if response["code"] == 0:
print("智能微秘书 推送成功!")
else:
print(f'智能微秘书 推送失败!{response["error"]}')
def smtp(title: str, content: str) -> None:
"""
使用 SMTP 邮件 推送消息
"""
if (
not push_config.get("SMTP_SERVER")
or not push_config.get("SMTP_SSL")
or not push_config.get("SMTP_EMAIL")
or not push_config.get("SMTP_PASSWORD")
or not push_config.get("SMTP_NAME")
):
print(
"SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送"
)
return
print("SMTP 邮件 服务启动")
message = MIMEText(content, "plain", "utf-8")
message["From"] = formataddr(
(
Header(push_config.get("SMTP_NAME"), "utf-8").encode(),
push_config.get("SMTP_EMAIL"),
)
)
message["To"] = formataddr(
(
Header(push_config.get("SMTP_NAME"), "utf-8").encode(),
push_config.get("SMTP_EMAIL"),
)
)
message["Subject"] = Header(title, "utf-8")
try:
smtp_server = (
smtplib.SMTP_SSL(push_config.get("SMTP_SERVER"))
if push_config.get("SMTP_SSL") == "true"
else smtplib.SMTP(push_config.get("SMTP_SERVER"))
)
smtp_server.login(
push_config.get("SMTP_EMAIL"), push_config.get("SMTP_PASSWORD")
)
smtp_server.sendmail(
push_config.get("SMTP_EMAIL"),
push_config.get("SMTP_EMAIL"),
message.as_bytes(),
)
smtp_server.close()
print("SMTP 邮件 推送成功!")
except Exception as e:
print(f"SMTP 邮件 推送失败!{e}")
def pushme(title: str, content: str) -> None:
"""
使用 PushMe 推送消息
"""
if not push_config.get("PUSHME_KEY"):
print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送")
return
print("PushMe 服务启动")
url = f'https://push.i-i.me/?push_key={push_config.get("PUSHME_KEY")}'
data = {
"title": title,
"content": content,
}
response = requests.post(url, data=data)
if response.status_code == 200 and response.text == "success":
print("PushMe 推送成功!")
else:
print(f"PushMe 推送失败!{response.status_code} {response.text}")
def chronocat(title: str, content: str) -> None:
"""
使用 CHRONOCAT 推送消息
"""
if (
not push_config.get("CHRONOCAT_URL")
or not push_config.get("CHRONOCAT_QQ")
or not push_config.get("CHRONOCAT_TOKEN")
):
print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送")
return
print("CHRONOCAT 服务启动")
user_ids = re.findall(r"user_id=(\d+)", push_config.get("CHRONOCAT_QQ"))
group_ids = re.findall(r"group_id=(\d+)", push_config.get("CHRONOCAT_QQ"))
url = f'{push_config.get("CHRONOCAT_URL")}/api/message/send'
headers = {
"Content-Type": "application/json",
"Authorization": f'Bearer {push_config.get("CHRONOCAT_TOKEN")}',
}
for chat_type, ids in [(1, user_ids), (2, group_ids)]:
if not ids:
continue
for chat_id in ids:
data = {
"peer": {"chatType": chat_type, "peerUin": chat_id},
"elements": [
{
"elementType": 1,
"textElement": {"content": f"{title}\n\n{content}"},
}
],
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
if chat_type == 1:
print(f"QQ个人消息:{ids}推送成功!")
else:
print(f"QQ群消息:{ids}推送成功!")
else:
if chat_type == 1:
print(f"QQ个人消息:{ids}推送失败!")
else:
print(f"QQ群消息:{ids}推送失败!")
def parse_headers(headers):
if not headers:
return {}
parsed = {}
lines = headers.split("\n")
for line in lines:
i = line.find(":")
if i == -1:
continue
key = line[:i].strip().lower()
val = line[i + 1 :].strip()
parsed[key] = parsed.get(key, "") + ", " + val if key in parsed else val
return parsed
def parse_body(body, content_type):
if not body:
return ""
parsed = {}
lines = body.split("\n")
for line in lines:
i = line.find(":")
if i == -1:
continue
key = line[:i].strip().lower()
val = line[i + 1 :].strip()
if not key or key in parsed:
continue
try:
json_value = json.loads(val)
parsed[key] = json_value
except:
parsed[key] = val
if content_type == "application/x-www-form-urlencoded":
data = urlencode(parsed, doseq=True)
return data
if content_type == "application/json":
data = json.dumps(parsed)
return data
return parsed
def format_notify_content(url, body, title, content):
if "$title" not in url and "$title" not in body:
return {}
formatted_url = url.replace("$title", urllib.parse.quote_plus(title)).replace(
"$content", urllib.parse.quote_plus(content)
)
formatted_body = body.replace("$title", title).replace("$content", content)
return formatted_url, formatted_body
def custom_notify(title: str, content: str) -> None:
"""
通过 自定义通知 推送消息
"""
if not push_config.get("WEBHOOK_URL") or not push_config.get("WEBHOOK_METHOD"):
print("自定义通知的 WEBHOOK_URL 或 WEBHOOK_METHOD 未设置!!\n取消推送")
return
print("自定义通知服务启动")
WEBHOOK_URL = push_config.get("WEBHOOK_URL")
WEBHOOK_METHOD = push_config.get("WEBHOOK_METHOD")
WEBHOOK_CONTENT_TYPE = push_config.get("WEBHOOK_CONTENT_TYPE")
WEBHOOK_BODY = push_config.get("WEBHOOK_BODY")
WEBHOOK_HEADERS = push_config.get("WEBHOOK_HEADERS")
formatUrl, formatBody = format_notify_content(
WEBHOOK_URL, WEBHOOK_BODY, title, content
)
if not formatUrl and not formatBody:
print("请求头或者请求体中必须包含 $title 和 $content")
return
headers = parse_headers(WEBHOOK_HEADERS)
body = parse_body(formatBody, WEBHOOK_CONTENT_TYPE)
response = requests.request(
method=WEBHOOK_METHOD, url=formatUrl, headers=headers, timeout=15, data=body
)
if response.status_code == 200:
print("自定义通知推送成功!")
else:
print(f"自定义通知推送失败!{response.status_code} {response.text}")
def one() -> str:
"""
获取一条一言
@ -507,6 +864,10 @@ if push_config.get("IGOT_PUSH_KEY"):
notify_function.append(iGot)
if push_config.get("PUSH_KEY"):
notify_function.append(serverJ)
if push_config.get("DEER_KEY"):
notify_function.append(pushdeer)
if push_config.get("CHAT_URL") and push_config.get("CHAT_TOKEN"):
notify_function.append(chat)
if push_config.get("PUSH_PLUS_TOKEN"):
notify_function.append(pushplus_bot)
if push_config.get("QMSG_KEY") and push_config.get("QMSG_TYPE"):
@ -517,6 +878,30 @@ if push_config.get("QYWX_KEY"):
notify_function.append(wecom_bot)
if push_config.get("TG_BOT_TOKEN") and push_config.get("TG_USER_ID"):
notify_function.append(telegram_bot)
if (
push_config.get("AIBOTK_KEY")
and push_config.get("AIBOTK_TYPE")
and push_config.get("AIBOTK_NAME")
):
notify_function.append(aibotk)
if (
push_config.get("SMTP_SERVER")
and push_config.get("SMTP_SSL")
and push_config.get("SMTP_EMAIL")
and push_config.get("SMTP_PASSWORD")
and push_config.get("SMTP_NAME")
):
notify_function.append(smtp)
if push_config.get("PUSHME_KEY"):
notify_function.append(pushme)
if (
push_config.get("CHRONOCAT_URL")
and push_config.get("CHRONOCAT_QQ")
and push_config.get("CHRONOCAT_TOKEN")
):
notify_function.append(chronocat)
if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"):
notify_function.append(custom_notify)
def send(title: str, content: str) -> None:
@ -524,19 +909,43 @@ def send(title: str, content: str) -> None:
print(f"{title} 推送内容为空!")
return
# 根据标题跳过一些消息推送环境变量SKIP_PUSH_TITLE 用回车分隔
skipTitle = os.getenv("SKIP_PUSH_TITLE")
if skipTitle:
if title in re.split("\n", skipTitle):
print(f"{title} 在SKIP_PUSH_TITLE环境变量内跳过推送")
return
hitokoto = push_config.get("HITOKOTO")
text = one() if hitokoto else ""
content += "\n\n" + text
ts = [
threading.Thread(target=mode, args=(
title, content), name=mode.__name__)
threading.Thread(target=mode, args=(title, content), name=mode.__name__)
for mode in notify_function
]
[t.start() for t in ts]
[t.join() for t in ts]
def sendEmail(subject, content):
msg = MIMEText(content, 'plain', 'utf-8')
msg['Subject'] = subject
msg['From'] = msg_from
msg['To'] = msg_to
try:
client = smtplib.SMTP_SSL('smtp.qq.com', smtplib.SMTP_SSL_PORT)
print("连接到邮件服务器成功")
client.login(msg_from, password)
print("登录成功")
client.sendmail(msg_from, msg_to, msg.as_string())
print("发送成功")
except smtplib.SMTPException as e:
print("发送邮件异常")
finally:
client.quit()
def main():
send("title", "content")

Loading…
Cancel
Save