Code

Actor

exception anyblok_dramatiq.actor.AnyBlokActorException

Bases: ValueError

A ValueError exception for anyblok_dramatiq

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
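
Because the exception derives from ValueError, any existing `except ValueError` handler also catches it. The sketch below illustrates this with a local stand-in class so that it runs without the package installed; with the package available, the class would be imported from anyblok_dramatiq.actor instead. The helpers `check_queue_name` and `safe_queue_name` are hypothetical and exist only for this illustration.

```python
class AnyBlokActorException(ValueError):
    """Local stand-in mirroring the documented base class (ValueError)."""


def check_queue_name(queue_name):
    # Hypothetical validation helper, not part of anyblok_dramatiq.
    if not queue_name:
        raise AnyBlokActorException("queue_name must not be empty")
    return queue_name


def safe_queue_name(queue_name, fallback="default"):
    try:
        return check_queue_name(queue_name)
    except ValueError:
        # AnyBlokActorException is a ValueError, so it is caught here too.
        return fallback
```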

class anyblok_dramatiq.actor.AnyBlokActor(fn, *, broker, actor_name, queue_name, priority, options)

Bases: dramatiq.actor.Actor

Overloads the dramatiq.actor.Actor class.

Its goal is to support the actor_send decorator, which uses the send method directly.

send(*args, **kwargs)

Send to the broker
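
The idea of an actor that is sent to the broker instead of being executed in place can be sketched with plain Python. Everything below is a stand-in written for illustration: `StubBroker` and `SendOnCallActor` are hypothetical names, and the assumption that calling the actor is equivalent to calling `send` is a reading of the description above, not a confirmed detail of the package.

```python
class StubBroker:
    """In-memory stand-in for a broker: it only records what was sent."""

    def __init__(self):
        self.sent = []


class SendOnCallActor:
    """Stand-in for an actor whose call is routed to send()."""

    def __init__(self, fn, broker, queue_name="default"):
        self.fn = fn
        self.broker = broker
        self.queue_name = queue_name

    def send(self, *args, **kwargs):
        # Record the call instead of running it; a worker would run it later.
        entry = (self.queue_name, self.fn.__name__, args, kwargs)
        self.broker.sent.append(entry)
        return entry

    def __call__(self, *args, **kwargs):
        # Assumption: calling the actor is the same as sending it.
        return self.send(*args, **kwargs)


def add(a, b):
    return a + b


broker = StubBroker()
add_actor = SendOnCallActor(add, broker)
add_actor(1, 2)  # nothing is computed here, the call is only recorded
```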

anyblok_dramatiq.actor.declare_actor_for(method, **kwargs)

Apply the anyblok_dramatiq.actor.actor decorator to a class method

Parameters:
  • method – classmethod pointer
  • **kwargs – decorator kwargs
anyblok_dramatiq.actor.declare_actor_send_for(method, **kwargs)

Apply the anyblok_dramatiq.actor.actor_send decorator to a class method

Parameters:
  • method – classmethod pointer
  • **kwargs – decorator kwargs
anyblok_dramatiq.actor.actor(queue_name='default', priority=0, **options)

Decorator to get an Actor

Parameters:
  • queue_name – name of the queue
  • priority – priority of the actor
  • **options – options for actor
anyblok_dramatiq.actor.actor_send(queue_name='default', priority=0, **options)

Decorator to get an AnyBlokActor

Parameters:
  • queue_name – name of the queue
  • priority – priority of the actor
  • **options – options for actor
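
Both decorators share the signature `(queue_name='default', priority=0, **options)`. As a minimal sketch of how such a decorator factory can carry its arguments along with the decorated method, the stand-in below stores them on the function. The attribute names `is_an_actor` and `actor_kwargs` are assumptions made for this example, the `Task` class is hypothetical, and `max_retries` is used only as a sample option.

```python
def actor(queue_name="default", priority=0, **options):
    """Stand-in decorator factory with the documented signature."""

    def decorator(fn):
        # Hypothetical attribute names; the package may store this differently.
        fn.is_an_actor = True
        fn.actor_kwargs = dict(queue_name=queue_name, priority=priority, **options)
        return fn

    return decorator


class Task:
    @actor(queue_name="reports", priority=5, max_retries=3)
    def build_report(cls, report_id):
        return "report %s" % report_id


# The decorator arguments travel with the method and can be read back later.
stored = Task.build_report.actor_kwargs
```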
anyblok_dramatiq.actor.call_directly_the_actor_send()

Context manager to call the actor directly, without going through dramatiq

class anyblok_dramatiq.actor.ActorPlugin(registry)

Bases: anyblok.model.plugins.ModelPluginBase

anyblok.model.plugin to allow the build of the anyblok_dramatiq.actor

after_model_construction(base, namespace, transformation_properties)

Do some action with the constructed Model

Parameters:
  • base – the Model class
  • namespace – the namespace of the model
  • transformation_properties – the properties of the model
initialisation_tranformation_properties(properties, transformation_properties)

Initialise the transform properties

Parameters:
  • properties – the properties declared in the model
  • transformation_properties – the properties of the model
insert_in_bases(new_base, namespace, properties, transformation_properties)

Insert the overload into a base

Parameters:
  • new_base – the base to be put in front of all bases
  • namespace – the namespace of the model
  • properties – the properties declared in the model
  • transformation_properties – the properties of the model
transform_base_attribute(attr, method, namespace, base, transformation_properties, new_type_properties)

Transform the attribute for the final Model

Parameters:
  • attr – attribute name
  • method – method pointer of the attribute
  • namespace – the namespace of the model
  • base – one of the bases of the model
  • transformation_properties – the properties of the model
  • new_type_properties – param to add in a new base if needed
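
The hooks above follow a collect-then-build pattern: each attribute of each base is inspected, and the ones of interest are noted in `transformation_properties`. The sketch below shows that pattern in isolation, with simplified signatures. `mark_actor`, `ActorCollector`, the `is_an_actor` marker and the `"actors"` key are all hypothetical and are not taken from the package.

```python
def mark_actor(**kwargs):
    """Hypothetical decorator that flags a method and keeps its kwargs."""

    def decorator(fn):
        fn.is_an_actor = True
        fn.actor_kwargs = kwargs
        return fn

    return decorator


class ActorCollector:
    """Stand-in plugin with simplified hook signatures."""

    def initialise(self, transformation_properties):
        transformation_properties["actors"] = {}

    def transform_base_attribute(self, attr, method, transformation_properties):
        # Only flagged attributes are recorded; everything else is ignored.
        if getattr(method, "is_an_actor", False):
            transformation_properties["actors"][attr] = method.actor_kwargs


class Base:
    @mark_actor(queue_name="default")
    def run(cls):
        return "done"

    def helper(cls):
        return "not an actor"


props = {}
collector = ActorCollector()
collector.initialise(props)
for name, value in vars(Base).items():
    collector.transform_base_attribute(name, value, props)
```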
class anyblok_dramatiq.actor.ActorSendPlugin(registry)

Bases: anyblok.model.plugins.ModelPluginBase

anyblok.model.plugin to allow the build of the anyblok_dramatiq.actor_send

after_model_construction(base, namespace, transformation_properties)

Do some action with the constructed Model

Parameters:
  • base – the Model class
  • namespace – the namespace of the model
  • transformation_properties – the properties of the model
initialisation_tranformation_properties(properties, transformation_properties)

Initialise the transform properties

Parameters:
  • properties – the properties declared in the model
  • transformation_properties – the properties of the model
insert_in_bases(new_base, namespace, properties, transformation_properties)

Insert the overload into a base

Parameters:
  • new_base – the base to be put in front of all bases
  • namespace – the namespace of the model
  • properties – the properties declared in the model
  • transformation_properties – the properties of the model
transform_base_attribute(attr, method, namespace, base, transformation_properties, new_type_properties)

Transform the attribute for the final Model

Parameters:
  • attr – attribute name
  • method – method pointer of the attribute
  • namespace – the namespace of the model
  • base – one of the bases of the model
  • transformation_properties – the properties of the model
  • new_type_properties – param to add in a new base if needed

Broker

anyblok_dramatiq.broker.prepare_broker(withmiddleware=True)

Configure the broker for sending messages and for the workers

Dramatiq middleware

class anyblok_dramatiq.middleware.DramatiqMessageMiddleware

Bases: dramatiq.middleware.middleware.Middleware

Middleware for dramatiq. Its goal is to detect whether the call was made by the AnyBlok tools through Model.Dramatiq.Message. This model stores the status of the message and the history of its status changes.

after_process_message(broker, message, *, result=None, exception=None)

Called after a message has been processed

If the message is in Model.Dramatiq.Message, its status is changed to done or failed.

Note

The status is failed if an exception is passed or a rollback is needed.

Before the hook returns, the session is expired to release the Session pool thread.

Parameters:
  • broker – the broker used
  • message – the message sent to the broker
  • result – the value returned by the process
  • exception – any Exception raised by the process
after_skip_message(broker, message)

Called after a message has been skipped

If the message is in Model.Dramatiq.Message, its status is changed to skip.

Before the hook returns, the session is expired to release the Session pool thread.

Parameters:
  • broker – the broker used
  • message – the message sent to the broker
after_worker_shutdown(*args, **kwargs)

Called after worker shutdown

Close the AnyBlok registry

before_consumer_thread_shutdown(*args, **kwargs)

Called before consumer thread shutdown

Remove the session instance to clean the Session pool

before_enqueue(broker, message, delay)

Called when a message is delayed or enqueued

If the message is in Model.Dramatiq.Message, its status is changed to delayed or enqueued.

Parameters:
  • broker – the broker used
  • message – the message sent to the broker
  • delay – delay in milliseconds
before_process_message(broker, message)

Called before a message is processed

Invalidate the cache: if a cache has to be invalidated, it is invalidated; otherwise nothing is done.

If the message is in Model.Dramatiq.Message, its status is changed to running.

Parameters:
  • broker – the broker used
  • message – the message sent to the broker
before_worker_thread_shutdown(*args, **kwargs)

Called before worker thread shutdown

Remove the session instance to clean the Session pool
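
Taken together, the hooks describe a status lifecycle for tracked messages: delayed or enqueued, then running, then done or failed, with skip as a separate outcome. The sketch below models only that lifecycle with an in-memory dictionary. It is a simplified stand-in: `StatusTrackingMiddleware` and `track` are hypothetical names, messages are represented by plain ids rather than dramatiq message objects, and no session, rollback or registry handling is shown.

```python
class StatusTrackingMiddleware:
    """Stand-in that records status changes for tracked message ids."""

    def __init__(self):
        self.history = {}  # message id -> list of statuses, oldest first

    def track(self, message_id):
        # Hypothetical helper: register a message so that it gets tracked.
        self.history[message_id] = []

    def _set(self, message_id, status):
        # Untracked messages are ignored, mirroring "if the message is in".
        if message_id in self.history:
            self.history[message_id].append(status)

    def before_enqueue(self, broker, message_id, delay):
        self._set(message_id, "delayed" if delay else "enqueued")

    def before_process_message(self, broker, message_id):
        self._set(message_id, "running")

    def after_process_message(self, broker, message_id, *, result=None, exception=None):
        self._set(message_id, "failed" if exception is not None else "done")

    def after_skip_message(self, broker, message_id):
        self._set(message_id, "skip")


mw = StatusTrackingMiddleware()
mw.track("m1")
mw.before_enqueue(None, "m1", None)
mw.before_process_message(None, "m1")
mw.after_process_message(None, "m1", exception=RuntimeError("boom"))
mw.before_process_message(None, "untracked")  # ignored: never registered
```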

Scripts

anyblok_dramatiq.scripts.worker_process(worker_id, logging_fd)

Worker process that consumes messages and executes the actors

anyblok_dramatiq.scripts.dramatiq(application, configuration_groups, **kwargs)

Run the dramatiq worker processes that consume messages and execute actors

Parameters:
  • application – name of the application
  • configuration_groups – list of configuration groups to load
  • **kwargs – ArgumentParser named arguments