Python schedule 源码阅读

Table of contents:

About

Schedule 是一个相对优雅易用的定时任务触发的库,作者是 Dan Bader,一个非常活跃的 Python 开发者,可以从以下的例子来了解使用和设计方式。

相关链接:

Example

1
$ pip install schedule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/python
# -*- coding: utf-8 -*-

import schedule
import time


def job():
    """Demo job: print a fixed status message."""
    print("I'm working...")


def hi():
    """Demo job: print a short greeting."""
    print("hi")

def job_that_executes_once():
    """Demo job that returns ``schedule.CancelJob`` once its work is done."""
    # Do some work ...
    return schedule.CancelJob


# Run every second.
schedule.every(1).seconds.do(job)
# Run every 10 seconds.
schedule.every(10).seconds.do(hi)
# Run every 10 minutes.
schedule.every(10).minutes.do(job)
# Run every Monday at 03:00.
schedule.every().monday.at("03:00").do(job)
# Run once a day.
schedule.every().day.do(job)
# Run at a random interval of 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(job)
# A job meant to run only once (it returns schedule.CancelJob).
schedule.every().day.at('22:30').do(job_that_executes_once)


# Poll once per second and run whichever jobs are due.
while True:
    schedule.run_pending()
    time.sleep(1)

do 做了几个事情:

  • 把任务 append 到了 scheduler 的 jobs 列表
  • 设置每个任务的下次运行的时间,之后会通过这个来判断当前是否应该运行

这样的设计有一些潜在的问题:

  • 定时任务的触发逻辑,在常见的部署逻辑下不是很适用。比如你想要一个任务每小时执行一次,而如果在期间部署了,那下次应该执行的时间就变了。
  • job 的执行其实是没有其他错误处理逻辑的,也就是如果你有多个任务,前面一个任务挂了,后面就不会执行了

怎么去解决这样的问题呢 ? 可以在一个 decorator 里面统一处理,举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import functools
import logging

logging.basicConfig(format='%(asctime)s;%(levelname)s:%(message)s',
                    datefmt="%Y-%m-%d %H:%M:%S",
                    level=logging.INFO)
logger = logging.getLogger(__name__)


