import asyncio
import datetime
import functools
import logging
import socket
import time
import re
import os
import sys
# Common durations, expressed in seconds as floats.
MINUTE = 60.0
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ServiceBase:
    """Base service class.

    The service's work is the coroutine returned by :meth:`_run`, executed
    as a single task on the event loop passed to the constructor.  Every
    public method delegates to the current state object, an instance of one
    of the nested ``State`` subclasses.
    """

    class State:
        """Default (mostly no-op) behaviour shared by all states."""

        def __init__(self, obj):
            """:type obj: ServiceBase"""
            self._obj = obj

        def is_running(self):
            """Return False; only the running state overrides this."""
            return False

        async def wait(self):
            """Await the service's main task, if one has been created."""
            pending = self._obj._main_task
            if not pending:
                return
            await pending

        def start(self):
            """Do nothing by default."""

        def should_stop(self):
            """Do nothing by default."""

    class StoppedState(State):
        """State of a service whose main task is not active."""

        def start(self):
            """Schedule ``_run()`` as a task and switch to the running state."""
            service = self._obj
            task = service._loop.create_task(service._run())
            service._main_task = task
            # When the task finishes (for any reason) the service goes back
            # to the stopped state.
            task.add_done_callback(self._on_stop)
            service._state = ServiceBase.RunningState(service)

        def _on_stop(self, future):
            """Done-callback of the main task: reset flag and state."""
            service = self._obj
            service._should_stop = False
            service._state = ServiceBase.StoppedState(service)

    class RunningState(State):
        """State of a service whose main task has been scheduled."""

        def is_running(self):
            """Return True."""
            return True

        def should_stop(self):
            """Set the stop flag, cancel the main task, enter stopping state."""
            service = self._obj
            service._should_stop = True
            service._main_task.cancel()
            service._state = ServiceBase.StoppingState(service)

    class StoppingState(State):
        """State between a stop request and completion of the main task."""

        def start(self):
            """Refuse to start: the previous task has not finished yet.

            :raises ProgrammingError: always.
            """
            raise ProgrammingError(
                "Cannot start stopping service. Please wait while it stop."
            )

    def __init__(self, loop):
        """Create a stopped service bound to the event loop *loop*."""
        self._loop = loop
        self._main_task = None
        self._should_stop = False
        self._state = self.StoppedState(self)

    async def _run(self):
        """Body of the service; subclasses must override it."""
        raise NotImplementedError

    def is_running(self):
        """Return the current state's answer to "is the service running?"."""
        return self._state.is_running()

    def start(self):
        """Ask the current state to start the service."""
        return self._state.start()

    def should_stop(self):
        """Ask the current state to stop the service.

        In the running state this sets the stop flag and cancels the main
        task; in the other states it does nothing.
        """
        return self._state.should_stop()

    async def wait(self):
        """Wait for the main task (if any) via the current state."""
        return await self._state.wait()
class ProgrammingError(Exception):
    """Signals incorrect use of an API.

    For example, ``ServiceBase`` raises it when ``start()`` is called while
    the service is still stopping.
    """
class RateLimit:
    """Decorator to limit function calls to one per *period* seconds.

    If less than *period* seconds have passed since the last call,
    then the request to call the function is replaced with an *on_drop*
    call with the same arguments.

    If *on_drop* is None [default] then the call is just dropped
    (the wrapper returns None).

    Both plain functions and coroutine functions can be decorated.  When a
    coroutine function is decorated and *on_drop* returns a coroutine or a
    future, that result is awaited so the drop handler actually runs and
    the caller receives its result.

    The limiter state lives on the ``RateLimit`` instance, so functions
    decorated with the same instance share one rate limit.

    :param period: minimum number of seconds between two accepted calls.
    :param timer: zero-argument callable returning the current time in
        seconds (``time.monotonic`` by default).
    :param on_drop: optional callable invoked with the original arguments
        instead of the decorated function when a call is dropped.
    """

    def __init__(self, period, timer=time.monotonic, *, on_drop=None):
        # None means "no call has been accepted yet".
        self._next_call_time = None
        self._period = period
        self._timer = timer
        self._on_drop = on_drop

    @property
    def should_be_called(self):
        """True if no call was accepted yet or the period has elapsed."""
        return (
            self._next_call_time is None
            or self._next_call_time <= self._timer()
        )

    def __call__(self, func):
        """Wrap *func* (sync or async) with the rate limit."""

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return func(*args, **kwargs)
            if self._on_drop is not None:
                return self._on_drop(*args, **kwargs)
            return None

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return await func(*args, **kwargs)
            if self._on_drop is None:
                return None
            outcome = self._on_drop(*args, **kwargs)
            # An async on_drop only produces a coroutine here; without
            # awaiting it the handler would never run and the caller would
            # receive the bare coroutine object.
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                outcome = await outcome
            return outcome

        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper


rate_limit = RateLimit
class CoalesceCalls:
def __init__(self):
self.call_time = float("-inf")
self.delayed_call = None
def coalesce_calls(self, period, *, done_callback=None):
"""
Decorator to coalesce coroutine calls to one per *period* seconds.
Requests for a coroutine call in a given time period are coalesced:
If t is the time of the last call, then N call requests in the [t,
t+period) time interval results in a single call at the
t+period time iff N>0 i.e.,
if less than *period* seconds have passed since the last call,
then the calls are coalesced: (N-1) requests are dropped, Nth
requests is performed in *period* seconds.
It is unspecified which exact call is made if arguments differ.
If the call is not dropped then *done_callback* is attached
to the task when the coroutine is scheduled with the event loop.
Given `c` is the time of the last [actual] call (`loop.create_task()`)
And `T` is the coalesce time period
When a call request arrives at `t` time
Then
| call pending? | t>c+T | c<=t<=c+T | t