一、基本概念
在編寫程序時,我們經常需要在規定的時間點運行代碼來執行某些任務。其中,scheduled任務允許我們預定義任務執行的時間和頻率。
scheduled任務通常是指重複執行的什麼任務,在一定的時間間隔內執行,即使腳本正在運行也是如此。這種方式可以被廣泛應用,如清理無用文件、發生郵件、備份文件等。
每分鐘執行一次是其中一種常見的方式。下面是一個基本的示例代碼:
import schedule import time def job(): print("I'm working...") schedule.every(1).minutes.do(job) while True: schedule.run_pending() time.sleep(1)
在這個示例中,定義了一個函數job(),並使用schedule庫中的every()方法指示每隔1分鐘執行該函數。在while循環中,schedule.run_pending()會讓schedule定時任務儘可能運行,time.sleep(1)是為了確保Python不會在構建旋轉調度器時佔用太多CPU。
二、任務定時管理
scheduled任務的管理可以使用schedule庫,該庫可以讓我們方便地添加任務、修改任務、刪除任務和查詢任務。下面從這幾個方面詳細闡述如何管理scheduled任務。
1. 添加任務
使用schedule庫添加任務時,可以使用每個線程間隔的時間來定義任務執行的特定時間。可以使用以下不同的參數定義任務:
a. interval:任務的運行間隔。
b. start_time:首次運行任務的時間,可以是datetime、date或time對象。
c. end_time:可選的任務結束時間,可以是datetime、date或time對象。
d. last_run_at:任務上次運行的時間。
e. next_run_at:任務下次運行的時間。
因此,在定義新任務時,需要給定任務名稱、任務執行函數和任務執行時間間隔。下面是一個添加任務的示例代碼:
import schedule import time def job(): print("I'm working...") schedule.every(1).minutes.do(job).tag('task1') while True: schedule.run_pending() time.sleep(1)
2. 修改任務
一旦我們添加了一個任務,就可以根據需要修改該任務。在schedule庫中,我們可以使用label來唯一地標識每個任務。下面是一個修改任務的示例代碼:
import schedule import time def job(): print("I'm working...") j = schedule.every(1).minutes.do(job).tag('task1') j.interval(2) while True: schedule.run_pending() time.sleep(1)
3. 刪除任務
我們還可以根據具體的需求從schedule調度器中刪除任務。刪除任務的操作比添加任務和修改任務操作更簡單,只需要調用schedule的cancel()方法,如下所示:
import schedule import time def job(): print("I'm working...") j = schedule.every(1).minutes.do(job).tag('task1') schedule.cancel_job(j) while True: schedule.run_pending() time.sleep(1)
4. 查詢任務
如果要查詢所有正在運行的任務,可以用schedule的下面方法來輸出所有被調用過的函數列表:
import schedule import time def job(): print("I'm working...") schedule.every(1).minutes.do(job) while True: print(schedule.jobs) # 輸出任務列表 schedule.run_pending() time.sleep(1)
三、任務執行情況監控
如果一個腳本需要長時間執行,我們需要對執行情況進行監控。schedule庫為這個目的提供了一些內置功能。例如,我們可以使用schedule的on_success()回調函數記錄每次任務成功執行的時間。下面是一個監控任務執行情況的示例代碼:
import schedule import time def job(): print("I'm working...") def on_success(job): print('Task {0} ran successfully at {1}'.format(job.name, datetime.datetime.now())) schedule.every(1).minutes.do(job).tag('task1') schedule.every().day.at("10:30").do(lambda: on_success(), job) while True: schedule.run_pending() time.sleep(1)
在上面的代碼中,on_success()函數將在每次成功運行任務時執行。如果您希望在執行任務時發生錯誤,可以使用on_error()回調函數。
四、總結
在本文中,我們詳細介紹了如何使用Python中的schedule庫來管理和監控scheduled任務。我們首先介紹了scheduled任務的概念,然後討論了如何添加、修改、刪除和查詢任務。最後,我們提供了一個例子,演示了如何使用回調函數來監視任務的執行情況。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/247717.html