Source code for commonutil.ticktock
# -*- coding: utf-8 -*-
""" 時間相關輔助函式 / Time related support routines """
import sys
import time
import datetime
import logging
_log = logging.getLogger(__name__)
_tz_offset_info_cached = None
_tz_offset_info_cache_time = 0
[docs]def get_tz_offset_info(use_cached_info=True):
# type: (bool) -> Tuple[int, str]
"""
取出時區偏移資訊,當呼叫端允許時使用快取,快取資料會留存 1800 秒 (30 分鐘) 以處理日光節約時間轉換的議題
Get time-zone offset information.
Will use cached data if acceptable. The cache will keep 1800 seconds (half hour) for DST transition.
Args:
use_cached_info=True: 是否要使用快取的結果,預設為 (True) 使用 / use cache or not
Return:
型式為 (偏移秒數, 時區名稱) 的 tuple
Tuple in (offset-seconds, timezone-name,) form
"""
global _tz_offset_info_cached
# {{{ 使用 cache 資料如果呼叫端允許時
if use_cached_info:
global _tz_offset_info_cache_time
current_time = time.time()
if (current_time - _tz_offset_info_cache_time) < 1800:
_tz_offset_info_cache_time = current_time
if _tz_offset_info_cached is not None:
return _tz_offset_info_cached
# }}} 使用 cache 資料如果呼叫端允許時
# {{{ determine tz-utc-offset and tz-name
tz_name_ndst, tz_name_dst, = time.tzname # pylint: disable=unbalanced-tuple-unpacking
localtm = time.localtime()
if localtm.tm_isdst == 0:
host_tz_utcoffset_second = time.timezone
host_tz_name_raw = tz_name_ndst
else:
host_tz_utcoffset_second = time.altzone
host_tz_name_raw = tz_name_dst
# }}} determine tz-utc-offset and tz-name
# {{{ generate time zone text
try:
host_tz_name = host_tz_name_raw.decode()
except Exception:
host_tz_name = None
# -- should success here in most case
if host_tz_name is None:
try:
host_tz_name = host_tz_name_raw.decode(sys.stdout.encoding)
except Exception:
host_tz_name = None
if host_tz_name is None:
host_tz_symbol = "" if (host_tz_utcoffset_second < 0) else "+"
host_tz_hh = int(host_tz_utcoffset_second / 3600)
host_tz_name = "UTC%s%02d" % (
host_tz_symbol,
host_tz_hh,
)
# }}} generate time zone text
_tz_offset_info_cached = (
host_tz_utcoffset_second,
host_tz_name,
)
return _tz_offset_info_cached
[docs]def timedelta_to_seconds(d):
# type: (datetime.timedelta) -> int
"""
將給定的 datetime.timedelta 物件內含秒數計算出來傳回
Calculate seconds in given datetime.timedelta object.
Args:
d: 要取秒數值的 datetime.timedelta 物件
Return:
給定的 datetime.timedelta 內含之秒數值
Total seconds in given datetime.timedelta object.
"""
return (d.days * 86400 + d.seconds)
[docs]def CYCLE_PRECISION_SECOND(dt):
# type: (datetime.datetime) -> datetime.datetime
""" (Cycle Precision Function) 週期精確度為秒的時間點調整函式 / Date time value adjustment function for run_timeseries_loop
"""
return dt.replace(microsecond=0, tzinfo=None)
[docs]def CYCLE_PRECISION_MINUTE(dt):
# type: (datetime.datetime) -> datetime.datetime
""" (Cycle Precision Function) 週期精確度為分的時間點調整函式 / Date time value adjustment function for run_timeseries_loop
"""
return dt.replace(second=0, microsecond=0, tzinfo=None)
# pylint: disable=too-many-arguments,too-many-locals
[docs]def run_timeseries_loop(cycling_callable,
housekeeping_callable=None,
tstamp_storage_path=None,
invoke_period=1800,
cycle_period=3600,
housekeeping_period=7200,
max_delay=120,
cycle_precision=CYCLE_PRECISION_MINUTE):
# type: (Callable[[datetime.datetime, datetime.datetime], bool], Callable[[], None], str, int, int, int, int, Callable[[datetime.datetime], datetime.datetime]) -> None
"""
執行掃描時間序列的迴圈,指定的函數物件會在指定時間後被呼叫,兩個分別代表時間區段起點與終點之時間點會被傳入指定函數。
Run a loop to call given function (or call-able) periodically.
Two date-time objects will be passed into given function. The objects indicates segments in continues time flow.
- 注意: 時間序列精度 (cycle_precision) 的值應小於掃描頻率 (invoke_period) 的值,由於精度值為程式指定,因此呼叫此函式的程式碼應該自行檢查
- Caution: the cycle precision adjust by precision function should smaller than invoke period.
Args:
cycling_callable: 迴圈函數,函數原型為 (bool) cycling_callable(range_s, range_e) 當回傳值為 False 時將結束迴圈 / Call-able to be invoke in given cycle. The function prototype is (bool) cycling_callable(range_s, range_e). The loop will terminate when return value of given call-able is False.
housekeeping_callable=None: 定期自動檢點函數 / House-keeping call-able
tstamp_storage_path=None: 儲存 time-stamp 值的檔案的路徑 / File path to keep progress.
invoke_period=1800: 掃描週期 / Scan period to check if next invoke should take place
cycle_period=3600: 每次掃描要延展的長度,作為呼叫 cycling_callable 的參數時間的長度 / Time span of the arguments for invoking cycle call-able
housekeeping_period=7200: 執行檢點作業的週期 / Period to perform house keeping
max_delay=120: 啓動時可接受的最大延誤時間,啟動時距離上次結束如果已經超過給定時間,則直接使用目前時間減去此延誤時間作為時間區段的起始時間 / Max acceptable delay between invoking
cycle_precision=CYCLE_PRECISION_MINUTE: 時間序列精度 / Time value adjustment function
"""
last_housekeeping_tstamp = 0
prev_tstamp = 0
# {{{ 確保 house keeping 相關參數是可以執行後續動作的
if housekeeping_period is None:
housekeeping_callable = None
housekeeping_period = 7200
# }}} 確保 house keeping 相關參數是可以執行後續動作的
# {{{ load time-stamp from previous run
if tstamp_storage_path is not None:
try:
with open(tstamp_storage_path, "r") as fp:
l = fp.readline()
prev_tstamp = int(l.strip())
except Exception:
prev_tstamp = 0
# }}} load time-stamp from previous run
current_tstamp = time.time()
range_s = None
range_e = cycle_precision(datetime.datetime.fromtimestamp(max((current_tstamp - max_delay), prev_tstamp)))
while True:
current_tstamp = time.time()
range_s = range_e
range_e = cycle_precision(datetime.datetime.fromtimestamp(current_tstamp + cycle_period))
# {{{ check if range value reasonable (should not happen unless having big clock drift during iteration, check for safe)
if not (range_s < range_e):
range_s, range_e, = (range_e, range_s)
idle_seconds = timedelta_to_seconds(range_e - range_s)
if idle_seconds > 0:
time.sleep(idle_seconds)
else:
time.sleep(1)
continue
# }}} check if range value reasonable
# run cycling function
ret = cycling_callable(range_s, range_e)
# {{{ save time-stamp
if tstamp_storage_path is not None:
try:
with open(tstamp_storage_path, "w") as fp:
fp.write(str(current_tstamp))
fp.write("\n")
except Exception as e:
_log.warning("cannot write into time-stamp storage (path=%r): %r", tstamp_storage_path, e)
# }}} save time-stamp
if not ret:
return
# {{{ do house keeping
if (last_housekeeping_tstamp + housekeeping_period) < current_tstamp:
if housekeeping_callable is not None:
housekeeping_callable()
last_housekeeping_tstamp = current_tstamp
# }}} do house keeping
prev_tstamp = current_tstamp
# {{{ idle for next invoke
idle_seconds = invoke_period - (time.time() - prev_tstamp)
if idle_seconds > 0:
time.sleep(idle_seconds)
# }}} idle for next invoke
# vim: ts=4 sw=4 ai nowrap