Contents
anyblok_dramatiq.bloks.dramatiq.
DramatiqBlok
(registry)Bases: anyblok.blok.Blok
Dramatiq’s Blok class definition
author
= 'jssuzanne'conditional_by
= []conflicting_by
= []declare_actors
(registry)Actor declaration
from anyblok_dramatiq import (
declare_actor_for,
declare_actor_send_for,
)
declare_actor_for(Model.methode_name)
# or
declare_actor_send_for(Model.methode_name)
import_declaration_module
()Python module to import in the given order at start-up
load
()Load all the actor defined in all the installed bloks
name
= 'dramatiq'optional_by
= []reload_declaration_module
(reload)Python module to import while reloading server (ie when adding Blok at runtime
required
= ['anyblok-core']required_by
= ['dramatiq-task']version
= '1.0.3'An actor method is a classmethod who are executed by the dramatiq worker. AnyBlok define two differente actor decorator:
actor
: Works exactly as the dramatiq.actor decoratoractor_send
: This actor call the send broker method by defaultThe actor decorator from anyblok must decorate only an AnyBlok Model, Mixin or Core
actor
¶The more basic actor:
from anyblok_dramatiq import actor
@register(Model)
class MyModel:
...
@actor()
def actor_method(cls, *args, **kwargs)
# do something
...
This use case is simple, you may:
Call directly the actor_method and execute it:
registry.MyModel.actor_method(, *a, **kw)
Use the dramatiq functionnality without Model.Dramatiq.Message
:
message = registry.MyModel.actor_method.send(*a, **kw)
registry.Dramatiq.send2broker(message)
Use the dramatiq functionnality with Model.Dramatiq.Message
to get status and history:
registry.Dramatiq.create_message(registry.MyModel.actor_method, *a, **kw)
Note
In this case the message will be send to dramatiq worker by the postcommit_hook
of
AnyBlok
actor_send
¶The more basic actor:
from anyblok_dramatiq import actor_send
@register(Model)
class MyModel:
...
@actor_send()
def actor_method(cls, *args, **kwargs)
# do something
...
By default this decorator allow one case, use the dramatiq functionnality with Model.Dramatiq.Messag
:
registry.MyModel.actor_method(*a, **kw)
The inheritance of AnyBlok allow to overwrite all classmethod to transform them by an actor easily.
In the case where you want execute directly the actor you have to use the context manager call_directly_the_actor_send
:
from anyblok_dramatiq import call_directly_the_actor_send
with call_directly_the_actor_send():
registry.MyModel.actor_method(*a, **kw)
anyblok_dramatiq.bloks.dramatiq.message.
Dramatiq
Bases: object
No SQL Model, use to get tools for dramatiq messaging
AnyBlok registration:
create_message
(actor, *args, **kwargs)Prepare a message and add an entry in the Message Model :param actor: an Actor instance :param delay: use for postcommit hook send2broker :param _*args: args of the actor :param _*_*kwargs: kwargs of the actor :rtype: dramatiq message instance
send2broker
(*messages, delay=None, run_at=None)Send all the messages with the delay
Parameters: |
|
---|
anyblok_dramatiq.bloks.dramatiq.message.
Message
Bases: anyblok.mixin.DramatiqMessageStatus
Message model for dramatiq
AnyBlok registration:
anyblok.mixin.DramatiqMessageStatus
Fields | |
---|---|
message |
|
id |
|
updated_at |
|
get_instance_of
(message)Called by the middleware to get the model instance of the message
insert
(*args, **kwargs)Over write the insert to add the first history line
update_status
(status, error=None)Called by the middleware to change the status and history
anyblok_dramatiq.bloks.task.
DramatiqTaskBlok
(registry)Bases: anyblok.blok.Blok
Dramatiq’s task definition
author
= 'jssuzanne'conditional_by
= []conflicting_by
= []import_declaration_module
()Python module to import in the given order at start-up
name
= 'dramatiq-task'optional_by
= []reload_declaration_module
(reload)Python module to import while reloading server (ie when adding Blok at runtime
required
= ['anyblok-core', 'dramatiq']required_by
= []version
= '1.0.3'The tasks is based on dramatiq, The instance of the model:
Model.Dramatiq.Task
¶This model is not directly useable, you have to use polymosphic model:
Model.Dramatiq.Task.CallMethod
: call a classmethod on a model, defined by the taskModel.Dramatiq.Task.StepByStep
: call each sub task one by one on function of the orderModel.Dramatiq.Task.Parallel
: call all sub tasks on one shotModel.Dramatiq.Job
¶The job is the execution of the task with dramatiq, The job historize also action done and action to do.
To create a job, you must get a task create the job with the method do_the_job
:
task = registry.Dramatiq.Task.query().first()
task.do_the_job(with_args=tuple(), with_kwargs=dict(), run_at=a_datetime)
# this job will not be traited now and by the process but by another process
Note
In the case where the task will be an StepByStep
or Parallel
task, then the
task create one or more job, one for the job and one for sub jobs
anyblok_dramatiq.bloks.task.task.
Task
Bases: object
Main Task, define the main table
AnyBlok registration:
Fields | |
---|---|
order |
|
create_at |
|
id |
|
task_type |
|
label |
|
update_at |
|
main_task |
|
define_mapper_args
()Polymorphism configuration
do_the_job
(main_job=None, run_at=None, with_args=None, with_kwargs=None)Create a job for this tash and add send it to dramatiq
Parameters: |
|
---|
get_task_type
()List the task type possible
run
(job)Execute the task for one job
Parameters: | job – job executed |
---|
run_next
(job)next action to execute when a sub job finish this task for one job
Parameters: | job – job executed |
---|
anyblok_dramatiq.bloks.task.job.
Job
Bases: object
The job is an execution of an instance of task
AnyBlok registration:
Fields | |
---|---|
status |
|
create_at |
|
error |
|
data |
|
run_at |
|
main_job |
|
update_at |
|
uuid |
|
task |
|
call_main_job
()Call the main job if exist to do the next action of the main job
lock
()lock the job to be sure that only one thread execute the run_next
run
(job_uuid=None)dramatiq actor to execute a specific task
actor_send event call with positionnal argument {‘priority’: 0, ‘queue_name’: ‘default’}