Task supervisor

Task supervisor is a component that manages the task thread pool and runs task schedulers (workers).

Usage

When the atasker package is imported, a default task supervisor is created automatically.

from atasker import task_supervisor

# thread pool
task_supervisor.set_thread_pool(
    pool_size=20, reserve_normal=5, reserve_high=5)
task_supervisor.start()

Warning

The task supervisor must be started before any scheduler (worker) or task.

Task priorities

The task supervisor supports four task priorities:

  • TASK_LOW
  • TASK_NORMAL (default)
  • TASK_HIGH
  • TASK_CRITICAL

from atasker import TASK_HIGH, background_task

def test():
    pass

# wrap test() as a background task with high priority and start it
background_task(test, name='test', priority=TASK_HIGH)()

Pool size

The pool_size parameter of task_supervisor.set_thread_pool defines the size of the task (thread) pool.

Pool size is the maximum number of tasks that can run concurrently. If the task supervisor receives more tasks than the pool can hold, the extra tasks wait until a running task finishes.

Strictly speaking, pool_size defines the pool size for tasks started with TASK_LOW priority. Tasks with higher priorities have “reserves”: pool_size=20, reserve_normal=5 means the pool holds 20 tasks, with 5 more places reserved for tasks with TASK_NORMAL priority. In this example, when the task supervisor receives such a task, the pool is “extended” by up to 5 places.

For TASK_HIGH tasks, the pool can be extended up to pool_size + reserve_normal + reserve_high, so in the example above: 20 + 5 + 5 = 30.

Tasks with TASK_CRITICAL priority are always started immediately, no matter how busy the task pool is, and the thread pool is extended for them without limit. Multiprocessing critical tasks are started as soon as the multiprocessing.Pool object has a free slot for the task.

To make pool size unlimited, set pool_size=0.
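The admission rule described above boils down to simple arithmetic. The sketch below is one reading of it, not atasker's code: max_running is a hypothetical helper, and the integer values assigned to the priority constants are placeholders (atasker defines its own).

```python
from typing import Optional

# Placeholder priority values for this sketch only; atasker's real values may differ.
TASK_LOW, TASK_NORMAL, TASK_HIGH, TASK_CRITICAL = 0, 1, 2, 3


def max_running(priority: int, pool_size: int,
                reserve_normal: int = 0, reserve_high: int = 0) -> Optional[int]:
    """Return how many tasks may already be running for a new task of the
    given priority to start immediately; None means there is no limit."""
    if pool_size == 0 or priority >= TASK_CRITICAL:
        return None  # unlimited pool, or a critical task that always starts
    limit = pool_size  # TASK_LOW only gets the base pool
    if priority >= TASK_NORMAL:
        limit += reserve_normal
    if priority >= TASK_HIGH:
        limit += reserve_high
    return limit


for p in (TASK_LOW, TASK_NORMAL, TASK_HIGH, TASK_CRITICAL):
    print(p, max_running(p, pool_size=20, reserve_normal=5, reserve_high=5))
# 0 20
# 1 25
# 2 30
# 3 None
```

With pool_size=20, reserve_normal=5, reserve_high=5 this reproduces the 20 / 25 / 30 / unlimited figures from the text.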

The min_size and max_size parameters set the actual size of the system thread pool. If max_size is not specified, it is set to pool_size + reserve_normal + reserve_high. It is recommended to set max_size manually to a slightly larger value, to leave room for critical tasks.

By default, max_size is CPU count * 5. You may pass min_size='max' to automatically set the minimum pool size to max_size.

Note

Pool size can be changed while the task supervisor is running.

Poll delay

Poll delay is the delay (in seconds) used by the task queue manager, by workers, and by some other methods such as start and stop.

A lower poll delay means higher CPU usage; a higher poll delay means slower reaction time.

The default poll delay is 0.1 seconds. It can be changed with:

task_supervisor.poll_delay = 0.01 # set poll delay to 10ms
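The CPU versus reaction-time trade-off can be seen with any polling loop. The following is a generic stdlib illustration, not atasker's queue manager: measure is a hypothetical helper that polls a threading.Event every poll_delay seconds and reports how often it woke up and how late it noticed the event.

```python
import threading
import time


def measure(poll_delay, fire_after=0.3):
    """Poll an Event every poll_delay seconds until another thread sets it.

    Returns (wakeups, latency): how many times the poller woke up, and how
    long it took to notice the event after it was set."""
    event = threading.Event()
    fired_at = []

    def fire():
        fired_at.append(time.monotonic())  # record the time before signalling
        event.set()

    threading.Timer(fire_after, fire).start()
    wakeups = 0
    while not event.is_set():
        time.sleep(poll_delay)  # each wakeup costs a little CPU
        wakeups += 1
    return wakeups, time.monotonic() - fired_at[0]


fast = measure(poll_delay=0.01)
slow = measure(poll_delay=0.1)
print('0.01s: %d wakeups, %.3fs latency' % fast)
print('0.1s:  %d wakeups, %.3fs latency' % slow)
```

The shorter delay wakes up roughly ten times as often but notices the event sooner; the longer delay is cheaper but can lag by up to one full poll interval.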

Blocking

The task supervisor runs in its own thread. To block the current thread, you may use the method

task_supervisor.block()

which simply sleeps while the task supervisor is active.
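The blocking behaviour described above amounts to "sleep in a loop until the active flag is cleared". Below is a minimal stdlib sketch of that pattern; MiniSupervisor is a hypothetical stand-in, not the real TaskSupervisor class.

```python
import threading
import time


class MiniSupervisor:
    """Hypothetical stand-in that runs its work in a background thread."""

    def __init__(self, poll_delay=0.1):
        self.poll_delay = poll_delay
        self._active = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while self._active.is_set():
            time.sleep(self.poll_delay)  # real work would happen here

    def start(self):
        self._active.set()
        self._thread.start()

    def stop(self):
        self._active.clear()
        self._thread.join()

    def block(self):
        # Just sleep while the supervisor is active.
        while self._active.is_set():
            time.sleep(self.poll_delay)


sup = MiniSupervisor(poll_delay=0.05)
sup.start()
threading.Timer(0.3, sup.stop).start()  # some other thread stops it later
sup.block()  # returns only after stop() has cleared the active flag
print('supervisor stopped')
```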

Timeouts

The task supervisor can log timeouts (when a task isn’t launched within a specified number of seconds) and run timeout handler functions:

def warning(t):
    # t = task thread object
    print('Task thread {} is not launched yet'.format(t))

def critical(t):
    print('All is worse than expected')

# call warning() if a task is not launched within 5 seconds
task_supervisor.timeout_warning = 5
task_supervisor.timeout_warning_func = warning
# call critical() if a task is not launched within 10 seconds
task_supervisor.timeout_critical = 10
task_supervisor.timeout_critical_func = critical
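The two thresholds can be pictured as a periodic check of how long a task has been waiting. This is a hypothetical sketch, not atasker's implementation: LaunchTimeoutChecker is an invented class, it assumes each handler fires at most once per task, and it uses simulated clock values so the result is deterministic.

```python
class LaunchTimeoutChecker:
    """Hypothetical sketch: calls each handler at most once per waiting task."""

    def __init__(self, timeout_warning, timeout_warning_func,
                 timeout_critical, timeout_critical_func):
        self.timeout_warning = timeout_warning
        self.timeout_warning_func = timeout_warning_func
        self.timeout_critical = timeout_critical
        self.timeout_critical_func = timeout_critical_func
        self._reported = {}  # task -> set of levels already reported

    def check(self, task, queued_at, now):
        """Called periodically for a task that is still waiting to launch."""
        waited = now - queued_at
        reported = self._reported.setdefault(task, set())
        if waited >= self.timeout_warning and 'warning' not in reported:
            reported.add('warning')
            self.timeout_warning_func(task)
        if waited >= self.timeout_critical and 'critical' not in reported:
            reported.add('critical')
            self.timeout_critical_func(task)


events = []
checker = LaunchTimeoutChecker(
    5, lambda t: events.append(('warning', t)),
    10, lambda t: events.append(('critical', t)))

# simulated clock readings (seconds) for a task queued at t=0
for now in (3, 6, 7, 11, 12):
    checker.check('task-1', queued_at=0, now=now)
print(events)  # [('warning', 'task-1'), ('critical', 'task-1')]
```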

Stopping task supervisor

task_supervisor.stop(wait=True, stop_schedulers=True, cancel_tasks=False)

Params:

  • wait: wait until tasks and scheduler coroutines finish. If wait=<number>, the task supervisor waits at most <number> seconds for the coroutines to finish. However, if stopping schedulers (workers) is requested or task threads are currently running, stop waits for them to finish with no time limit.
  • stop_schedulers: before stopping the main event loop, the task supervisor calls the stop method of all running schedulers.
  • cancel_tasks: if set, the task supervisor tries to forcibly cancel all scheduler coroutines.
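The wait=<number> and cancel_tasks options for coroutines can be illustrated with plain asyncio. This is a hypothetical sketch, not atasker's implementation: stop_coroutines is an invented helper that waits at most wait seconds (or indefinitely when wait=True) and then, if cancel_tasks is set, cancels whatever is still pending. The unlimited wait for running schedulers and task threads is not modeled.

```python
import asyncio


async def stop_coroutines(tasks, wait, cancel_tasks=False):
    """Wait for tasks up to `wait` seconds (no limit if wait is True),
    then optionally cancel the ones that are still running."""
    timeout = None if wait is True else float(wait)
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    if cancel_tasks:
        for t in pending:
            t.cancel()
        # let the cancellations complete; CancelledError is returned, not raised
        await asyncio.gather(*pending, return_exceptions=True)
    return done, pending


async def main():
    fast = asyncio.create_task(asyncio.sleep(0.05, result='fast'))
    slow = asyncio.create_task(asyncio.sleep(10, result='slow'))
    done, pending = await stop_coroutines({fast, slow}, wait=0.5,
                                          cancel_tasks=True)
    return fast, slow, done, pending


fast, slow, done, pending = asyncio.run(main())
print(fast.result(), slow.cancelled())  # fast True
```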

aloops: async executors and tasks

It is usually unsafe to run both scheduler (worker) executors and custom tasks in the supervisor’s event loop. Workers use that event loop by default, and if anything blocks it, the program may freeze.

To avoid this, it is strongly recommended to create independent async loops for your custom tasks. The atasker supervisor has a built-in engine for async loops, called “aloops”: each aloop runs in a separate thread and doesn’t interfere with the supervisor event loop or with other aloops.

Create

If you plan to use async worker executors, create an aloop:

aloop = task_supervisor.create_aloop('myworkers', default=True, daemon=True)
# the loop is started immediately by default; to prevent this, pass start=False
# and then start it later with
# task_supervisor.start_aloop('myworkers')

To determine which thread an executor is running in, check the name of the current thread: aloop threads are named “supervisor_aloop_<name>”.
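Checking the thread name works because a coroutine always runs in the thread that drives its event loop. The sketch below imitates an aloop with a plain asyncio loop in a thread named after the documented pattern; start_named_loop is a hypothetical helper, not part of atasker.

```python
import asyncio
import threading


def start_named_loop(name):
    """Run a new event loop in a thread named like an aloop thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever,
                              name='supervisor_aloop_' + name, daemon=True)
    thread.start()
    return loop, thread


async def executor():
    # Inside a coroutine, the current thread is the one running its loop.
    return threading.current_thread().name


loop, thread = start_named_loop('myworkers')
name = asyncio.run_coroutine_threadsafe(executor(), loop).result(timeout=5)
print(name)  # supervisor_aloop_myworkers
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```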

Using with workers

Workers automatically launch an async executor function in the default aloop; alternatively, an aloop can be specified with loop= at init or with _loop= at startup.

Executing own coroutines

aloops have two methods for executing your own coroutines:

# put coroutine to loop
task = aloop.background_task(coro(args))

# blocking wait for result from coroutine
result = aloop.run(coro(args))
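The difference between the two methods, "schedule and return" versus "schedule and wait", can be sketched with asyncio.run_coroutine_threadsafe. MiniAloop is a hypothetical stand-in, not atasker's aloop class; in this sketch background_task returns a concurrent.futures.Future, which may differ from what the real method returns.

```python
import asyncio
import threading


class MiniAloop:
    """Hypothetical stand-in for an aloop: an event loop in its own thread."""

    def __init__(self, name):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever,
                                        name='supervisor_aloop_' + name,
                                        daemon=True)
        self._thread.start()

    def background_task(self, coro):
        # Schedule the coroutine and return at once with a Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def run(self, coro, timeout=None):
        # Schedule the coroutine and block the caller until it finishes.
        return self.background_task(coro).result(timeout)

    def get_loop(self):
        return self._loop

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b


aloop = MiniAloop('example')
task = aloop.background_task(add(1, 2))  # returns immediately
result = aloop.run(add(3, 4))            # blocks until the result is ready
print(task.result(timeout=5), result)    # 3 7
aloop.stop()
```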

Other supervisor methods

Note

It is not recommended to create, start or stop aloops without the supervisor.

# set default aloop
task_supervisor.set_default_aloop(aloop)

# get aloop by name
task_supervisor.get_aloop(name)

# stop aloop (not required, supervisor stops all aloops at shutdown)
task_supervisor.stop_aloop(name)

# get aloop async event loop object for direct access
aloop.get_loop()

Multiprocessing

A multiprocessing pool may be used by workers and background tasks to execute parts of the code in separate processes.

To create a multiprocessing pool, use the method:

from atasker import task_supervisor

# task_supervisor.create_mp_pool(<args for multiprocessing.Pool>)
# e.g.
task_supervisor.create_mp_pool(processes=8)

# use custom mp Pool

from multiprocessing import Pool

pool = Pool(processes=4)
task_supervisor.mp_pool = pool

# set mp pool size; if the pool wasn't created before, it is initialized
# with processes=(pool_size+reserve_normal+reserve_high)
task_supervisor.set_mp_pool(
    pool_size=20, reserve_normal=5, reserve_high=5)

Custom task supervisor

from atasker import TaskSupervisor

my_supervisor = TaskSupervisor(
    pool_size=100, reserve_normal=10, reserve_high=10)

class MyTaskSupervisor(TaskSupervisor):
    # ....... custom overrides go here
    pass

my_supervisor2 = MyTaskSupervisor()

Putting own tasks

If you cannot use background tasks for some reason, you may create tasks manually and put them into the task supervisor to be launched:

task = task_supervisor.put_task(target=myfunc, args=(), kwargs={},
                                priority=TASK_NORMAL, delay=None)

If delay is specified, the thread is started after the corresponding delay (seconds).

When the function thread finishes, it should notify the task supervisor:

task_supervisor.mark_task_completed(task=task) # or task_id = task.id

If no task_id is specified, the current thread ID is used. A target can also pass its task ID explicitly:

# note: custom task targets always get _task_id in kwargs
def mytask(**kwargs):
    # ... perform calculations
    task_supervisor.mark_task_completed(task_id=kwargs['_task_id'])

task_supervisor.put_task(target=mytask)

Note

If you need to know the task ID before the task is put (e.g. for a task callback), you may generate your own and pass it to put_task with the task_id parameter.
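The put_task / mark_task_completed contract (the target receives _task_id, the caller may pre-generate the ID, and the target reports completion) can be modeled with threads from the standard library. MiniTaskRegistry is a hypothetical stand-in, not atasker's supervisor: it returns the task ID rather than a task object, and it uses uuid4 hex strings as IDs purely for illustration.

```python
import threading
import uuid


class MiniTaskRegistry:
    """Hypothetical stand-in for the put_task/mark_task_completed contract."""

    def __init__(self):
        self.active = set()
        self.completed = set()
        self._lock = threading.Lock()

    def put_task(self, target, args=(), kwargs=None, task_id=None, delay=None):
        task_id = task_id or uuid.uuid4().hex  # caller may pre-generate the ID
        kwargs = dict(kwargs or {}, _task_id=task_id)  # target always gets _task_id
        with self._lock:
            self.active.add(task_id)
        if delay:
            t = threading.Timer(delay, target, args=args, kwargs=kwargs)
        else:
            t = threading.Thread(target=target, args=args, kwargs=kwargs)
        t.start()
        return task_id

    def mark_task_completed(self, task_id):
        with self._lock:
            self.active.discard(task_id)
            self.completed.add(task_id)


registry = MiniTaskRegistry()
done = threading.Event()


def mytask(**kwargs):
    # ... perform calculations, then report completion with the injected ID
    registry.mark_task_completed(task_id=kwargs['_task_id'])
    done.set()


my_id = uuid.uuid4().hex  # known before the task is put
registry.put_task(target=mytask, task_id=my_id)
done.wait(timeout=5)
print(my_id in registry.completed)  # True
```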

Putting own tasks in multiprocessing pool

To put your own task into the multiprocessing pool, you must create a tuple object that contains:

  • unique task id
  • task function (static method)
  • function args
  • function kwargs
  • result callback function

import uuid

from atasker import TT_MP

# target must be a static method; callback receives the result
task = task_supervisor.put_task(
    target=<somemodule.staticmethod>, callback=<somefunc>, tt=TT_MP)

When the function finishes, notify the task supervisor:

task_supervisor.mark_task_completed(task_id=<task_id>, tt=TT_MP)

Creating own schedulers

A custom task scheduler (worker) can be registered in the task supervisor with:

task_supervisor.register_scheduler(scheduler)

where scheduler is a scheduler object that implements at least the stop (regular) and loop (async) methods.
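A minimal object with that shape might look like the sketch below. CounterScheduler is hypothetical, and the division of labor (the supervisor awaits loop() and calls stop() at shutdown) is an assumption; here the loop is driven directly with asyncio instead of being registered, so the example runs without atasker.

```python
import asyncio


class CounterScheduler:
    """Hypothetical minimal scheduler with a regular stop() and an async loop()."""

    def __init__(self, interval=0.01):
        self.interval = interval
        self.ticks = 0
        self._active = True

    def stop(self):
        # Regular (non-async) method: only flips a flag that loop() checks.
        self._active = False

    async def loop(self):
        while self._active:
            self.ticks += 1  # a real worker would run its executor here
            await asyncio.sleep(self.interval)


async def main():
    sched = CounterScheduler()
    # With atasker, this object would be passed to
    # task_supervisor.register_scheduler(sched); here it is driven directly.
    runner = asyncio.create_task(sched.loop())
    await asyncio.sleep(0.1)
    sched.stop()
    await runner  # loop() exits after its next flag check
    return sched


sched = asyncio.run(main())
print(sched.ticks > 0)  # True
```

Keeping stop() synchronous lets it be called from any thread or from plain shutdown code, while loop() yields to the event loop between iterations.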

The task supervisor can also register synchronous schedulers (workers), but it can only stop them when its stop method is called:

task_supervisor.register_sync_scheduler(scheduler)

To unregister schedulers from task supervisor, use unregister_scheduler and unregister_sync_scheduler methods.