利用 Cron 编写简单的定时脚本 | 总字数: 3.9k | 阅读时长: 17分钟 | 浏览量: |
由于近期惨遭某些省的 IP 多轮恶意刷量,现新增两个小脚本加强网站被攻击状况的感知。
原文写于 231206
。
除了使用自建服务器上的 Cron 执行定时脚本以外,还可以使用 GitHub Action 完成这项工作。
这篇文章通过一些简单的 Python 脚本演示定时脚本配合 Webhook 的使用。
通过这个思路,你可以展开更多的想法:
实时监测 GitHub 的各项信息并推送
NPM 包下载量监测
LeetCode 每日一题
排名监测
获取某官网上的重要公告
监控云服务平台的各项数据
…
本文主要介绍:
获取用户 Github 上的 Star 数量并推送的脚本编写
根据用户名监测某作者 NPM 包年下载量
实时监测多吉云团队公布的文章「近期部分加速域名晚间遭遇恶意流量事件说明」通告是否更新
监控腾讯云 CDN 请求数是否超限
获取 Github 上的 Star 数量并推送
GitHub 应用注册
打开你的 GitHub 主页(profile)进入开发者选项:
选择「新建一个」GitHub APP。根据需要进行相关设置的填写。
生成密钥:
记住密钥,待会脚本需要调用。
脚本的编写
这里参考了 songquanpeng/scripts 的实现,并进行修复与改编。
这个 python 脚本(这里命名为 github_stars.py
)所做的工作:
调用 GitHub API(Repositories - GitHub Docs )
统计所有项目的 star 数量
与上次统计结果进行比较(结果保存在本地)
调用推送服务发送 POST JSON 格式的请求
执行脚本需要的三个参数:
GitHub 用户名
GitHub API 令牌
调用的推送服务(将发送 POST)请求
源代码【非面向对象版本】如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 import jsonimport requestsimport sysfilename = "./.star_count" push_url = "" username="" def star_counter (username: str , token: "" ) -> int : """ 调用GitHub API获取个人仓库信息,返回统计的stars数 :param username: :param token: :return: """ all_repos_url = f"https://api.github.com/users/{username} /repos?per_page=100" header = {} if token == "" else {"Authorization" : f"bearer {token} " } res = requests.get(all_repos_url, header) repos = res.json() count = 0 for repo in repos: count += repo["stargazers_count" ] return count def load_count () -> int : ''' 加载上一次的统计结果,若无则默认为0 :return: ''' try : with open (filename, 'r' ) as f: count = int (f.read()) except IOError: count = 0 return count def save_count (count: int ): ''' 保存统计结果 :param count: :return: ''' with open (filename, 'w' ) as f: f.write(str (count)) def send_message (msg: str ): """ 发送消息 :param msg: :return: """ headers = { "Content-Type" : "application/json; charset=UTF-8" } pyload = { "title" :"GitHub Stars" , "msg" :msg } response = requests.post(push_url,data=json.dumps(pyload),headers=headers).text print (response) def message_construct (last_count,current_count ) -> str : """ 自定义的信息构造函数。 样例:【每日检测】检测到用户uuanqin总star数增加 :) 。[0 -> 6, total: 6] :param last_count: :param current_count: :return: """ s = f"【每日检测】检测到用户{username} 总star数" if current_count > last_count: s += f"增加 :) 。" else : s += f"降低 :( 。" s += f"[{last_count} -> {current_count} , total: {current_count-last_count} ]" return s def main (): if len (sys.argv) != 4 : print ("Error! This script requires three arguments: GITHUB_USERNAME GITHUB_TOKEN PUSH_URL" ) return global push_url,username username = sys.argv[1 ] token = sys.argv[2 ] push_url = sys.argv[3 ] last_count = load_count() current_count = star_counter(username, token) if current_count != last_count: send_message(message_construct(last_count,current_count)) save_count(current_count) if __name__ == '__main__' : main()
注意,代码中不要使用奇奇怪怪的字符,比如 emoji 表情等,导致 utf-8 和 gbk 都不能识别。
这里我使用了自己的部署服务。使用示例:
1 python ./github_stars.py uuanqin 9xxxxxxxx7 https://push.uuanqin.top/webhook/8xxxxxxe
把脚本放在 Linux 服务器上,记得测试。
这里使用的 API 似乎有点老了,但是还能用。我在文档中没有找到确切 的对应文档说明。
定时执行
新建 cron 文件:
1 2 8 * * * /usr/bin/python /var/www/push_script/github_stars/github_stars.py uuanqin 9xxxxxxxxxxxxxx7 https://push.uuanqin.top/webhook/8xxxxxxxxxxxxxxe
添加定时任务:
1 crontab github_stars.cron
每天早上 8 点 02 分调用该脚本。
附 在线网站 验证结果:
不了解 cron 可以查看这篇文章:站内文章 Linux 使用 Cron 创建定时任务
像我的部署服务使用了飞书群机器人,推送效果如下:
NPM 下载量的监测
根据上一小节介绍的脚本,我们重构一下,改成面向对象的版本。以下脚本能同时支持 GitHub stars 数量的检测以及 NPM 年下载量的检测。
脚本使用了第三方 npm 数据服务:npm-stat: download statistics for NPM packages
注意,临时的数据文件存储在 DIR_OF_TEMP_FILE
目录下。
import datetimeimport jsonimport osimport requestsimport sysfrom abc import ABC, abstractmethodfrom dateutil.relativedelta import relativedeltaDIR_OF_TEMP_FILE = "/var/www/push_script/count_detect" DIR_OF_PUBLIC_JSON_FILE = "/var/www/public_json_files" class Obj : filename = "" title = "" last_count = 0 current_count = 0 push_url = "" def __init__ (self,push_url ): self .push_url = push_url self .load_count() self .count() @abstractmethod def count (self ): pass @abstractmethod def message_construct (self ): pass def load_count (self ): ''' 加载上一次的统计结果,若无则默认为0 ''' try : with open (self .filename, 'r' ) as f: count = int (f.read()) except IOError: count = 0 self .last_count = count def save_count (self ): ''' 保存统计结果 ''' with open (self .filename, 'w' ) as f: f.write(str (self .current_count)) def send_message (self, msg: str ): """ 发送消息 """ headers = { "Content-Type" : "application/json; charset=UTF-8" } pyload = { "title" : self .title, "msg" : msg } response = requests.post(self .push_url, data=json.dumps(pyload), headers=headers).text print (response) def push (self ): current_count = self .current_count last_count = self .last_count if current_count != last_count: self .send_message(self .message_construct()) self .save_count() class GitHubStars (Obj ): username = "" github_token = "" def __init__ (self,push_url,token,username ): self .filename = os.path.join(DIR_OF_TEMP_FILE,".star_count" ) self .title = "GitHub Stars Count" self .github_token = token self .username = username super ().__init__(push_url) def count (self ): """ 调用GitHub API获取个人仓库信息,返回统计的stars数 """ all_repos_url = f"https://api.github.com/users/{self.username} /repos?per_page=100" header = {} if self .github_token == "" else {"Authorization" : f"bearer {self.github_token} " } res = requests.get(all_repos_url, header) repos = res.json() count = 0 for repo in repos: count += repo["stargazers_count" ] self .current_count = count def message_construct (self ) -> str : """ 自定义的信息构造函数。 样例:【每日检测】检测到用户uuanqin总star数增加 :) 。[0 -> 6, total: 6] """ s = f"【每日检测】检测到用户{self.username} 总star数" if self .current_count > self .last_count: s += f"增加 :) 。" else : s += f"降低 :( 。" s += f"[{self.last_count} -> {self.current_count} , total: {self.current_count-self.last_count} ]" return s class NpmAuthorDownloads (Obj ): username = "" def __init__ (self,push_url,username ): self .filename = os.path.join(DIR_OF_TEMP_FILE,".npm_count" ) self .title = "GitHub Stars Count" self .username = username super ().__init__(push_url) def count (self ): nowtime = datetime.datetime.now() yesterdate = nowtime - datetime.timedelta(days=+1 ) yesterdate_strf=yesterdate.strftime('%Y-%m-%d' ) last_year_strf= "2023-10-18" url = f"https://npm-stat.com/api/download-counts?author={self.username} &from={last_year_strf} &until={yesterdate_strf} " res = requests.get(url) repos = res.json() count = 0 for p,con in repos.items(): for d,v in con.items(): count += v self .current_count = count public_dict = { "name" :"npm-stat-by-author" , "total" : count } json_str = json.dumps(public_dict) json_name = os.path.join(DIR_OF_PUBLIC_JSON_FILE,"npm_stat_author_total.json" ) with open (json_name, 'w' ) as f: f.write(json_str) def message_construct (self ): """ 自定义的信息构造函数。 """ s = f"【每日检测】检测到用户{self.username} 的NPM账户去年包下载量发生变动。" s += f"[{self.last_count} -> {self.current_count} , total: {self.current_count - self.last_count} ]" return s def main (): if len (sys.argv) <2 : print ("Error! Arguments is illegal" ) return type = sys.argv[1 ] obj = None if type =="github_stars" and len (sys.argv) != 5 : print ("Error! This script requires Four arguments: TYPE GITHUB_USERNAME GITHUB_TOKEN PUSH_URL" ) return elif type =="npm_downloads" and len (sys.argv) != 4 : print ("Error! This script requires Four arguments: TYPE NPM_AUTHOR PUSH_URL" ) return elif type != "github_stars" and type != "npm_downloads" : print ("Error! Arguments is illegal" ) return if type == "github_stars" : obj = GitHubStars(sys.argv[4 ],sys.argv[3 ],sys.argv[2 ]) elif type == "npm_downloads" : obj = NpmAuthorDownloads(sys.argv[3 ],sys.argv[2 ]) obj.push() if __name__ == '__main__' : main()
示例 cron:
1 2 2 8 * * * /usr/bin/python /var/www/push_script/count_detect/count_detect.py github_stars uuanqin 94xxxxc2b7 https://push.uuanqin.top/webhook/3f2xxxxbd 3 8 * * * /usr/bin/python /var/www/push_script/count_detect/count_detect.py npm_downloads wuanqin https://push.uuanqin.top/webhook/3f2xxxxbd
检测重要网页的更新
背景:2024 年 7 月开始,中小型博客网站遭遇恶意流量攻击。多吉云团队公布的高危 IP 为中小博客站长提供了有用的防范信息。我们需要定时检测这个重要信息的更新及时将危险 IP 列入黑名单中。
信息通知地址:公告 - 多吉云 (dogecloud.com)
通过 F12 调试工具得知接口 API 请求地址为: https://api.dogecloud.com/home/announcement/detail.json?id=26
编写简单脚本检测文章内容是否改变。如果改变则推送通知。
脚本内容为上一小节中脚本的简化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 import datetimeimport jsonimport osimport requestsimport sysclass Notice : filename = "" title = "" push_url = "" last_content = "" now_content = "" def __init__ (self, push_url ): self .push_url = push_url self .filename = os.path.join(os.getcwd(), ".cdn_notice" ) self .title = "CDN Notice was Updated" self .load_page() self .get() def load_page (self ): ''' 加载上一次的页面报告,若无则默认为空 ''' try : with open (self .filename, 'r' ) as f: last_content = f.read() except IOError: last_content = "" self .last_content = last_content def save_page (self ): ''' 保存页面 只有页面发生变化才保存结果 ''' with open (self .filename, 'w' ) as f: f.write(self .now_content) def send_message (self, msg: str ): """ 发送消息 """ headers = { "Content-Type" : "application/json; charset=UTF-8" } pyload = { "title" : self .title, "msg" : msg } response = requests.post(self .push_url, data=json.dumps(pyload), headers=headers).text print (response) def push (self ): if self .now_content != self .last_content: self .send_message(self .message_construct()) self .save_page() def get (self ): """ 获取页面数据 """ res = requests.get("https://api.dogecloud.com/home/announcement/detail.json?id=26" ) res_json = res.json() self .now_content = res_json["data" ]["content" ] def message_construct (self ) -> str : """ 自定义的信息构造函数。 """ s = f"【每日检测】检测到多吉云CDN恶意IP公示更新。请及时查看: https://www.dogecloud.com/announcement/26" return s def main (): if len (sys.argv) < 2 : print ("Error! Arguments is illegal" ) return push_url = sys.argv[1 ] obj = Notice(push_url) obj.push() if __name__ == '__main__' : main()
示例 cron:
1 9 8 * * * /usr/bin/python /var/www/push_script/cdn_notice/cdn_notice.py https://push.uuanqin.top/webhook/3f2xxxxbd
监控腾讯云 CDN 命中请求数
腾讯云预警时效还是有点长,敏感度不够,我们可以利用 API 自写脚本进行预警。CDN 流量与带宽监控需要开通 ecdn,所以这里就监控命中的请求数或响应码。
需求:每半小时对 CDN 响应状态码进行分析,半小时内命中的响应状态码数量异常时将推送通知。
安装 SDK:
1 pip install tencentcloud-sdk-python-cdn
通过腾讯云 API Explorer 快速得到 SDK 代码写法:API Explorer - 云 API - 控制台 (tencent.com)
任何时候使用任何 API 时都要注意保护好密钥。详见:站内文章 将 CDN 缓存自动刷新加入到博客发布的工作流(Hexo、WordPress)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 import jsonimport osimport datetimeimport sysimport typesimport requestsfrom tencentcloud.common import credentialfrom tencentcloud.common.profile.client_profile import ClientProfilefrom tencentcloud.common.profile.http_profile import HttpProfilefrom tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKExceptionfrom tencentcloud.cdn.v20180606 import cdn_client, modelsIMPORTANT_PUSH_URL = "" NORMAL_PUSH_URL = "" now_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S" ) last_datetime = (datetime.datetime.now()+datetime.timedelta(minutes=-30 )).strftime("%Y-%m-%d %H:%M:%S" ) MESSAGE_TITLE = "CDN请求监控" def detect (): try : cred = credential.Credential(os.environ.get("TENCENT_API_SECRETID" ), os.environ.get("TENCENT_API_SECRETKEY" )) httpProfile = HttpProfile() httpProfile.endpoint = "cdn.tencentcloudapi.com" clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = cdn_client.CdnClient(cred, "" , clientProfile) req = models.DescribeCdnDataRequest() params = { "StartTime" : last_datetime, "EndTime" : now_datetime, "Metric" : "statusCode" , "Interval" : "5min" } req.from_json_string(json.dumps(params)) resp = client.DescribeCdnData(req) res_dict = json.loads(resp.to_json_string()) print (res_dict) data_analyse(res_dict) except TencentCloudSDKException as err: print (err) HALF_HOUR_LIMIT_HITS = 500 def data_analyse (info_dict:dict ): """ 数据分析。 :param info_dict: :return: """ global HALF_HOUR_LIMIT_HITS total_dict = dict () cdn_datas = info_dict.get("Data" )[0 ].get("CdnData" ) for data in cdn_datas: metric = data.get("Metric" ) total = data.get("SummarizedData" ).get("Value" ) total_dict[metric]=total total_2xx = total_dict.get("2xx" ) total_3xx = total_dict.get("3xx" ) total_4xx = total_dict.get("4xx" ) total_5xx = total_dict.get("5xx" ) s = "【CDN状态码检测】 " + last_datetime + " 至 " + now_datetime if total_2xx>= HALF_HOUR_LIMIT_HITS: send_message(IMPORTANT_PUSH_URL,MESSAGE_TITLE,f"{s} 2xx 响应次数过多,CDN可能正在遭受盗刷!count={total_2xx} " ) if total_3xx>= HALF_HOUR_LIMIT_HITS: send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s} 3xx 响应次数异常。count={total_3xx} " ) if total_4xx>= HALF_HOUR_LIMIT_HITS: send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s} 4xx 响应次数异常。count={total_4xx} " ) if total_5xx>= HALF_HOUR_LIMIT_HITS: send_message(NORMAL_PUSH_URL,MESSAGE_TITLE,f"{s} 5xx 响应次数异常。count={total_5xx} " ) def send_message (push_url:str ,title:str , msg: str ): """ 发送消息 """ headers = { "Content-Type" : "application/json; charset=UTF-8" } pyload = { "title" : title, "msg" : msg } response = requests.post(push_url, data=json.dumps(pyload), headers=headers).text print (response) def main (): global IMPORTANT_PUSH_URL,NORMAL_PUSH_URL if len (sys.argv) < 3 : print ("Error! Arguments is illegal" ) return IMPORTANT_PUSH_URL = sys.argv[1 ] NORMAL_PUSH_URL = sys.argv[2 ] detect() if __name__ == '__main__' : main()
示例 cron 写法:
1 */30 * * * * /usr/bin/python /var/www/push_script/cdn_request_detect/hit-request-notice.py https://push.uuanqin.top/webhook/92xxxxx70be7 https://push.uuanqin.top/webhook/3fxxxx24bd
cron 的其它应用
本文参考