Python是一個廣泛用於編程的語言,在日常開發中經常需要進行一些定時的任務,比如定時執行爬蟲、發送郵件等等。實現這些功能最常用的方法是使用Python的定時任務模塊,threading.timer。本篇文章將詳細闡述如何使用threading.timer模塊實現定時任務的功能。
一、threading.timer簡介
threading.timer模塊是Python中一個非常方便的模塊,它提供了定時執行任務的功能。使用該模塊,可以讓程序按照指定的時間間隔執行任務,非常適合需要在特定時間執行某些操作的場合。其中,通過設置join()方法可以讓程序等待線程執行完畢後再結束。下面是使用threading.timer的一個示例代碼:
import threading import time def func(): print("Hello, world!") def run(): while True: t = threading.Timer(30.0, func) t.start() t.join() time.sleep(60) if __name__ == "__main__": run()
在上述代碼中,func()函數是需要定時執行的任務,本例中每30秒執行一次。run()函數中,通過調用threading.Timer實例來創建定時器,然後執行任務,並通過join()方法等待任務執行完畢。函數最後調用time.sleep()方法來休眠,以等待下一次任務的執行。
二、使用threading.timer實現爬蟲定時任務
在爬蟲領域,有時需要定時更新爬取的數據,以保證數據的準確性。下面我們將以一個爬取天氣預報的簡單爬蟲為例,演示如何使用threading.timer實現爬蟲的定時任務:
import threading import time import requests from bs4 import BeautifulSoup def get_weather(): url = "http://www.weather.com.cn/weather/101280101.shtml" res = requests.get(url) res.encoding = "utf-8" soup = BeautifulSoup(res.text, "html.parser") # 獲取當天天氣 today_weather = soup.find("p", class_="wea").string.strip() # 獲取實時溫度 real_time_temp = soup.find("span", class_="tem").string.strip() return today_weather, real_time_temp def run(): while True: # 獲取當前時間 now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 執行任務 today_weather, real_time_temp = get_weather() # 輸出結果 print("{}\n天氣:{}\n溫度:{}".format(now_time, today_weather, real_time_temp)) time.sleep(1800) if __name__ == "__main__": run()
在上述代碼中,我們使用了requests和BeautifulSoup等模塊來實現對天氣預報網站的爬取和數據解析,然後每30分鐘輸出一次結果。其中,可以根據實際需求自行修改時間間隔和輸出方式。
三、使用threading.timer實現郵件發送任務
在Python中,藉助smtplib和email模塊,我們可以很方便地實現郵件的發送功能。下面我們將演示如何使用threading.timer實現定時發送郵件:
import threading import time import smtplib from email.mime.text import MIMEText from email.header import Header def send_email(): # 發送郵件的基本信息 smtp_server = "smtp.qq.com" # SMTP郵件伺服器 sender = "sender@qq.com" # 發件人郵箱 receiver = "receiver@qq.com" # 收件人郵箱 username = "sender@qq.com" # 發件人郵箱賬號 password = "password" # 發件人郵箱密碼 # 構造郵件內容 message = MIMEText("這是一封測試郵件", "plain", "utf-8") message["From"] = Header("發件人姓名", "utf-8") message["To"] = Header("收件人姓名", "utf-8") message["Subject"] = Header("Python測試郵件", "utf-8") try: smtp_obj = smtplib.SMTP_SSL(smtp_server, 465) smtp_obj.login(username, password) smtp_obj.sendmail(sender, receiver, message.as_string()) print("郵件發送成功!") except smtplib.SMTPException: print("郵件發送失敗!") def run(): while True: # 執行任務 send_email() # 等待60秒 time.sleep(60) if __name__ == "__main__": run()
在上述代碼中,我們構造了一封簡單的測試郵件,然後每分鐘執行一次send_email()任務發送郵件。其中,可以根據實際需求自行修改郵件內容和時間間隔。
綜上所述,我們可以看到,使用Python中的threading.timer模塊可以很方便地實現定時任務的功能,並且應用場景非常廣泛。無論是爬蟲定時更新數據,還是定時發送郵件,都可以藉助該模塊來實現。
原創文章,作者:NBOU,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/136971.html