文章重要更新 240724

由于近期惨遭某些省的 IP 多轮恶意刷量,现新增两个小脚本加强网站被攻击状况的感知。
原文写于 231206

除了使用自建服务器上的 Cron 执行定时脚本以外,还可以使用 GitHub Action 完成这项工作。

这篇文章通过一些简单的 Python 脚本演示定时脚本配合 Webhook 的使用。

通过这个思路,你可以展开更多的想法:

  • 实时监测 GitHub 的各项信息并推送
  • NPM 包下载量监测
  • LeetCode 每日一题
  • 排名监测
  • 获取某官网上的重要公告
  • 监控云服务平台的各项数据

本文主要介绍:

  1. 获取用户 Github 上的 Star 数量并推送的脚本编写
  2. 根据用户名监测某作者 NPM 包年下载量
  3. 实时监测多吉云团队公布的文章「近期部分加速域名晚间遭遇恶意流量事件说明」通告是否更新
  4. 监控腾讯云 CDN 请求数是否超限

获取 Github 上的 Star 数量并推送

GitHub 应用注册

打开你的 GitHub 主页(profile)进入开发者选项:

image.png

选择「新建一个」GitHub APP。根据需要进行相关设置的填写。

image.png

生成密钥:

image.png

记住密钥,待会脚本需要调用。

脚本的编写

这里参考了 songquanpeng/scripts 的实现,并进行修复与改编。

