Workers

A worker is an object that runs a specified function (the executor) in a loop.

Common

Worker parameters

All workers support the following initial parameters:

  • name: worker name (default: the name of the executor function if specified, otherwise an auto-generated UUID)
  • func: executor function (default: worker.run)
  • priority: worker thread priority
  • o: special object, passed as-is to the executor (e.g. the object the worker is running for)
  • on_error: a function which is called if the executor raises an exception
  • on_error_kwargs: kwargs for the on_error function
  • supervisor: alternative task supervisor
  • poll_delay: worker poll delay (default: task supervisor poll delay)

Overriding parameters at startup

The initial parameters name, priority and o can be overridden at worker startup (the first two as _name and _priority):

myworker.start(_name='worker1', _priority=atasker.TASK_LOW)

Executor function

The worker executor function is either marked with the background_worker decorator or defined as a method named run (see the examples below). The function should always accept **kwargs.

The executor function receives in args/kwargs:

  • all parameters worker.start was called with
  • _worker: the current worker object
  • _name: the current worker name
  • _task_id: if the executor function is started in a multiprocessing pool, the ID of the current task (for a thread pool, task ID = thread name)

Note

If the executor function returns False, the worker stops itself.
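
The loop-and-stop behaviour described above can be pictured with a small standard-library sketch. This is not atasker's implementation: MiniWorker and its methods are hypothetical names used only for illustration, and the real workers do considerably more.

```python
import threading


class MiniWorker:
    """Hypothetical stand-in for a worker; not part of atasker."""

    def __init__(self, func, name=None, on_error=None):
        self.func = func
        self.name = name or func.__name__
        self.on_error = on_error
        self._active = threading.Event()
        self._thread = None

    def start(self, *args, **kwargs):
        self._active.set()
        self._thread = threading.Thread(
            target=self._loop, args=(args, kwargs),
            name=self.name, daemon=True)
        self._thread.start()

    def _loop(self, args, kwargs):
        while self._active.is_set():
            try:
                # the executor receives the start() parameters plus
                # _worker and _name
                result = self.func(
                    *args, _worker=self, _name=self.name, **kwargs)
            except Exception as exc:
                if self.on_error is not None:
                    self.on_error(exc)
                continue
            # returning False asks the worker to stop itself
            if result is False:
                self._active.clear()

    def join(self):
        # wait until the loop has ended on its own
        self._thread.join()

    def stop(self, wait=True):
        self._active.clear()
        if wait and self._thread is not None:
            self._thread.join()


calls = []


def executor(limit, **kwargs):
    calls.append(kwargs['_name'])
    # keep looping until the limit is reached, then return False
    return len(calls) < limit


worker = MiniWorker(executor, name='worker1')
worker.start(3)
worker.join()
```

Here the executor runs three times and the loop ends without stop() being called, because the third call returns False.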

Asynchronous executor function

The executor function can be asynchronous. In this case it is executed inside the task supervisor loop, no new thread is started, and priority is ignored.

When the background_worker decorator detects an asynchronous function, the class BackgroundAsyncWorker is automatically used instead of BackgroundWorker (BackgroundQueueWorker, BackgroundEventWorker and BackgroundIntervalWorker support both synchronous and asynchronous functions out of the box).

An additional worker parameter, loop (_loop at startup), may be specified to run the executor function inside an external async loop.

Note

To prevent interference between the supervisor event loop and executors, it is strongly recommended to specify your own async event loop or to create an aloop.
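
The idea of giving executors an event loop of their own can be sketched with plain asyncio. The helper start_loop_in_thread is a hypothetical name, and atasker's aloop objects may work differently; the sketch only shows a coroutine running in a dedicated loop that lives in its own thread.

```python
import asyncio
import threading


def start_loop_in_thread(name):
    """Create an event loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(
        target=loop.run_forever, name=name, daemon=True)
    thread.start()
    return loop, thread


async def executor(**kwargs):
    # runs inside the dedicated loop, not in the caller's thread
    await asyncio.sleep(0.01)
    return threading.current_thread().name


loop, thread = start_loop_in_thread('executor-loop')

# schedule the coroutine from the calling thread and wait for its result
future = asyncio.run_coroutine_threadsafe(executor(), loop)
ran_in = future.result(timeout=5)

# shut the dedicated loop down cleanly
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```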

Multiprocessing executor function

To use multiprocessing, the task supervisor's multiprocessing pool must be created first.

If the executor method run is defined as static, the worker detects this automatically and uses the task supervisor's multiprocessing pool to launch the executor.

Note

Because the executor is started in a separate process, it has no access to the self object.

Additionally, a process_result method must be defined in the worker class to process the executor's result. The method can stop the worker by returning False.

As an example, let’s define a BackgroundQueueWorker. The Python multiprocessing module cannot pick up an executor function defined via the decorator, so a worker class is required. Create it in a separate module, because Python multiprocessing cannot pick up methods from the module where the worker is started:

Warning

A multiprocessing executor function should always finish correctly, without raising any exceptions. Otherwise the callback function is never called and the task stays “frozen” in the pool.

myworker.py

from atasker import BackgroundQueueWorker

class MyWorker(BackgroundQueueWorker):

    # executed in another process via task_supervisor
    @staticmethod
    def run(task, *args, **kwargs):
        # .. process task
        return '<task result>'

    # executed in the main process with the value returned by run
    def process_result(self, result):
        # process result
        pass

main.py

from myworker import MyWorker

worker = MyWorker()
worker.start()
# .....
worker.put_threadsafe('task')
# .....
worker.stop()
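
One way to follow the warning above is to catch exceptions inside the executor and return them as data, so that a result is always produced. The sketch below uses plain module-level functions and a made-up (status, payload) convention; in a worker class they would correspond to the static run method and to process_result. It is an illustration, not something the library prescribes.

```python
def run(task, *args, **kwargs):
    """Executor sketch: never raises, always returns (status, payload)."""
    try:
        return ('ok', 100 / task)
    except Exception as exc:
        # return the error as data so the result callback still fires
        return ('error', repr(exc))


def process_result(result):
    """Result handler sketch: unpack the pair produced by run."""
    status, payload = result
    if status == 'error':
        print('task failed: {}'.format(payload))
        return None
    return payload


good = process_result(run(4))
bad = process_result(run(0))
```

The error is passed back as a string (repr) rather than as the exception object, which keeps the result simple to transfer between processes.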

Workers

BackgroundWorker

A background worker continuously runs the executor function in a loop, without any condition. The loop of this worker is synchronous and is started in a separate thread immediately.

# with annotation - function becomes worker executor
from atasker import background_worker

@background_worker
def myfunc(*args, **kwargs):
    print('I am background worker')

# with class
from atasker import BackgroundWorker

class MyWorker(BackgroundWorker):

    def run(self, *args, **kwargs):
        print('I am a worker too')

myfunc.start()

myworker2 = MyWorker()
myworker2.start()

# ............

# stop first worker
myfunc.stop()
# stop 2nd worker, don't wait until it is really stopped
myworker2.stop(wait=False)

BackgroundAsyncWorker

Similar to BackgroundWorker, but used for async executor functions. It has an additional parameter loop= (_loop in the start function) to specify either an async event loop or an aloop object. By default, either the task supervisor event loop or the task supervisor default aloop is used.

# with annotation - function becomes worker executor
from atasker import background_worker

@background_worker
async def async_worker(**kwargs):
    print('I am async worker')

async_worker.start()

# with class
from atasker import BackgroundAsyncWorker

class MyWorker(BackgroundAsyncWorker):

    async def run(self, *args, **kwargs):
        print('I am async worker too')

worker = MyWorker()
worker.start()

BackgroundQueueWorker

A background worker which gets data from an asynchronous queue and passes it to a synchronous or asynchronous executor.

A queue worker is created as soon as the decorator detects the q=True or queue=True parameter. The default queue is asyncio.queues.Queue. If you want to use, for example, a priority queue, specify its class instead of just True.

# with annotation - function becomes worker executor
import asyncio

from atasker import background_worker

@background_worker(q=True)
def f(task, **kwargs):
    print('Got task from queue: {}'.format(task))

@background_worker(q=asyncio.queues.PriorityQueue)
def f2(task, **kwargs):
    print('Got task from queue too: {}'.format(task))

# with class
from atasker import BackgroundQueueWorker

class MyWorker(BackgroundQueueWorker):

    def run(self, task, *args, **kwargs):
        print('my task is {}'.format(task))


f.start()
f2.start()
worker3 = MyWorker()
worker3.start()
f.put_threadsafe('task 1')
f2.put_threadsafe('task 2')
worker3.put_threadsafe('task 3')

The put_threadsafe method is used to put a task into the worker’s queue. The method is thread-safe.
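
What "thread-safe" means here can be illustrated with plain asyncio. An asyncio queue is not itself thread-safe, so another thread has to hand each item over to the loop that owns the queue. The following is a minimal sketch with hypothetical function names, not atasker's internals.

```python
import asyncio
import threading


async def consume(queue, results, count):
    # take a fixed number of tasks from the queue, in order
    for _ in range(count):
        task = await queue.get()
        results.append(task)


def producer(loop, queue, items):
    # runs in a foreign thread: never touch the queue directly,
    # ask the owning loop to do the put instead
    for item in items:
        loop.call_soon_threadsafe(queue.put_nowait, item)


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    results = []
    thread = threading.Thread(
        target=producer, args=(loop, queue, ['task 1', 'task 2']))
    thread.start()
    await asyncio.wait_for(consume(queue, results, 2), timeout=5)
    thread.join()
    return results


results = asyncio.run(main())
```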

BackgroundEventWorker

A background worker which runs an asynchronous loop waiting for an event and launches a synchronous or asynchronous executor when the event occurs.

An event worker is created as soon as the decorator detects the e=True or event=True parameter.

# with annotation - function becomes worker executor
from atasker import background_worker

@background_worker(e=True)
def f(*args, **kwargs):
    print('happened')

# with class
from atasker import BackgroundEventWorker

class MyWorker(BackgroundEventWorker):

    def run(self, *args, **kwargs):
        print('happened')


f.start()
worker3 = MyWorker()
worker3.start()
f.trigger_threadsafe()
worker3.trigger_threadsafe()

The trigger_threadsafe method is used to trigger the worker’s event. The method is thread-safe. If the worker is triggered from the same asyncio loop, the trigger method can be used instead.
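
The difference between triggering from the same loop and triggering from another thread can be sketched with an asyncio.Event. The names below are hypothetical and the code is only an illustration of the two cases, not the library's implementation.

```python
import asyncio
import threading


async def wait_and_run(event, log, runs):
    # wait for the event, reset it, then "launch the executor"
    for _ in range(runs):
        await event.wait()
        event.clear()
        log.append('happened')


async def main():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    log = []
    worker = asyncio.create_task(wait_and_run(event, log, 2))

    # same loop: setting the event directly is enough
    event.set()
    await asyncio.sleep(0.05)

    # another thread: hand the call over to the loop that owns the event
    thread = threading.Thread(
        target=loop.call_soon_threadsafe, args=(event.set,))
    thread.start()
    thread.join()

    await asyncio.wait_for(worker, timeout=5)
    return log


log = asyncio.run(main())
```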

BackgroundIntervalWorker

A background worker which runs a synchronous or asynchronous executor function at a specified interval or with a specified delay.

Worker initial parameters:

  • interval: run the executor at the specified interval (in seconds)
  • delay: delay between executor launches
  • delay_before: delay before the executor is launched

The parameters interval and delay cannot be used together. All parameters can be overridden at startup by adding the _ prefix (e.g. worker.start(_interval=1)).

A background interval worker is created automatically as soon as the decorator detects one of the parameters above:

@background_worker(interval=1)
def myfunc(**kwargs):
    print('I run every second!')

@background_worker(interval=1)
async def myfunc2(**kwargs):
    print('I run every second and I am async!')

myfunc.start()
myfunc2.start()

Like the event worker, BackgroundIntervalWorker supports manual executor triggering with worker.trigger() and worker.trigger_threadsafe().
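
One plausible reading of the difference between interval and delay is that interval fixes the time from one launch to the next, while delay is a pause added after each run. Both this interpretation and the helper pause_before_next_run are assumptions made for illustration; check the library's actual behaviour before relying on it.

```python
def pause_before_next_run(elapsed, interval=None, delay=None):
    """Return how long to sleep after a run that took `elapsed` seconds.

    Assumed semantics (not confirmed by the library documentation):
    interval - launches are spaced `interval` seconds apart, so the run
               time is subtracted from the pause;
    delay    - the pause is always `delay` seconds, whatever the run time.
    """
    if interval is not None and delay is not None:
        raise ValueError('interval and delay can not be used together')
    if interval is not None:
        # never sleep a negative amount if the run overran the interval
        return max(0.0, interval - elapsed)
    if delay is not None:
        return float(delay)
    return 0.0


after_quick_run = pause_before_next_run(0.25, interval=1)
after_slow_run = pause_before_next_run(2.0, interval=1)
with_delay = pause_before_next_run(0.25, delay=1)
```

Under these assumptions a run that takes 0.25 seconds is followed by a 0.75 second pause with interval=1, but by a full 1 second pause with delay=1.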