Stream Management

In PM4Py, we offer an interface for stream management, that is intended to be an entrypoint accepting events from potentially many sources and sending the same events to potentially many targets.

Here, we offer an overview of the features provided by the interface.

Instantiation and Start of the Stream

The stream can be instantiated through the instructions

from pm4py.streaming.stream.stream import LiveEventStream
stream = LiveEventStream()

Then, it can be started through the command:

stream.start()

From that command, it starts to accept events.

Usage of a Listener Algorithm – the Event Printer example

We offer an example implementation of a listener algorithm, that simply prints the event received from the stream. To use and register that to the stream, the following commands can be provided:

from pm4py.streaming.algo.event_printer import EventPrinter
event_printer = EventPrinter()
stream.register(event_printer)

Creation of a Listener Algorithm – Extending the Interface

The interface is found in pm4py.streaming.algo.interface, and the name of the class is StreamingAlgorithm.

Essentially, a single method (receive) needs to be overriden. To report, as example, the code of the EventPrinter:

from pm4py.streaming.algo import interface

class EventPrinter(interface.StreamingAlgorithm):

    def receive(self, event):
        print(event)

Appending an Event to the Stream – and seeing the reaction of the EventPrinter

For this purpose, the following code can be used, that appends an Event with two attributes to the stream

from pm4py.objects.log.log import Event
stream.append(Event({"case:concept:name": "1", "concept:name": "A"}))

.. The EventPrinter that was added to the stream is responding as expected.

Stopping the Stream

When the stream is not useful anymore, the following code can be used:

stream.stop()