这个 python 脚本(这里命名为 github_stars.py)所做的工作:

  1. 调用 GitHub API(Repositories - GitHub Docs
  2. 统计所有项目的 star 数量
  3. 与上次统计结果进行比较(结果保存在本地)
  4. 调用推送服务发送 POST JSON 格式的请求

执行脚本需要的三个参数:

  • GitHub 用户名
  • GitHub API 令牌
  • 调用的推送服务(将发送 POST)请求

源代码【非面向对象版本】如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-

#!/usr/bin/python3
import json
import requests
import sys

# 存储上次检测的 star 数的文件
filename = "./.star_count"
# Push链接
push_url = ""
# 用户名
username=""

def star_counter(username: str, token: "") -> int:
"""
调用GitHub API获取个人仓库信息,返回统计的stars数
:param username:
:param token:
:return:
"""
# 链接的构建
all_repos_url = f"https://api.github.com/users/{username}/repos?per_page=100"
header = {} if token == "" else {"Authorization": f"bearer {token}"}
# 向GitHub发送请求
res = requests.get(all_repos_url, header)
repos = res.json()
# 解析响应并统计
count = 0
for repo in repos:
count += repo["stargazers_count"]
return count


def load_count() -> int:
'''
加载上一次的统计结果,若无则默认为0
:return:
'''
try:
with open(filename, 'r') as f:
count = int(f.read())
except IOError:
count = 0
return count


def save_count(count: int):
'''
保存统计结果
:param count:
:return:
'''
with open(filename, 'w') as f:
f.write(str(count))


def send_message(msg: str):
"""
发送消息
:param msg:
:return:
"""
headers = {
"Content-Type": "application/json; charset=UTF-8"
}
pyload = {
"title":"GitHub Stars",
"msg":msg
}
response = requests.post(push_url,data=json.dumps(pyload),headers=headers).text
print(response)

def message_construct(last_count,current_count) -> str:
"""
自定义的信息构造函数。
样例:【每日检测】检测到用户uuanqin总star数增加 :) 。[0 -> 6, total: 6]
:param last_count:
:param current_count:
:return:
"""
s = f"【每日检测】检测到用户{username}总star数"
if current_count > last_count:
s += f"增加 :) 。"
else:
s += f"降低 :( 。"
s += f"[{last_count} -> {current_count}, total: {current_count-last_count}]"
return s

def main():
# 参数检查
if len(sys.argv) != 4:
print("Error! This script requires three arguments: GITHUB_USERNAME GITHUB_TOKEN PUSH_URL")
return
# 参数设置
global push_url,username
username = sys.argv[1]
token = sys.argv[2]
push_url = sys.argv[3]

# 获取上一次统计结果
last_count = load_count()
# 获取本次统计结果
current_count = star_counter(username, token)

# 根据数量变化进行判断
if current_count != last_count:
send_message(message_construct(last_count,current_count))
# 只有数量发生变化才保存结果
save_count(current_count)

if __name__ == '__main__':
main()

注意,代码中不要使用奇奇怪怪的字符,比如 emoji 表情等,导致 utf-8 和 gbk 都不能识别。

这里我使用了自己的部署服务。使用示例:

1
python ./github_stars.py uuanqin 9xxxxxxxx7 https://push.uuanqin.top/webhook/8xxxxxxe

把脚本放在 Linux 服务器上,记得测试。

这里使用的 API 似乎有点老了,但是还能用。我在文档中没有找到确切的对应文档说明。

定时执行

新建 cron 文件:

1
2 8 * * * /usr/bin/python /var/www/push_script/github_stars/github_stars.py uuanqin 9xxxxxxxxxxxxxx7 https://push.uuanqin.top/webhook/8xxxxxxxxxxxxxxe

添加定时任务:

1
crontab github_stars.cron

每天早上 8 点 02 分调用该脚本。

在线网站 验证结果:

image.png

不了解 cron 可以查看这篇文章:Linux 使用 Cron 创建定时任务

像我的部署服务使用了飞书群机器人,推送效果如下:

image.png

NPM 下载量的监测

根据上一小节介绍的脚本,我们重构一下,改成面向对象的版本。以下脚本能同时支持 GitHub stars 数量的检测以及 NPM 年下载量的检测。

脚本使用了第三方 npm 数据服务:npm-stat: download statistics for NPM packages

注意,临时的数据文件存储在 DIR_OF_TEMP_FILE 目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# -*- coding: utf-8 -*-
import datetime
#!/usr/bin/python3
import json
import os

import requests
import sys
from abc import ABC, abstractmethod

from dateutil.relativedelta import relativedelta

DIR_OF_TEMP_FILE = "/var/www/push_script/count_detect" # 临时变量存储文件夹
DIR_OF_PUBLIC_JSON_FILE = "/var/www/public_json_files" # json数据文件保存的文件夹

class Obj:
# 存储统计数的文件名
filename = ""
title = ""
last_count = 0
current_count = 0
# Push链接
push_url = ""

def __init__(self,push_url):
self.push_url = push_url
self.load_count()
self.count()


@abstractmethod
def count(self):
pass

@abstractmethod
def message_construct(self):
pass


def load_count(self):
'''
加载上一次的统计结果,若无则默认为0
'''
try:
with open(self.filename, 'r') as f:
count = int(f.read())
except IOError:
count = 0
self.last_count = count


def save_count(self):
'''
保存统计结果
'''
with open(self.filename, 'w') as f:
f.write(str(self.current_count))

def send_message(self, msg: str):
"""
发送消息
"""
headers = {
"Content-Type": "application/json; charset=UTF-8"
}
pyload = {
"title": self.title,
"msg": msg
}
response = requests.post(self.push_url, data=json.dumps(pyload), headers=headers).text
print(response)

def push(self):
# 根据数量变化进行判断
current_count = self.current_count
last_count = self.last_count
if current_count != last_count:
self.send_message(self.message_construct())
# 只有数量发生变化才保存结果
self.save_count()


class GitHubStars(Obj):

# 用户名
username = ""
github_token = ""
def __init__(self,push_url,token,username):

self.filename = os.path.join(DIR_OF_TEMP_FILE,".star_count")
self.title = "GitHub Stars Count"
self.github_token = token
self.username = username
super().__init__(push_url)

def count(self):
"""
调用GitHub API获取个人仓库信息,返回统计的stars数
"""
# 链接的构建
all_repos_url = f"https://api.github.com/users/{self.username}/repos?per_page=100"
header = {} if self.github_token == "" else {"Authorization": f"bearer {self.github_token}"}
# 向GitHub发送请求
res = requests.get(all_repos_url, header)
repos = res.json()
# 解析响应并统计
count = 0
for repo in repos:
count += repo["stargazers_count"]
self.current_count = count

def message_construct(self) -> str:
"""
自定义的信息构造函数。
样例:【每日检测】检测到用户uuanqin总star数增加 :) 。[0 -> 6, total: 6]
"""
s = f"【每日检测】检测到用户{self.username}总star数"
if self.current_count > self.last_count:
s += f"增加 :) 。"
else:
s += f"降低 :( 。"
s += f"[{self.last_count} -> {self.current_count}, total: {self.current_count-self.last_count}]"
return s

class NpmAuthorDownloads(Obj):
username = ""
def __init__(self,push_url,username):
self.filename = os.path.join(DIR_OF_TEMP_FILE,".npm_count")
self.title = "GitHub Stars Count"
self.username = username
super().__init__(push_url)

def count(self):
nowtime = datetime.datetime.now()
yesterdate = nowtime - datetime.timedelta(days=+1)
# last_year = yesterdate - relativedelta(years=1)
yesterdate_strf=yesterdate.strftime('%Y-%m-%d')
last_year_strf= "2023-10-18" # 这是我写的第一个插件发布前两天 # last_year.strftime('%Y-%m-%d')
url = f"https://npm-stat.com/api/download-counts?author={self.username}&from={last_year_strf}&until={yesterdate_strf}"
# 向GitHub发送请求
res = requests.get(url)
repos = res.json()
# 解析响应并统计
count = 0
for p,con in repos.items():
for d,v in con.items():
count += v
self.current_count = count

# 写入计算结果到Json文件以便公开访问
public_dict = {
"name":"npm-stat-by-author",
"total": count
}
json_str = json.dumps(public_dict)
json_name = os.path.join(DIR_OF_PUBLIC_JSON_FILE,"npm_stat_author_total.json")
with open(json_name, 'w') as f:
f.write(json_str)


def message_construct(self):
"""
自定义的信息构造函数。
"""
s = f"【每日检测】检测到用户{self.username}的NPM账户去年包下载量发生变动。"
s += f"[{self.last_count} -> {self.current_count}, total: {self.current_count - self.last_count}]"
return s


def main():
# 参数检查
if len(sys.argv) <2:
print("Error! Arguments is illegal")
return

type = sys.argv[1]
obj = None

if type=="github_stars" and len(sys.argv) != 5:
print("Error! This script requires Four arguments: TYPE GITHUB_USERNAME GITHUB_TOKEN PUSH_URL")
return
elif type=="npm_downloads" and len(sys.argv) != 4:
print("Error! This script requires Four arguments: TYPE NPM_AUTHOR PUSH_URL")
return
elif type!= "github_stars" and type!= "npm_downloads":
print("Error! Arguments is illegal")
return


if type == "github_stars":
obj = GitHubStars(sys.argv[4],sys.argv[3],sys.argv[2])
elif type == "npm_downloads":
obj = NpmAuthorDownloads(sys.argv[3],sys.argv[2])

obj.push()


if __name__ == '__main__':
main()

示例 cron:

1
2
2 8 * * * /usr/bin/python /var/www/push_script/count_detect/count_detect.py github_stars uuanqin 94xxxxc2b7 https://push.uuanqin.top/webhook/3f2xxxxbd
3 8 * * * /usr/bin/python /var/www/push_script/count_detect/count_detect.py npm_downloads wuanqin https://push.uuanqin.top/webhook/3f2xxxxbd

image.png

检测重要网页的更新

背景:2024 年 7 月开始,中小型博客网站遭遇恶意流量攻击。多吉云团队公布的高危 IP 为中小博客站长提供了有用的防范信息。我们需要定时检测这个重要信息的更新及时将危险 IP 列入黑名单中。

信息通知地址:公告 - 多吉云 (dogecloud.com)

通过 F12 调试工具得知接口 API 请求地址为: https://api.dogecloud.com/home/announcement/detail.json?id=26

编写简单脚本检测文章内容是否改变。如果改变则推送通知。

脚本内容为上一小节中脚本的简化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# -*- coding: utf-8 -*-
import datetime
# !/usr/bin/python3
import json
import os

import requests
import sys


class Notice:
filename = ""
title = ""

# Push链接
push_url = ""

# 上次页面内容
last_content = ""
# 本次加载内容
now_content = ""

def __init__(self, push_url):
self.push_url = push_url
self.filename = os.path.join(os.getcwd(), ".cdn_notice")
self.title = "CDN Notice was Updated"
self.load_page()
self.get()

def load_page(self):
'''
加载上一次的页面报告,若无则默认为空
'''
try:
with open(self.filename, 'r') as f:
last_content = f.read()
except IOError:
last_content = ""
self.last_content = last_content

def save_page(self):
'''
保存页面 只有页面发生变化才保存结果
'''
with open(self.filename, 'w') as f:
f.write(self.now_content)

def send_message(self, msg: str):
"""
发送消息
"""
headers = {
"Content-Type": "application/json; charset=UTF-8"
}
pyload = {
"title": self.title,
"msg": msg
}
response = requests.post(self.push_url, data=json.dumps(pyload), headers=headers).text
print(response)

def push(self):
# 根据页面变化进行判断
if self.now_content != self.last_content:
self.send_message(self.message_construct())
# 只有页面发生变化才保存结果
self.save_page()

def get(self):
"""
获取页面数据
"""

# header = {} if self.github_token == "" else {"Authorization": f"bearer {self.github_token}"}

res = requests.get("https://api.dogecloud.com/home/announcement/detail.json?id=26")
res_json = res.json()
self.now_content = res_json["data"]["content"]

def message_construct(self) -> str:
"""
自定义的信息构造函数。
"""
s = f"【每日检测】检测到多吉云CDN恶意IP公示更新。请及时查看: https://www.dogecloud.com/announcement/26"
return s


def main():
# 参数检查
if len(sys.argv) < 2:
print("Error! Arguments is illegal")
return

push_url = sys.argv[1]
obj = Notice(push_url)

obj.push()


if __name__ == '__main__':
main()

示例 cron:

1
9 8 * * * /usr/bin/python /var/www/push_script/cdn_notice/cdn_notice.py https://push.uuanqin.top/webhook/3f2xxxxbd

image.png

监控腾讯云 CDN 命中请求数

腾讯云预警时效还是有点长,敏感度不够,我们可以利用 API 自写脚本进行预警。CDN 流量与带宽监控需要开通 ecdn,所以这里就监控命中的请求数或响应码。

需求:每半小时对 CDN 响应状态码进行分析,半小时内命中的响应状态码数量异常时将推送通知。

安装 SDK:

1
pip install tencentcloud-sdk-python-cdn

通过腾讯云 API Explorer 快速得到 SDK 代码写法:API Explorer - 云 API - 控制台 (tencent.com)

任何时候使用任何 API 时都要注意保护好密钥。详见:将 CDN 缓存自动刷新加入到博客发布的工作流(Hexo、WordPress)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: utf-8 -*-

#!/usr/bin/python3

import json
import os
import datetime
import sys
import types

import requests
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.cdn.v20180606 import cdn_client, models

IMPORTANT_PUSH_URL = ""
NORMAL_PUSH_URL = ""
now_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_datetime = (datetime.datetime.now()+datetime.timedelta(minutes=-30)).strftime("%Y-%m-%d %H:%M:%S")

MESSAGE_TITLE = "CDN请求监控"
def detect():
try:
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred = credential.Credential(os.environ.get("TENCENT_API_SECRETID"), os.environ.get("TENCENT_API_SECRETKEY"))
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "cdn.tencentcloudapi.com"

# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的
client = cdn_client.CdnClient(cred, "", clientProfile)

# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.DescribeCdnDataRequest()
params = {
"StartTime": last_datetime,
"EndTime": now_datetime,
"Metric": "statusCode",
"Interval": "5min"
}
req.from_json_string(json.dumps(params))

# 返回的resp是一个DescribeCdnDataResponse的实例,与请求对象对应
resp = client.DescribeCdnData(req)
# 输出json格式的字符串回包
res_dict = json.loads(resp.to_json_string())
print(res_dict)
data_analyse(res_dict)

except TencentCloudSDKException as err:
print(err)

# 限制值
HALF_HOUR_LIMIT_HITS = 500
# FIVE_MINUTES_LIMIT_HITS = 200

def data_analyse(info_dict:dict):
"""
数据分析。
:param info_dict:
:return:
"""
global HALF_HOUR_LIMIT_HITS


total_dict = dict()
cdn_datas = info_dict.get("Data")[0].get("CdnData")
for data in cdn_datas:
metric = data.get("Metric")
total = data.get("SummarizedData").get("Value")
total_dict[metric]=total


total_2xx = total_dict.get("2xx")
total_3xx = total_dict.get("3xx")
total_4xx = total_dict.get("4xx")
total_5xx = total_dict.get("5xx")

s = "【CDN状态码检测】 " + last_datetime + " 至 " + now_datetime

if total_2xx>= HALF_HOUR_LIMIT_HITS:
send_message(IMPORTANT_PUSH_URL,MESSAGE_TITLE,f"{s}2xx 响应次数过多,CDN可能正在遭受盗刷!count={total_2xx}")

if total_3xx>= HALF_HOUR_LIMIT_HITS:
send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s}3xx 响应次数异常。count={total_3xx}")

if total_4xx>= HALF_HOUR_LIMIT_HITS:
send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s}4xx 响应次数异常。count={total_4xx}")

if total_5xx>= HALF_HOUR_LIMIT_HITS:
send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s} 5xx 响应次数异常。count={total_5xx}")


def send_message(push_url:str,title:str, msg: str):
"""
发送消息
"""
headers = {
"Content-Type": "application/json; charset=UTF-8"
}
pyload = {
"title": title,
"msg": msg
}
response = requests.post(push_url, data=json.dumps(pyload), headers=headers).text
print(response)

def main():
global IMPORTANT_PUSH_URL,NORMAL_PUSH_URL
# 参数检查
if len(sys.argv) < 3:
print("Error! Arguments is illegal")
return

IMPORTANT_PUSH_URL = sys.argv[1]
NORMAL_PUSH_URL = sys.argv[2]


detect()

if __name__ == '__main__':
main()

示例 cron 写法:

1
*/30 * * * * /usr/bin/python /var/www/push_script/cdn_request_detect/hit-request-notice.py https://push.uuanqin.top/webhook/92xxxxx70be7 https://push.uuanqin.top/webhook/3fxxxx24bd

image.png

cron 的其它应用

本文参考