def safe_schedule(cancel_on_failure=False):
    """Build a decorator that logs a job's start/finish and contains its errors.

    Exceptions raised by the wrapped job are logged with their traceback
    instead of propagating to the caller.

    :param cancel_on_failure: When true, the wrapper returns
        ``schedule.CancelJob`` after the job raised; otherwise it
        returns ``None`` in that case.
    :return: A decorator to apply to job functions.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            logger.info("Running job %s", func.__name__)
            try:
                result = func(*args, **kwargs)
            except Exception:
                # logger.exception() attaches the active traceback itself;
                # ``e.message`` is not available on Python 3 exceptions.
                logger.exception("Job %s failed", func.__name__)
                if cancel_on_failure:
                    # ``schedule`` is the module imported by the main script.
                    return schedule.CancelJob
                return None
            logger.info("Finish job %s", func.__name__)
            # Hand the job's own return value back so that a job returning
            # schedule.CancelJob is not silently turned into None.
            return result
        return wrapper
    return decorator


@safe_schedule()
def bad_task():
    """Always fails with ZeroDivisionError; demonstrates the decorator."""
    return 1 / 0

这样可以做到:

  • 统一打印日志,方便知道任务开始执行和结束的时间
  • 统一的错误处理,让任务之间尽量不要互相影响

代码阅读

核心是两个类

  • Job, 定义每个任务
  • Scheduler, 编排多个任务

scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """
    def __init__(self):
        # Jobs registered with this scheduler.
        self.jobs = []

    def run_pending(self):
        """
        Run all jobs that are scheduled to run.

        Please note that it is *intended behavior that run_pending()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you only call run_pending()
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        # Only jobs whose ``should_run`` is true are picked; sorted() orders
        # them with the jobs' own comparison before they are run.
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            # NOTE: _run_job() is not part of this excerpt.
            self._run_job(job)

job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class Job(object):
    """
    A periodic job as used by :class:`Scheduler`.

    :param interval: A quantity of a certain time unit
    :param scheduler: The :class:`Scheduler <Scheduler>` instance that
                      this job will register itself with once it has
                      been fully configured in :meth:`Job.do()`.

    Every job runs at a given fixed time interval that is defined by:

    * a :meth:`time unit <Job.second>`
    * a quantity of `time units` defined by `interval`

    A job is usually created and returned by :meth:`Scheduler.every`
    method, which also defines its `interval`.
    """
    def __init__(self, interval, scheduler=None):
        self.interval = interval  # pause interval * unit between runs
        self.latest = None  # upper limit to the interval
        self.job_func = None  # the job job_func to run
        self.unit = None  # time units, e.g. 'minutes', 'hours', 'days', 'weeks'
        self.at_time = None  # optional time at which this job runs
        self.last_run = None  # datetime of the last run
        # datetime of the next run; should_run compares it with the current
        # time to decide whether the job is due
        self.next_run = None
        self.period = None  # timedelta between runs, set by _schedule_next_run()
        self.start_day = None  # Specific day of the week to start on
        self.tags = set()  # unique set of tags for the job
        self.scheduler = scheduler  # the Scheduler instance to register with

    def __lt__(self, other):
        """
        PeriodicJobs are sortable based on the scheduled time they
        run next.
        """
        return self.next_run < other.next_run

    def __repr__(self):
        # Example output:
        # Every 5 seconds do job() (last run: [never], next run: 2018-03-27 17:27:31)

        def format_time(t):
            return t.strftime('%Y-%m-%d %H:%M:%S') if t else '[never]'

        timestats = '(last run: %s, next run: %s)' % (
            format_time(self.last_run), format_time(self.next_run))

        if hasattr(self.job_func, '__name__'):
            job_func_name = self.job_func.__name__
        else:
            job_func_name = repr(self.job_func)
        args = [repr(x) for x in self.job_func.args]
        kwargs = ['%s=%s' % (k, repr(v))
                  for k, v in self.job_func.keywords.items()]
        call_repr = job_func_name + '(' + ', '.join(args + kwargs) + ')'

        if self.at_time is not None:
            return 'Every %s %s at %s do %s %s' % (
                self.interval,
                self.unit[:-1] if self.interval == 1 else self.unit,
                self.at_time, call_repr, timestats)
        else:
            fmt = (
                'Every %(interval)s ' +
                ('to %(latest)s ' if self.latest is not None else '') +
                '%(unit)s do %(call_repr)s %(timestats)s'
            )

            return fmt % dict(
                interval=self.interval,
                latest=self.latest,
                unit=(self.unit[:-1] if self.interval == 1 else self.unit),
                call_repr=call_repr,
                timestats=timestats
            )

    @property
    def seconds(self):
        self.unit = 'seconds'
        return self

    @property
    def days(self):
        self.unit = 'days'
        return self

    @property
    def week(self):
        # The assertion message tells the caller which spelling to use.
        assert self.interval == 1, 'Use weeks instead of week'
        return self.weeks

    @property
    def weeks(self):
        # A property sets the unit and returns self so calls can be chained.
        self.unit = 'weeks'
        return self

    @property
    def monday(self):
        assert self.interval == 1, 'Use mondays instead of monday'
        self.start_day = 'monday'
        return self.weeks

    def tag(self, *tags):
        """
        Tags the job with one or more unique identifiers.

        Tags must be hashable. Duplicate tags are discarded.

        :param tags: A unique list of ``Hashable`` tags.
        :return: The invoked job instance
        :raises TypeError: If any tag is not hashable.
        """
        # ``collections.Hashable`` was removed in Python 3.10; the ABC lives
        # in ``collections.abc``. Fall back for interpreters without it.
        try:
            from collections.abc import Hashable
        except ImportError:
            from collections import Hashable
        if not all(isinstance(tag, Hashable) for tag in tags):
            raise TypeError('Tags must be hashable')
        self.tags.update(tags)
        return self

    def at(self, time_str):
        """
        Schedule the job to run at a specific time.

        Calling this is only valid for jobs whose unit is 'days' or
        'hours', or that have a start day (e.g. ``monday``). For hourly
        jobs only the minute part of `time_str` is used.

        :param time_str: A string in `XX:YY` format.
        :return: The invoked job instance
        """
        assert self.unit in ('days', 'hours') or self.start_day
        hour, minute = time_str.split(':')
        minute = int(minute)
        if self.unit == 'days' or self.start_day:
            hour = int(hour)
            assert 0 <= hour <= 23
        elif self.unit == 'hours':
            # The hour part is ignored for hourly jobs.
            hour = 0
        assert 0 <= minute <= 59
        self.at_time = datetime.time(hour, minute)
        return self

    def to(self, latest):
        """
        Schedule the job to run at an irregular (randomized) interval.

        The job's interval will randomly vary from the value given
        to `every` to `latest`. The range defined is inclusive on
        both ends. For example, `every(A).to(B).seconds` executes
        the job function every N seconds such that A <= N <= B.

        :param latest: Maximum interval between randomized job runs
        :return: The invoked job instance
        """
        self.latest = latest
        return self

    def do(self, job_func, *args, **kwargs):
        """
        Specifies the job_func that should be called every time the
        job runs.

        Any additional arguments are passed on to job_func when
        the job runs.

        :param job_func: The function to be scheduled
        :return: The invoked job instance
        """
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        self.scheduler.jobs.append(self)
        return self

    @property
    def should_run(self):
        """
        :return: ``True`` if the job should be run now.
        """
        return datetime.datetime.now() >= self.next_run

    def run(self):
        """
        Run the job and immediately reschedule it.

        After the job function returns, the last-run time is recorded
        and the next run time is computed.

        :return: The return value returned by the `job_func`
        """
        logger.info('Running job %s', self)
        ret = self.job_func()
        self.last_run = datetime.datetime.now()
        self._schedule_next_run()
        return ret

    def _schedule_next_run(self):
        """
        Compute the instant when this job should run next.
        """
        assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')

        if self.latest is not None:
            assert self.latest >= self.interval
            interval = random.randint(self.interval, self.latest)
        else:
            interval = self.interval

        # The unit name doubles as the timedelta keyword argument, so the
        # unit/interval pair maps directly onto a timedelta.
        self.period = datetime.timedelta(**{self.unit: interval})

        self.next_run = datetime.datetime.now() + self.period
        if self.start_day is not None:
            assert self.unit == 'weeks'
            weekdays = (
                'monday',
                'tuesday',
                'wednesday',
                'thursday',
                'friday',
                'saturday',
                'sunday'
            )
            assert self.start_day in weekdays
            # e.g. 'saturday' is index 5
            weekday = weekdays.index(self.start_day)
            days_ahead = weekday - self.next_run.weekday()
            if days_ahead <= 0:  # Target day already happened this week
                days_ahead += 7
            self.next_run += datetime.timedelta(days_ahead) - self.period
        if self.at_time is not None:
            assert self.unit in ('days', 'hours') or self.start_day is not None
            kwargs = {
                'minute': self.at_time.minute,
                'second': self.at_time.second,
                'microsecond': 0
            }
            if self.unit == 'days' or self.start_day is not None:
                kwargs['hour'] = self.at_time.hour
            self.next_run = self.next_run.replace(**kwargs)
            # If we are running for the first time, make sure we run
            # at the specified time *today* (or *this hour*) as well
            if not self.last_run:
                now = datetime.datetime.now()
                if (self.unit == 'days' and self.at_time > now.time() and
                        self.interval == 1):
                    self.next_run = self.next_run - datetime.timedelta(days=1)
                elif self.unit == 'hours' and self.at_time.minute > now.minute:
                    self.next_run = self.next_run - datetime.timedelta(hours=1)
        if self.start_day is not None and self.at_time is not None:
            # Let's see if we will still make that time we specified today
            if (self.next_run - datetime.datetime.now()).days >= 7:
                self.next_run -= self.period

解决 functools.partial 丢失 __doc__、__name__ 的问题,并计划下一次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def do(self, job_func, *args, **kwargs):
    """
    Specifies the job_func that should be called every time the
    job runs.

    Any additional arguments are passed on to job_func when
    the job runs.

    :param job_func: The function to be scheduled
    :return: The invoked job instance
    """
    # Bind the arguments now so the job can later be called with no arguments.
    self.job_func = functools.partial(job_func, *args, **kwargs)
    try:
        # Copy __name__, __doc__, etc. from job_func onto the partial object.
        functools.update_wrapper(self.job_func, job_func)
    except AttributeError:
        # job_funcs already wrapped by functools.partial won't have
        # __name__, __module__ or __doc__ and the update_wrapper()
        # call will fail.
        pass
    # Compute the next run time, then register the job with its scheduler.
    self._schedule_next_run()
    self.scheduler.jobs.append(self)
    return self

关于头图

拍摄自 MIT

最后的棒棒
Python 包管理编年史