在日常工作中,我們常常需要編寫一些定時任務(比如每日發送郵件,定時爬蟲等)。這時候,Python提供了很多好用的庫可以幫助我們實現定時任務調度。本文將介紹Python中三個比較常用的定時任務調度庫:schedule、APScheduler和Celery,讓你輕鬆實現定時任務調度。
一、schedule庫
schedule是Python中的一個輕量級的定時任務調度庫。其依據調用時間來執行相應的任務,支持間隔時間、定期執行任務,可設置任務重複次數上限,同時支持秒級別的任務調度。使用schedule非常簡單,例如:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
while True:
schedule.run_pending()
time.sleep(1)
以上代碼會將任務job按照調度時間執行。我們可以看到,通過schedule我們可以非常簡單的實現需要按照時間調度的任務。
二、APScheduler庫
APScheduler是Python中的一個較為常用的基於日期和時間的任務調度庫。它提供了多種調度方式,比如間隔執行、定時執行、循環執行等。其中最重要的是基於時間間隔的任務調度。我們可以使用pip安裝該庫:
pip install apscheduler
接著,我們可以使用以下代碼調用一個簡單的任務:
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
print("I'am working...")
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=5)
scheduler.start()
以上代碼可以使任務job每隔5s執行一次。 BlockingScheduler是阻塞型的調度器,調度器會創建一個單獨的調度線程來執行任務。因此,我們需要使用scheduler.start()來啟動該調度器。
三、Celery庫
Celery是一個非常流行的任務調度框架,適合於處理大量並發任務。使用Celery可以實現非同步任務調度,使得任務在後台完成,而不會對用戶的操作產生影響。Celery採用分散式架構,在低延遲、大並發量的場景中有很好的優化效果。下面是一個簡單的示例:
from celery import Celery
app = Celery('tasks', backend='redis://localhost', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
result = add.delay(4, 5)
print(result.wait())
以上代碼使用Celery實現了一個簡單的非同步任務調度程序。我們可以使用delay方法來將函數調用轉化為任務,該任務會放入到任務隊列中。通過wait()方法,程序會等待任務執行完成後獲取結果。
結語
本文介紹了三個常用的Python定時任務調度庫並展示了相應的實現代碼。有了這些庫的幫助,我們可以輕鬆地實現定時任務調度,提高生產效率。當然,以上庫只是定時任務調度的冰山一角,讀者可以根據自身需求選擇合適的庫進行使用。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/301821.html