Attributes
Description
An attribute plugin is simply an AbstractAttributePlugin
subclass designed to implement a Jinja filter.
Requirements
Here is a commented minimal plugin implementation that won’t do anything.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass AbstractAttributePlugin
from lib.plugins import AbstractAttributePlugin

# Required to use the Jinja Undefined state
from jinja2 import Undefined

# Required for type hints
from typing import Any

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "MyPluginClassName"


class MyPluginClassName(AbstractAttributePlugin):
    def __init__(self, settings: dict[str, Any]):
        # Instantiate new plugin and store a copy of its settings dict in self._settings
        super().__init__(settings)
        # ... plugin init code

    def filter(self, value: Any | None | Undefined) -> Any:
        # Filter that does nothing
        return value
filter method
You should consider reading the official Jinja documentation about custom filters.
The filter() method always takes at least one value parameter, and may take additional ones.
Its generic prototype is:
def filter(self, value: Any | None | Undefined, *args: Any, **kwds: Any) -> Any:
In Jinja, it is called with:
"{{ value | filter }}"
"{{ value | filter(otherarg1, otherarg2) }}"
"{{ value | filter(otherarg1=otherarg1_value, otherarg2=otherarg2_value) }}"
The above expressions are replaced by the filter return value.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass AbstractAttributePlugin
from lib.plugins import AbstractAttributePlugin

# Required to use the Jinja Undefined state
from jinja2 import Undefined

# Required for type hints
from typing import Any

from datetime import datetime

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "DatetimeFormatPlugin"


class DatetimeFormatPlugin(AbstractAttributePlugin):
    def filter(self, value: Any, format: str = "%H:%M %d-%m-%y") -> str | Undefined:
        # Let Jinja handle undefined values itself
        if isinstance(value, Undefined):
            return value

        if not isinstance(value, datetime):
            raise TypeError(
                f"Invalid type '{type(value)}' for datetime_format value: must be a datetime"
            )

        return value.strftime(format)
This filter can now be called with:
"{{ a_datetime_attribute | datetime_format }}"
"{{ a_datetime_attribute | datetime_format('%m/%d/%Y, %H:%M:%S') }}"
"{{ a_datetime_attribute | datetime_format(format='%m/%d/%Y') }}"
Clients
Description
A client plugin is simply a GenericClient subclass designed to implement simple event handlers, and to split their tasks into atomic subtasks to ensure consistent error reprocessing.
Requirements
Here is a commented minimal plugin implementation that won’t do anything, as it doesn’t implement any event handlers yet.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass GenericClient
from clients import GenericClient

# Required for event handlers method type hints
from lib.config import HermesConfig  # only if the plugin implements an __init__() method
from lib.datamodel.dataobject import DataObject
from typing import Any

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "MyPluginClassName"


class MyPluginClassName(GenericClient):
    def __init__(self, config: HermesConfig):
        # The 'config' var must not be used nor modified by the plugin
        super().__init__(config)
        # ... plugin init code
Handler methods
Event handlers
For each data type set up in the client datamodel, the plugin may implement a handler for each of the 5 possible event types:
- added: when an object is added
- recycled: when an object is restored from the trashbin (never called if the trashbin is disabled)
- modified: when an object is modified
- trashed: when an object is put in the trashbin (never called if the trashbin is disabled)
- removed: when an object is deleted
If an event is received by a client, but its handler isn’t implemented, it will silently be ignored.
Each handler must be named on_datatypename_eventtypename.
Example for a Mydatatype data type:
def on_Mydatatype_added(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    newobj: DataObject,
):
    pass

def on_Mydatatype_recycled(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    newobj: DataObject,
):
    pass

def on_Mydatatype_modified(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    newobj: DataObject,
    cachedobj: DataObject,
):
    pass

def on_Mydatatype_trashed(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    cachedobj: DataObject,
):
    pass

def on_Mydatatype_removed(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    cachedobj: DataObject,
):
    pass
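The naming rule and the "silently ignored" behaviour can be illustrated with a small lookup by name. This is only a sketch of the idea: `FakeClient` and `dispatch` are hypothetical names, and nothing here is taken from the actual GenericClient code.

```python
from typing import Any


class FakeClient:
    """Stand-in for a client plugin; not the real GenericClient."""

    def __init__(self) -> None:
        self.calls: list = []

    def on_Mydatatype_added(self, objkey: Any, eventattrs: dict, newobj: Any) -> None:
        self.calls.append(f"added:{objkey}")


def dispatch(client: Any, datatype: str, eventtype: str, *args: Any) -> bool:
    """Call on_<datatype>_<eventtype> if the client defines it.

    Returns False when no handler exists, mimicking an ignored event.
    """
    handler = getattr(client, f"on_{datatype}_{eventtype}", None)
    if handler is None:
        return False  # no handler implemented: the event is ignored
    handler(*args)
    return True


client = FakeClient()
handled = dispatch(client, "Mydatatype", "added", 42, {"uid": "jdoe"}, None)
ignored = dispatch(client, "Mydatatype", "removed", 42, {}, None)
print(handled, ignored, client.calls)
```

The handler name is case-sensitive here, so the data type must be spelled exactly as in the method name.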
Event handler arguments
- objkey: the primary key of the object affected by the event
- eventattrs: a dictionary containing the new object attributes. Its content depends on the event type:
  - added / recycled events: contains all object attribute names as keys, and their respective values as values
  - modified event: always contains three keys:
    - added: attributes that were previously unset, but now have a value. Attribute names as keys, and their respective values as values
    - modified: attributes that were previously set, but whose value has changed. Attribute names as keys, and their respective new values as values
    - removed: attributes that were previously set, but no longer have a value. Attribute names as keys, and None as value
  - trashed / removed events: always an empty dict {}
- newobj: a DataObject instance containing all the updated values of the object affected by the event (see DataObject instances below)
- cachedobj: a DataObject instance containing all the previous (cached) values of the object affected by the event (see DataObject instances below)
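To make the three keys of a modified event concrete, the sketch below applies such an eventattrs dictionary to a plain dict of previous values. The helper `apply_modified` and the sample values are hypothetical, and plain dicts stand in for DataObject instances.

```python
from typing import Any


def apply_modified(cached: dict, eventattrs: dict) -> dict:
    """Return a copy of 'cached' updated with a 'modified' event's eventattrs."""
    result = dict(cached)
    result.update(eventattrs["added"])      # previously unset attributes
    result.update(eventattrs["modified"])   # attributes whose value changed
    for name in eventattrs["removed"]:      # attributes that lost their value
        result.pop(name, None)
    return result


cached: dict = {"uid": "jdoe", "givenname": "John", "mail": "jdoe@example.com"}
eventattrs: dict = {
    "added": {"sn": "Doe"},
    "modified": {"givenname": "Johnny"},
    "removed": {"mail": None},
}
updated: Any = apply_modified(cached, eventattrs)
print(updated)
```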
DataObject instances
Each data type object can be used intuitively through a DataObject instance.
Let’s take a simple example: a Users object with the values below (and no mail), based on the datamodel that follows:
{
    "user_pkey": 42,
    "uid": "jdoe",
    "givenname": "John",
    "sn": "Doe"
}
hermes-client:
  datamodel:
    Users:
      hermesType: SRVUsers
      attrsmapping:
        user_pkey: srv_user_id
        uid: srv_login
        givenname: srv_firstname
        sn: srv_lastname
        mail: srv_mail
Now, if this object is stored in a newobj DataObject instance:
>>> newobj.getPKey()
42
>>> newobj.user_pkey
42
>>> newobj.uid
'jdoe'
>>> newobj.givenname
'John'
>>> newobj.sn
'Doe'
>>> newobj.mail
AttributeError: 'Users' object has no attribute 'mail'
>>> hasattr(newobj, 'sn')
True
>>> hasattr(newobj, 'mail')
False
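Since an unset attribute raises AttributeError, a handler that treats an attribute as optional can read it with getattr() and a default. In this minimal sketch, a SimpleNamespace is used as a stand-in for a DataObject; it only reproduces the attribute access shown above.

```python
from types import SimpleNamespace

# Stand-in for the newobj DataObject above (assumption: only attribute
# access is needed for this illustration)
newobj = SimpleNamespace(user_pkey=42, uid="jdoe", givenname="John", sn="Doe")

# 'mail' is unset, so the default is returned instead of raising AttributeError
mail = getattr(newobj, "mail", None)
sn = getattr(newobj, "sn", None)

if mail is None:
    print(f"{newobj.uid} has no mail")
```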
Error handling
Any unhandled exception raised in an event handler is caught by GenericClient, which appends the event to its error queue.
GenericClient then retries the event at regular intervals, calling the event handler each time, until it succeeds.
But sometimes a handler must perform several operations on the target. Imagine a handler like this:
def on_Mydatatype_added(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    newobj: DataObject,
):
    if condition:
        operation1()  # condition is False, operation1() is not called
    operation2()  # no error occurs
    operation3()  # this one raises an exception
At each retry the operation2() function will be called again, which is not necessarily desirable.
A handler can be divided into steps by using the currentStep attribute inherited from GenericClient, so that retries resume at the failed step.
currentStep always starts at 0 on normal event processing. Its subsequent values are up to the plugin implementation.
When an error occurs, the currentStep value is stored in the error queue with the event.
Error queue retries always restore the currentStep value before calling the event handler.
So with the implementation below, operation2() will only be called once.
def on_Mydatatype_added(
    self,
    objkey: Any,
    eventattrs: "dict[str, Any]",
    newobj: DataObject,
):
    if self.currentStep == 0:
        if condition:
            operation1()  # condition is False, operation1() is not called
            # Declare that changes have been propagated on target
            self.isPartiallyProcessed = True
        self.currentStep += 1

    if self.currentStep == 1:
        operation2()  # no error occurs
        # Declare that changes have been propagated on target
        self.isPartiallyProcessed = True
        self.currentStep += 1

    if self.currentStep == 2:
        operation3()  # this one raises an exception
        # Declare that changes have been propagated on target
        self.isPartiallyProcessed = True
        self.currentStep += 1
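The retry behaviour described above can be simulated without Hermes. In this sketch, `FakeClient` is a hypothetical stand-in that only carries the currentStep and isPartiallyProcessed attributes, and the "error queue" is reduced to a saved step number restored by hand before the second call.

```python
class FakeClient:
    """Stand-in exposing the two attributes used above; not the real GenericClient."""

    def __init__(self) -> None:
        self.currentStep = 0
        self.isPartiallyProcessed = False
        self.calls: list = []
        self.fail_operation3 = True

    def operation2(self) -> None:
        self.calls.append("operation2")

    def operation3(self) -> None:
        self.calls.append("operation3")
        if self.fail_operation3:
            raise RuntimeError("target unavailable")

    def on_Mydatatype_added(self) -> None:
        if self.currentStep == 0:
            self.currentStep += 1  # nothing to do at this step in the sketch
        if self.currentStep == 1:
            self.operation2()
            self.isPartiallyProcessed = True
            self.currentStep += 1
        if self.currentStep == 2:
            self.operation3()
            self.isPartiallyProcessed = True
            self.currentStep += 1


client = FakeClient()
saved_step = None
try:
    client.on_Mydatatype_added()      # first attempt: operation3() fails
except RuntimeError:
    saved_step = client.currentStep   # step that would be stored with the event

# Retry: restore the saved step before calling the handler again
client.currentStep = saved_step
client.fail_operation3 = False
client.on_Mydatatype_added()
print(saved_step, client.calls)
```

operation2() appears once in the call list, while operation3() appears twice: once for the failed attempt and once for the retry.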
Understanding the isPartiallyProcessed attribute
The isPartiallyProcessed attribute inherited from GenericClient indicates whether the current event processing has already propagated some changes to the target. Therefore, it must be set to True as soon as the slightest modification has been propagated to the target.
It allows autoremediation to merge events whose currentStep is different from 0 but whose previous steps have not modified anything on the target.
isPartiallyProcessed is always False on normal event processing. Changing its value is up to the plugin implementation.
With the implementation example above, and an exception raised by operation3(), the autoremediation would not try to merge this partially processed event with possible subsequent events, as isPartiallyProcessed is True.
With the implementation example above, but an exception raised by operation2(), the autoremediation would try to merge this unprocessed event with possible subsequent events, as isPartiallyProcessed is still False.
on_save handler
A special handler may be implemented to run just after Hermes has saved its cache files: once some events have been processed and no event is waiting on the message bus, or before ending.
Warning
As this handler isn’t a standard event handler, GenericClient can’t handle its exceptions and retry it later.
Any unhandled exception raised in this handler will immediately terminate the client.
It’s up to the implementation to avoid errors.
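One defensive option is to catch and log everything inside the handler, so that no exception escapes it. The sketch below is an assumption-heavy illustration: the argument-less `on_save(self)` signature is not given above, and `FakeClient`, `_flush` and `save_errors` are hypothetical names.

```python
import logging

logger = logging.getLogger("myplugin")


class FakeClient:
    """Stand-in for a client plugin; not the real GenericClient."""

    def __init__(self) -> None:
        self.save_errors = 0

    def _flush(self) -> None:
        # Hypothetical post-save task that fails in this example
        raise OSError("disk full")

    def on_save(self) -> None:
        # Assumed signature: no argument other than self
        try:
            self._flush()
        except Exception:
            # Log and count the failure instead of letting it terminate the client
            self.save_errors += 1
            logger.exception("on_save task failed")


client = FakeClient()
client.on_save()  # does not raise
```

Whether swallowing an error is acceptable depends on the task; for some plugins, terminating may be the safer outcome.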
GenericClient properties and methods
Properties
- currentStep: int
  Step number of the event currently being processed. Allows clients to resume an event at the step where it failed.
- isPartiallyProcessed: bool
  Indicates whether the current event processing has already propagated some changes to the target.
  Must be set to True as soon as the slightest modification has been propagated to the target.
  It allows autoremediation to merge events whose currentStep is different from 0 but whose previous steps have not modified anything on the target.
- Read-only attribute that lets a client plugin handler know whether the current event is being processed as part of an error retry. This can be useful, for example, to perform additional checks when a library happens to throw exceptions even though it has correctly processed the requested changes, as python-ldap sometimes does.
- Dict containing the client plugin configuration.
Methods
- def getDataobjectlistFromCache(objtype: str) -> DataObjectList
  Returns the cache of the specified objtype, by reference. Raises IndexError if objtype is invalid.
  Warning
  Any modification of the cache content will break your client.
- def getObjectFromCache(objtype: str, objpkey: Any) -> DataObject
  Returns a deepcopy of an object from the cache. Raises IndexError if objtype is invalid, or if objpkey is not found.
- Client main loop
  Warning
  Called by Hermes to start the client. Must never be called nor overridden.
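Because getObjectFromCache() raises IndexError for an unknown type or key, a handler doing an optional lookup may want to wrap the call. In the sketch below, `FakeClient` is a stand-in that only mimics the documented behaviour (deepcopy returned, IndexError raised) with plain dicts, and `find_in_cache` is a hypothetical helper.

```python
import copy
from typing import Any, Optional


class FakeClient:
    """Stand-in mimicking the documented behaviour of getObjectFromCache()."""

    def __init__(self, cache: dict) -> None:
        self._cache = cache

    def getObjectFromCache(self, objtype: str, objpkey: Any) -> dict:
        try:
            return copy.deepcopy(self._cache[objtype][objpkey])
        except KeyError:
            raise IndexError(f"{objtype}/{objpkey} not found in cache") from None


def find_in_cache(client: Any, objtype: str, objpkey: Any) -> Optional[dict]:
    """Return the cached object, or None when the type or key is unknown."""
    try:
        return client.getObjectFromCache(objtype, objpkey)
    except IndexError:
        return None


client = FakeClient({"Users": {42: {"uid": "jdoe"}}})
found = find_in_cache(client, "Users", 42)
missing = find_in_cache(client, "Users", 43)

# The returned object is a copy: changing it leaves the cache untouched
found["uid"] = "changed"
still_cached = client.getObjectFromCache("Users", 42)
```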
Datasources
Description
A datasource plugin is simply an AbstractDataSourcePlugin subclass designed to link hermes-server with any datasource.
It requires methods to connect to and disconnect from the datasource, and to fetch, add, modify and delete data.
Requirements
Here is a commented minimal plugin implementation that won’t do anything.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass AbstractDataSourcePlugin
from lib.plugins import AbstractDataSourcePlugin

# Required for type hints
from typing import Any

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "MyDatasourcePluginClassName"


class MyDatasourcePluginClassName(AbstractDataSourcePlugin):
    def __init__(self, settings: dict[str, Any]):
        # Instantiate new plugin and store a copy of its settings dict in self._settings
        super().__init__(settings)
        # ... plugin init code

    def open(self):
        """Establish connection with datasource"""

    def close(self):
        """Close connection with datasource"""

    def fetch(
        self,
        query: str | None,
        vars: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Fetch data from datasource with specified query and optional queryvars.
        Returns a list of dict containing each entry fetched, with REMOTE_ATTRIBUTES
        as keys, and corresponding fetched values as values"""

    def add(self, query: str | None, vars: dict[str, Any]):
        """Add data to datasource with specified query and optional queryvars"""

    def delete(self, query: str | None, vars: dict[str, Any]):
        """Delete data from datasource with specified query and optional queryvars"""

    def modify(self, query: str | None, vars: dict[str, Any]):
        """Modify data on datasource with specified query and optional queryvars"""
Methods
Connection methods
As they don’t take any arguments, the open and close methods should rely on plugin settings.
For stateless datasources, they may do nothing.
fetch method
This method is called to fetch some data and provide it to hermes-server.
Depending on the plugin implementation, it may rely on the query argument, the vars argument, or both.
The result must be returned as a list of dict. Each list item is a fetched entry stored in a dict, with attribute names as keys and their corresponding values as values. Each value must be of one of the following Python types:
- None
- int
- float
- str
- datetime.datetime
- bytes
Allowed iterable types are:
Values must be of one of the types mentioned above. All other types are invalid.
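While developing a plugin, the scalar type constraint above can be checked on a fetch() result before returning it. This is a minimal sketch with a hypothetical helper, `invalid_values`; it only checks the six scalar types listed above and deliberately does not handle values stored in iterables.

```python
from datetime import datetime
from typing import Any

# The scalar types listed above (None is checked through its type)
SCALAR_TYPES = (type(None), int, float, str, datetime, bytes)


def invalid_values(entries: list) -> list:
    """Return (entry index, attribute name) for each value of another type.

    Note: bool is a subclass of int in Python, so booleans pass this check.
    """
    return [
        (index, name)
        for index, entry in enumerate(entries)
        for name, value in entry.items()
        if not isinstance(value, SCALAR_TYPES)
    ]


entries: Any = [
    {"id": 1, "login": "jdoe", "created": datetime(2024, 1, 1), "photo": None},
    {"id": 2, "login": "asmith", "tags": {"a", "b"}},  # a set is not allowed
]
problems = invalid_values(entries)
print(problems)
```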
add, delete, and modify methods
These methods are used to modify the datasource, when possible.
Depending on the technical constraints of the data source, they can all be implemented in the same way or not.
Depending on the plugin implementation, they may rely on the query argument, the vars argument, or both.
Error handling
No exception should be caught, to allow Hermes error handling to function properly.
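To tie the method descriptions together, here is a sketch of a toy datasource backed by a Python list. It runs standalone thanks to a stand-in base class; in a real plugin the base class comes from lib.plugins. The `rows` settings key and the use of vars as an equality filter are assumptions made for this example only, and modify/delete are left out for brevity.

```python
from typing import Any, Optional


class AbstractDataSourcePlugin:
    """Stand-in for lib.plugins.AbstractDataSourcePlugin so the sketch runs alone."""

    def __init__(self, settings: dict):
        self._settings = dict(settings)


class InMemoryDatasource(AbstractDataSourcePlugin):
    def __init__(self, settings: dict):
        super().__init__(settings)
        self._rows = None

    def open(self) -> None:
        # "Connect": load the rows declared in the (hypothetical) 'rows' setting
        self._rows = [dict(row) for row in self._settings["rows"]]

    def close(self) -> None:
        self._rows = None

    def fetch(self, query: Optional[str], vars: dict) -> list:
        # 'query' is ignored here; 'vars' is used as an equality filter.
        # No exception is caught, as recommended above.
        return [
            dict(row)
            for row in self._rows
            if all(row.get(key) == value for key, value in vars.items())
        ]

    def add(self, query: Optional[str], vars: dict) -> None:
        self._rows.append(dict(vars))


settings: Any = {"rows": [{"id": 1, "login": "jdoe"}, {"id": 2, "login": "asmith"}]}
ds = InMemoryDatasource(settings)
ds.open()
all_rows = ds.fetch(None, {})
only_jdoe = ds.fetch(None, {"login": "jdoe"})
ds.add(None, {"id": 3, "login": "new"})
count_after_add = len(ds.fetch(None, {}))
ds.close()
```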
Messagebus consumers
Description
A messagebus consumer plugin is simply an AbstractMessageBusConsumerPlugin subclass designed to link hermes-client with any message bus.
It requires methods to connect to and disconnect from the message bus, and to consume available events.
Features required from message bus
- Allow producers to specify a message key/category, and consumers to filter messages of a specified key/category
- Allow the same message to be consumed more than once
- Implement a message offset, allowing consumers to seek the next required message. As it will be stored in the clients' cache, this offset must be of one of the following Python types: int, float, str or bytes
Requirements
Here is a commented minimal plugin implementation that won’t do anything.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass AbstractMessageBusConsumerPlugin
from lib.plugins import AbstractMessageBusConsumerPlugin

# Required to return Event
from lib.datamodel.event import Event

# Required for type hints
from typing import Any, Iterable

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "MyMessagebusConsumerPluginClassName"


class MyMessagebusConsumerPluginClassName(AbstractMessageBusConsumerPlugin):
    def __init__(self, settings: dict[str, Any]):
        # Instantiate new plugin and store a copy of its settings dict in self._settings
        super().__init__(settings)
        # ... plugin init code

    def open(self) -> Any:
        """Establish connection with messagebus"""

    def close(self):
        """Close connection with messagebus"""

    def seekToBeginning(self):
        """Seek to first (older) event in message bus queue"""

    def seek(self, offset: Any):
        """Seek to specified offset event in message bus queue"""

    def setTimeout(self, timeout_ms: int | None):
        """Set timeout (in milliseconds) before aborting when waiting for next event.
        If None, wait forever"""

    def findNextEventOfCategory(self, category: str) -> Event | None:
        """Lookup for first message with specified category and returns it,
        or returns None if none was found"""

    def __iter__(self) -> Iterable:
        """Iterate over message bus returning each Event, starting at current offset.
        When every event has been consumed, wait for next message until timeout set with
        setTimeout() has been reached"""
Methods to implement
Connection methods
As they don’t take any arguments, the open and close methods should rely on plugin settings.
seekToBeginning method
Seek to the first (oldest) event in the message bus queue.
seek method
Seek to the event at the specified offset in the message bus queue.
setTimeout method
Set the timeout (in milliseconds) before aborting when waiting for the next event. If None, wait forever.
findNextEventOfCategory method
Look up the first message with the specified category and return it, or return None if none was found.
As this method browses the message bus, the current offset will be modified.
__iter__ method
Returns an Iterable that will yield all events available on the message bus, starting from the current offset.
The following non-serializable attributes of the Event instance must be set before yielding it:
- offset (int | float | str | bytes): offset of the event in the message bus
- timestamp (datetime.datetime): timestamp of the event
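The interplay between the offset, findNextEventOfCategory() and iteration can be sketched with a list standing in for the message bus. Everything here is illustrative: `Event` is a reduced stand-in for lib.datamodel.event.Event, `InMemoryConsumer` does not subclass the real abstract plugin, the category names are arbitrary, and the timeout wait is omitted.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Iterator, Optional


@dataclass
class Event:
    """Stand-in for lib.datamodel.event.Event; only the fields used here."""
    evcategory: str
    payload: dict
    offset: Any = None
    timestamp: Optional[datetime] = None


class InMemoryConsumer:
    """Stand-in consumer reading from a list instead of a real message bus."""

    def __init__(self, messages: list):
        self._messages = messages
        self._position = 0  # current offset: index of the next message

    def seekToBeginning(self) -> None:
        self._position = 0

    def seek(self, offset: int) -> None:
        self._position = offset

    def findNextEventOfCategory(self, category: str) -> Optional[Event]:
        # Browsing the bus moves the current offset, as noted above
        for event in self:
            if event.evcategory == category:
                return event
        return None

    def __iter__(self) -> Iterator[Event]:
        while self._position < len(self._messages):
            category, payload = self._messages[self._position]
            event = Event(category, payload)
            # Set the two attributes before yielding the event
            event.offset = self._position
            event.timestamp = datetime.now(timezone.utc)
            self._position += 1
            yield event


consumer = InMemoryConsumer([("alpha", {"n": 1}), ("beta", {"n": 2}), ("alpha", {"n": 3})])
found = consumer.findNextEventOfCategory("beta")
remaining = [event.offset for event in consumer]
consumer.seekToBeginning()
all_offsets = [event.offset for event in consumer]
```

After the lookup, iteration resumes right after the event that was found, which is why only the last offset remains.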
Event properties and methods
Methods
Messagebus producers
Description
A messagebus producer plugin is simply an AbstractMessageBusProducerPlugin subclass designed to link hermes-server with any message bus.
It requires methods to connect to and disconnect from the message bus, and to produce (send) events over it.
Features required from message bus
- Allow producers to specify a message key/category, and consumers to filter messages of a specified key/category
- Allow the same message to be consumed more than once
- Implement a message offset, allowing consumers to seek the next required message. As it will be stored in the clients' cache, this offset must be of one of the following Python types: int, float, str or bytes
Requirements
Here is a commented minimal plugin implementation that won’t do anything.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Required to subclass AbstractMessageBusProducerPlugin
from lib.plugins import AbstractMessageBusProducerPlugin

# Required for type hints
from lib.datamodel.event import Event
from typing import Any

# Required to indicate to hermes which class it has to instantiate
HERMES_PLUGIN_CLASSNAME = "MyMessagebusProducerPluginClassName"


class MyMessagebusProducerPluginClassName(AbstractMessageBusProducerPlugin):
    def __init__(self, settings: dict[str, Any]):
        # Instantiate new plugin and store a copy of its settings dict in self._settings
        super().__init__(settings)
        # ... plugin init code

    def open(self) -> Any:
        """Establish connection with messagebus"""

    def close(self):
        """Close connection with messagebus"""

    def _send(self, event: Event):
        """Send specified event to message bus"""
Methods to implement
Connection methods
As they don’t take any arguments, the open and close methods should rely on plugin settings.
_send method
Note
Be careful to override the _send() method and not the send() one.
The send() method is a wrapper that handles exceptions while calling _send().
Send a message containing the specified event.
The consumer will require the following properties:
- evcategory (str): key/category of the event (stored in the Event)
- timestamp (datetime.datetime): timestamp of the event
- offset (int | float | str | bytes): offset of the event in the message bus
See Event properties and methods below.
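The reason for overriding _send() rather than send() is the usual wrapper pattern: the public method adds behaviour around the private one. The sketch below only illustrates that pattern. `ProducerBase` is a stand-in, and the way its send() reacts to an exception (recording it and returning False) is an assumption, as the actual handling is not described here.

```python
from typing import Any


class ProducerBase:
    """Stand-in for the abstract producer; the real send() may behave differently."""

    def __init__(self) -> None:
        self.last_error = None

    def send(self, event: Any) -> bool:
        # Wrapper: calls the plugin's _send() and deals with its exceptions
        try:
            self._send(event)
        except Exception as exc:
            self.last_error = exc  # assumed handling, for illustration only
            return False
        return True

    def _send(self, event: Any) -> None:
        raise NotImplementedError


class ListProducer(ProducerBase):
    """Toy plugin: only _send() is overridden, send() is inherited."""

    def __init__(self) -> None:
        super().__init__()
        self.sent: list = []

    def _send(self, event: Any) -> None:
        if event is None:
            raise ValueError("no event to send")
        self.sent.append(event)


producer = ListProducer()
ok = producer.send("event-1")
failed = producer.send(None)
```

Overriding send() instead would bypass the wrapper, and with it whatever exception handling the base class provides.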
Event properties and methods
Properties
Methods