Streaming Process Mining

In PM4Py, we offer support for streaming process mining functionalities, including:

  • Streaming process discovery (DFG)
  • Streaming conformance checking (footprints and TBR)
  • Streaming importing of XES/CSV files

Streaming Package – General Structure

The management of the stream of events is done by the pm4py.streaming.stream.live_event_stream.LiveEventStream class. This class provides access to two methods:

  • register(algo): registers a new algorithm to the live event stream (the algorithm will be notified whenever an event is added to the stream).
  • append(event): adds an event to the live event stream.

The LiveEventStream processes the incoming events using a thread pool. This helps to manage a “flood” of events using a given number of different threads.

For the streaming algorithms that are registered to the LiveEventStream, we provide an interface that should be implemented. Each streaming algorithm should implement the following methods:

  • _process(event): a method that accepts and processes an incoming event.
  • _current_result(): a method that returns the current state of the streaming algorithm.
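
The mechanics of the stream and the algorithm interface can be illustrated with a small, self-contained sketch. This is plain Python, not PM4Py's actual implementation: the names SimpleEventStream and ActivityCounter are made up for illustration, and only mimic the register/append/start/stop behavior described above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class SimpleEventStream:
    """Toy stand-in for LiveEventStream: notifies every registered
    algorithm of each appended event, using a thread pool."""

    def __init__(self, num_threads=4):
        self._algos = []
        self._num_threads = num_threads
        self._executor = None

    def register(self, algo):
        # algo must expose _process(event) and _current_result()
        self._algos.append(algo)

    def start(self):
        self._executor = ThreadPoolExecutor(max_workers=self._num_threads)

    def append(self, event):
        for algo in self._algos:
            self._executor.submit(algo._process, event)

    def stop(self):
        # waits until all events left in the queue are processed
        self._executor.shutdown(wait=True)

class ActivityCounter:
    """Toy streaming algorithm: counts occurrences of each activity."""

    def __init__(self):
        self.counts = {}
        self._lock = threading.Lock()

    def _process(self, event):
        # events may arrive on different threads, hence the lock
        with self._lock:
            act = event["concept:name"]
            self.counts[act] = self.counts.get(act, 0) + 1

    def _current_result(self):
        with self._lock:
            return dict(self.counts)

stream = SimpleEventStream()
counter = ActivityCounter()
stream.register(counter)
stream.start()
for act in ["register request", "check ticket", "decide", "check ticket"]:
    stream.append({"concept:name": act})
stream.stop()
print(counter._current_result())
```

Note the lock in the toy algorithm: since the stream dispatches events on a thread pool, a real streaming algorithm must also be safe under concurrent calls to _process.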

Streaming Process Discovery (Directly-Follows Graph)

The following example will show how to discover a DFG from a stream of events.

Let’s first define the (live) event stream:

from pm4py.streaming.stream.live_event_stream import LiveEventStream

live_event_stream = LiveEventStream()

Then, create the streaming DFG discovery object (that will contain the list of activities and relationships inside the DFG):

from pm4py.streaming.algo.discovery.dfg import algorithm as dfg_discovery

streaming_dfg = dfg_discovery.apply()

Then, we need to register the streaming DFG discovery to the stream:

live_event_stream.register(streaming_dfg)

And start the stream:

live_event_stream.start()

To put a known event log into the stream, we first import an XES log:

import os
from pm4py.objects.log.importer.xes import importer as xes_importer

log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))

And then convert that to a static event stream:

from pm4py.objects.conversion.log import converter as stream_converter

static_event_stream = stream_converter.apply(log, variant=stream_converter.Variants.TO_EVENT_STREAM)

Then, we can add all the events to the live stream:

for ev in static_event_stream:
    live_event_stream.append(ev)

Then, we stop the stream; this ensures that the events remaining in the queue are fully processed:

live_event_stream.stop()

At the end, we can get the directly-follows graph, along with the activities of the graph and the sets of start and end activities:

dfg, activities, sa, ea = streaming_dfg.get()

If we do print(dfg) on the running-example.xes log we obtain:

{('register request', 'examine casually'): 3, ('examine casually', 'check ticket'): 4, ('check ticket', 'decide'): 6, ('decide', 'reinitiate request'): 3, ('reinitiate request', 'examine thoroughly'): 1, ('examine thoroughly', 'check ticket'): 2, ('decide', 'pay compensation'): 3, ('register request', 'check ticket'): 2, ('check ticket', 'examine casually'): 2, ('examine casually', 'decide'): 2, ('register request', 'examine thoroughly'): 1, ('decide', 'reject request'): 3, ('reinitiate request', 'check ticket'): 1, ('reinitiate request', 'examine casually'): 1, ('check ticket', 'examine thoroughly'): 1, ('examine thoroughly', 'decide'): 1}
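
Conceptually, the streaming DFG discovery just keeps counters of directly-follows pairs (plus the observed activities and the start/end activities) and updates them for every incoming event. A sketch of the same counting logic over static traces (dfg_from_traces is a made-up helper for illustration, not a PM4Py function):

```python
from collections import Counter

def dfg_from_traces(traces):
    """Count directly-follows pairs, activities, and start/end
    activities from a list of traces (lists of activity names)."""
    dfg = Counter()
    activities, start_activities, end_activities = set(), set(), set()
    for trace in traces:
        if not trace:
            continue
        activities.update(trace)
        start_activities.add(trace[0])
        end_activities.add(trace[-1])
        # each consecutive pair in the trace is a directly-follows relation
        for a, b in zip(trace, trace[1:]):
            dfg[(a, b)] += 1
    return dict(dfg), activities, start_activities, end_activities

traces = [
    ["register request", "examine casually", "decide"],
    ["register request", "check ticket", "decide"],
]
dfg, acts, sa, ea = dfg_from_traces(traces)
print(dfg)
```

The streaming variant performs the same updates incrementally, keeping the last seen activity per case so that each new event contributes one directly-follows pair.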

Streaming Conformance Checking (TBR)

The following examples will show how to check conformance against a stream of events with the footprints and token-based replay (TBR) algorithms. In both examples, we work with the running-example.xes log and with a model discovered using the inductive miner infrequent variant with the default noise threshold (0.2).

The following code can be used to import the running-example.xes log:

import os
from pm4py.objects.log.importer.xes import importer as xes_importer
log = xes_importer.apply(os.path.join("tests", "input_data", "running-example.xes"))

And convert that to a static stream of events:

from pm4py.objects.conversion.log import converter as log_converter
static_event_stream = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_STREAM)

Then, the following code can be used to discover a process tree using the inductive miner:

from pm4py.algo.discovery.inductive import algorithm as inductive_miner
tree = inductive_miner.apply_tree(log, variant=inductive_miner.Variants.IMf)

And convert that to a Petri net:

from pm4py.objects.conversion.process_tree import converter as pt_converter
net, im, fm = pt_converter.apply(tree)

Now, we can apply the streaming TBR.

Then, we create a live event stream:

from pm4py.streaming.stream.live_event_stream import LiveEventStream
live_event_stream = LiveEventStream()

And the streaming token-based replay algorithm:

from pm4py.streaming.algo.conformance.tbr import algorithm as tbr_algorithm
streaming_tbr = tbr_algorithm.apply(net, im, fm)

Moreover, we can register that to the live event stream:

live_event_stream.register(streaming_tbr)

And start the live event stream:

live_event_stream.start()

After that, we can add each event of the log to the live event stream:

for ev in static_event_stream:
    live_event_stream.append(ev)

And then, stop the event stream:

live_event_stream.stop()

And get statistics on the execution of the replay (e.g., how many missing tokens were needed) as a Pandas dataframe. This method can be called throughout the lifecycle of the stream, providing the “picture” of the replay up to that point:

conf_stats = streaming_tbr.get()
print(conf_stats)

In addition to this, the following methods are available inside the streaming TBR that print some warnings during the replay. These methods can easily be overridden (for example, to send the message by e-mail):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_missing_tokens
  • message_case_not_in_dictionary
  • message_final_marking_not_reached
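
The override pattern can be sketched with a stand-in base class. BaseReplay and AlertingReplay below are made up, and the exact method signatures in PM4Py may differ; treat this purely as an illustration of subclassing a warning hook.

```python
class BaseReplay:
    """Made-up stand-in for the streaming TBR class: by default,
    warning hooks simply print to standard output."""

    def message_missing_tokens(self, activity, case):
        print("missing tokens for activity %s in case %s" % (activity, case))

class AlertingReplay(BaseReplay):
    """Overrides a warning hook, e.g. to collect alerts for later
    delivery (sending an e-mail would go in the overridden method)."""

    def __init__(self):
        self.alerts = []

    def message_missing_tokens(self, activity, case):
        self.alerts.append((activity, case))

replay = AlertingReplay()
replay.message_missing_tokens("decide", "case-3")
print(replay.alerts)
```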

Streaming Conformance Checking (footprints)

Footprints is another conformance checking method offered in PM4Py that can be applied in the context of streaming events. In the following, we see an application of the streaming footprints.
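
Conceptually, footprints conformance reduces to checking whether the directly-follows pairs observed in the stream are permitted by the footprints of the model. A minimal sketch of that idea (check_footprints is a made-up helper for illustration, not part of PM4Py, and the allowed set below is a toy model):

```python
def check_footprints(allowed_pairs, trace):
    """Return the directly-follows pairs observed in the trace that
    the model footprints do not allow (i.e., the deviations)."""
    observed = set(zip(trace, trace[1:]))
    return observed - allowed_pairs

# made-up footprints: the directly-follows pairs the model allows
allowed = {("register request", "check ticket"), ("check ticket", "decide")}

deviations = check_footprints(allowed, ["register request", "decide"])
print(deviations)  # → {('register request', 'decide')}
```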

First of all, we can discover the footprints from the process model:

from pm4py.algo.discovery.footprints import algorithm as fp_discovery
footprints = fp_discovery.apply(tree)

Then, we can create the live event stream:

from pm4py.streaming.stream.live_event_stream import LiveEventStream
live_event_stream = LiveEventStream()

Then, we can create the streaming footprints object:

from pm4py.streaming.algo.conformance.footprints import algorithm as fp_conformance
streaming_footprints = fp_conformance.apply(footprints)

And register that to the stream:

live_event_stream.register(streaming_footprints)

After that, we can start the live event stream:

live_event_stream.start()

And append every event of the original log to this live event stream:

for ev in static_event_stream:
    live_event_stream.append(ev)

Eventually, we can stop the live event stream:

live_event_stream.stop()

And get the statistics of conformance checking:

conf_stats = streaming_footprints.get()
print(conf_stats)

In addition to this, the following methods are available inside the streaming footprints that print some warnings during the replay. These methods can easily be overridden (for example, to send the message by e-mail):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_footprints_not_possible
  • message_start_activity_not_possible
  • message_end_activity_not_possible
  • message_case_not_in_dictionary

Streaming Importer (XES – trace-by-trace)

The XES trace-by-trace streaming importer helps when the traces of an XES event log might not fit in memory, or when only a sample of a big log is needed.

The importer can be used in a natural way, providing the path to the log:

from pm4py.streaming.importer.xes import importer as xes_importer

streaming_log_object = xes_importer.apply("C:/running-example.xes", variant=xes_importer.Variants.XES_TRACE_STREAM)

And it is possible to iterate over the traces of this log (that are read trace-by-trace):

for trace in streaming_log_object:
    print(trace)
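
For instance, sampling a fixed number of traces can be done lazily with itertools.islice, so that the rest of the log is never read. The generator below is a made-up stand-in for the streaming log object:

```python
from itertools import islice

def fake_trace_stream():
    """Made-up generator standing in for the trace-by-trace importer:
    traces are yielded one at a time, never all in memory at once."""
    for i in range(1000000):
        yield {"attributes": {"concept:name": "case-%d" % i}, "events": []}

# take only the first three traces; the remaining ones are never produced
sample = list(islice(fake_trace_stream(), 3))
print(len(sample))  # → 3
```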

Streaming Importer (XES – event-by-event)

The XES event-by-event streaming importer helps when the events of an XES event log might not fit in memory, or when only a sample of a big log is needed. In this case, the single events inside the traces are picked during the iteration.

The importer can be used in a natural way, providing the path to the log:

from pm4py.streaming.importer.xes import importer as xes_importer

streaming_ev_object = xes_importer.apply("C:/running-example.xes", variant=xes_importer.Variants.XES_EVENT_STREAM)

And it is possible to iterate over the single events of this log (that are read during the iteration):

for event in streaming_ev_object:
    print(event)

Streaming Importer (CSV – event-by-event)

When the events of a CSV event log might not fit in memory, loading the file with Pandas might not be feasible. The CSV streaming importer instead parses the single rows of the file during the iteration.

The importer can be used in a natural way, providing the path to a CSV log:

from pm4py.streaming.importer.csv import importer as csv_importer
log_object = csv_importer.apply("C:/running-example.csv")

And it is possible to iterate over the single events of this log (that are read during the iteration):

for ev in log_object:
    print(ev)
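
The row-by-row idea can be sketched with the standard library alone, using csv.DictReader; the tiny file written below is a made-up stand-in for running-example.csv:

```python
import csv
import os
import tempfile

# write a tiny CSV log to iterate over (stand-in for running-example.csv)
path = os.path.join(tempfile.gettempdir(), "tiny-event-log.csv")
with open(path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["case:concept:name", "concept:name", "time:timestamp"])
    writer.writerow(["1", "register request", "2011-01-01 10:00:00"])
    writer.writerow(["1", "check ticket", "2011-01-01 11:00:00"])

# read the file row by row: only one event is held in memory at a time
activities = []
with open(path, newline="") as f:
    for row in csv.DictReader(f):
        activities.append(row["concept:name"])

print(activities)  # → ['register request', 'check ticket']
```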
