Bloks

Blok dramatiq

class anyblok_dramatiq.bloks.dramatiq.DramatiqBlok(registry)

Bases: anyblok.blok.Blok

Dramatiq’s Blok class definition

author = 'jssuzanne'
conditional_by = []
conflicting_by = []
classmethod declare_actors(registry)

Actor declaration

from anyblok_dramatiq import (
    declare_actor_for,
    declare_actor_send_for,
)
declare_actor_for(Model.methode_name)
# or
declare_actor_send_for(Model.methode_name)
classmethod import_declaration_module()

Python module to import in the given order at start-up

load()

Load all the actor defined in all the installed bloks

name = 'dramatiq'
optional_by = []
classmethod reload_declaration_module(reload)

Python module to import while reloading server (ie when adding Blok at runtime

required = ['anyblok-core']
required_by = ['dramatiq-task']
version = '1.0.0'

Memento

An actor method is a classmethod who are executed by the dramatiq worker. AnyBlok define two differente actor decorator:

  • actor: Works exactly as the dramatiq.actor decorator
  • actor_send: This actor call the send broker method by default

The actor decorator from anyblok must decorate only an AnyBlok Model, Mixin or Core

actor

The more basic actor:

from anyblok_dramatiq import actor

@register(Model)
class MyModel:

    ...

    @actor()
    def actor_method(cls, *args, **kwargs)
        # do something
        ...

This use case is simple, you may:

  • Call directly the actor_method and execute it:

    registry.MyModel.actor_method(, *a, **kw)
    
  • Use the dramatiq functionnality without Model.Dramatiq.Message:

    message = registry.MyModel.actor_method.send(*a, **kw)
    registry.Dramatiq.send2broker(message)
    
  • Use the dramatiq functionnality with Model.Dramatiq.Message to get status and history:

    registry.Dramatiq.create_message(registry.MyModel.actor_method, *a, **kw)
    

    Note

    In this case the message will be send to dramatiq worker by the postcommit_hook of AnyBlok

actor_send

The more basic actor:

from anyblok_dramatiq import actor_send

@register(Model)
class MyModel:

    ...

    @actor_send()
    def actor_method(cls, *args, **kwargs)
        # do something
        ...

By default this decorator allow one case, use the dramatiq functionnality with Model.Dramatiq.Messag:

registry.MyModel.actor_method(*a, **kw)

The inheritance of AnyBlok allow to overwrite all classmethod to transform them by an actor easily.

In the case where you want execute directly the actor you have to use the context manager call_directly_the_actor_send:

from anyblok_dramatiq import call_directly_the_actor_send

with call_directly_the_actor_send():
    registry.MyModel.actor_method(*a, **kw)

API doc

Message

class anyblok_dramatiq.bloks.dramatiq.message.Dramatiq

Bases: object

No SQL Model, use to get tools for dramatiq messaging

Declaration type:
 Model
Tablename:dramatiq
Registry name:Model.Dramatiq
Inherit model or mixin:
 
classmethod create_message(actor, *args, **kwargs)

Prepare a message and add an entry in the Message Model :param actor: an Actor instance :param delay: use for postcommit hook send2broker :param _*args: args of the actor :param _*_*kwargs: kwargs of the actor :rtype: dramatiq message instance

classmethod send2broker(*messages, delay=None, run_at=None)

Send all the messages with the delay

Parameters:
  • _*messages – message instance list
  • delay – delay before send
  • run_at – datetime when the process must be executed
class anyblok_dramatiq.bloks.dramatiq.message.Message

Bases: anyblok.mixin.DramatiqMessageStatus

Message model for dramatiq

Declaration type:
 

Model

Tablename:

dramatiq_message

Registry name:

Model.Dramatiq.Message

Inherit model or mixin:
 
  • <class ‘anyblok.mixin.DramatiqMessageStatus’>
field name Description
message
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • Field type - <class ‘anyblok.column.Json’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
updated_at
  • Context:
  • Label - None
  • is auto updated - False
  • Field type - <class ‘anyblok.column.DateTime’>
id
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • Field type - <class ‘anyblok.column.UUID’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
  • primary_key - True
classmethod get_instance_of(message)

Called by the middleware to get the model instance of the message

classmethod insert(*args, **kwargs)

Over write the insert to add the first history line

update_status(status, error=None)

Called by the middleware to change the status and history

Blok dramatiq task

class anyblok_dramatiq.bloks.task.DramatiqTaskBlok(registry)

Bases: anyblok.blok.Blok

Dramatiq’s task definition

author = 'jssuzanne'
conditional_by = []
conflicting_by = []
classmethod import_declaration_module()

Python module to import in the given order at start-up

name = 'dramatiq-task'
optional_by = []
classmethod reload_declaration_module(reload)

Python module to import while reloading server (ie when adding Blok at runtime

required = ['anyblok-core', 'dramatiq']
required_by = []
version = '1.0.0'

Memento

The tasks is based on dramatiq, The instance of the model:

  • Task: define what the task have to do
  • Job: historize the execution of an instance of Task with specific data

Model.Dramatiq.Task

This model is not directly useable, you have to use polymosphic model:

  • Model.Dramatiq.Task.CallMethod: call a classmethod on a model, defined by the task
  • Model.Dramatiq.Task.StepByStep: call each sub task one by one on function of the order
  • Model.Dramatiq.Task.Parallel: call all sub tasks on one shot

Model.Dramatiq.Job

The job is the execution of the task with dramatiq, The job historize also action done and action to do. To create a job, you must get a task create the job with the method do_the_job:

task = registry.Dramatiq.Task.query().first()
task.do_the_job(with_args=tuple(), with_kwargs=dict(), run_at=a_datetime)
# this job will not be traited now and by the process but by another process

Note

In the case where the task will be an StepByStep or Parallel task, then the task create one or more job, one for the job and one for sub jobs

API doc

Task

class anyblok_dramatiq.bloks.task.task.Task

Bases: object

Main Task, define the main table

Declaration type:
 Model
Tablename:dramatiq_task
Registry name:Model.Dramatiq.Task
Inherit model or mixin:
 
field name Description
order
  • is crypted - False
  • Label - None
  • default - 100
  • Field type - <class ‘anyblok.column.Integer’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
label
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • size - 64
  • Field type - <class ‘anyblok.column.String’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
update_at
  • Context:
  • is auto updated - True
  • Label - None
  • nullable - False
  • Field type - <class ‘anyblok.column.DateTime’>
id
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • is crypted - False
  • Label - None
  • autoincrement - True
  • Field type - <class ‘anyblok.column.Integer’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • primary_key - True
create_at
  • Context:
  • is auto updated - False
  • Label - None
  • nullable - False
  • Field type - <class ‘anyblok.column.DateTime’>
main_task
  • _remote_columns - None
  • backref - sub_tasks
  • Label - None
  • info:
  • remote_model - Model.Dramatiq.Task
  • remote_name - sub_tasks
  • Field type - <class ‘anyblok.relationship.Many2One’>
  • Context:
  • _column_names - None
  • unique - False
  • model - Model.Dramatiq.Task
task_type
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • size - 64
  • Field type - <class ‘anyblok.column.Selection’>
  • selections - get_task_type
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
classmethod define_mapper_args()

Polymorphism configuration

do_the_job(main_job=None, run_at=None, with_args=None, with_kwargs=None)

Create a job for this tash and add send it to dramatiq

Parameters:
  • main_job – parent job if exist
  • run_at – datetime to execute the job
  • with_args – tuple of the argument to pass at the job
  • with_kwargs – dict of the argument to pass at the job
classmethod get_task_type()

List the task type possible

run(job)

Execute the task for one job

Parameters:job – job executed
run_next(job)

next action to execute when a sub job finish this task for one job

Parameters:job – job executed

Job

class anyblok_dramatiq.bloks.task.job.Job

Bases: object

The job is an execution of an instance of task

Declaration type:
 Model
Tablename:dramatiq_job
Registry name:Model.Dramatiq.Job
Inherit model or mixin:
 
field name Description
uuid
  • is crypted - False
  • Label - None
  • default - <function uuid1 at 0x7f44030d5598>
  • Field type - <class ‘anyblok.column.UUID’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
  • primary_key - True
data
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • Field type - <class ‘anyblok.column.Json’>
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
update_at
  • Context:
  • is auto updated - True
  • Label - None
  • nullable - False
  • Field type - <class ‘anyblok.column.DateTime’>
task
  • _remote_columns - None
  • Label - None
  • info:
  • nullable - False
  • remote_model - Model.Dramatiq.Task
  • Field type - <class ‘anyblok.relationship.Many2One’>
  • Context:
  • _column_names - None
  • unique - False
  • model - Model.Dramatiq.Task
error
  • is crypted - False
  • Label - None
  • default - <class ‘anyblok.column.NoDefaultValue’>
  • Field type - <class ‘anyblok.column.Text’>
  • foreign_key - None
  • Context:
  • DB column name - None
status
  • is crypted - False
  • Label - None
  • default - new
  • size - 64
  • Field type - <class ‘anyblok.column.Selection’>
  • selections:
  • (‘new’, ‘New’)
  • (‘waiting’, ‘Waiting’)
  • (‘running’, ‘Running’)
  • (‘failed’, ‘Failed’)
  • (‘done’, ‘Done’)
  • foreign_key - None
  • Context:
  • DB column name - None
  • nullable - False
run_at
  • Context:
  • Label - None
  • is auto updated - False
  • Field type - <class ‘anyblok.column.DateTime’>
main_job
  • _remote_columns - None
  • backref - sub_jobs
  • Label - None
  • info:
  • remote_model - Model.Dramatiq.Job
  • remote_name - sub_jobs
  • Field type - <class ‘anyblok.relationship.Many2One’>
  • Context:
  • _column_names - None
  • unique - False
  • model - Model.Dramatiq.Job
create_at
  • Context:
  • is auto updated - False
  • Label - None
  • nullable - False
  • Field type - <class ‘anyblok.column.DateTime’>
call_main_job()

Call the main job if exist to do the next action of the main job

lock()

lock the job to be sure that only one thread execute the run_next

classmethod run(job_uuid=None)

dramatiq actor to execute a specific task

actor_send event call with positionnal argument {‘queue_name’: ‘default’, ‘priority’: 0}