Welcome to Data Streams Post-Processing Internals Manual’s documentation!


Need to add documentation about how the DSUI/DSKI header of the raw files is created.

Postprocessing files:

  • entities.py
  • filtering.py
  • headfilter.py
  • inputs.py
  • pipeline.py
  • postprocess
  • pipes.cspec
  • event_data.py (this is definitely used in interpreting raw data files)
  • head.py (in filters, for some reason)
  • namespaces.py (in datastreams, above postprocessing)
  • queues.py (this may be used for the outputmultiplexer)

Useless files?:

  • ppexcept.py
  • outputs.py
  • test.ns

Files I can’t make head or tail of, but which don’t seem to be used by PP:

  • dcg.py
  • ltt.py
  • postprocessd
  • syscall.py

Note: It seems that all of the postprocessing files have also been copied into postprocessing/filters.


Describes the attributes of the various types of entities. There is a parent class Entity, and its descendants are: Event, Interval, Histogram, and Counter.

The Entity class has these attributes:

  • cid
  • time (dictionary of various time coordinates, such as TSC and ns)
  • pid
  • namespace

The namespace attribute is an instance of one of the classes in Namespaces. The namespace object created in entities appears to link automatically to the corresponding <blank>Spec class in Namespaces. For example, the namespace of an Event is an instance of the EventSpec class. A comment says that the namespace of an entity should be cleared before pickling (presumably for efficiency). How exactly it is then supposed to be retrieved is unclear.

All descendant classes inherit the Entity attributes and add others (such as tag number and extra data for events, start and end time for intervals, and buckets and lower and upper bounds for histograms).

Most of the functions within these classes are merely getters and setters for these attributes.
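To make the inheritance concrete, here is a minimal sketch of how such a hierarchy could look. It is not the actual entities.py code: the constructor signatures and the method names get_cid, get_time, and get_duration are assumptions made for illustration, and only Event and Interval are shown.

```python
class Entity:
    """Hypothetical stand-in for the Entity parent class."""

    def __init__(self, cid, time, pid, namespace=None):
        self.cid = cid
        self.time = dict(time)      # time coordinates, e.g. {"tsc": ..., "ns": ...}
        self.pid = pid
        self.namespace = namespace  # would be a <blank>Spec instance

    # Most methods are plain getters, as in the description above.
    def get_cid(self):
        return self.cid

    def get_time(self, unit="ns"):
        return self.time[unit]


class Event(Entity):
    """Adds a tag number and extra data."""

    def __init__(self, cid, time, pid, tag, extra_data=None, namespace=None):
        super().__init__(cid, time, pid, namespace)
        self.tag = tag
        self.extra_data = extra_data


class Interval(Entity):
    """Adds a start and an end time."""

    def __init__(self, cid, start_time, end_time, pid, namespace=None):
        super().__init__(cid, start_time, pid, namespace)
        self.start_time = dict(start_time)
        self.end_time = dict(end_time)

    def get_duration(self, unit="ns"):
        return self.end_time[unit] - self.start_time[unit]
```

Histogram and Counter would follow the same pattern, adding their own attributes on top of the inherited ones.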


This is additional information about entities. It contains the event and family name, description, and some more information.


This is the parent class for all additional filter modules. It initializes the parameters, output, and other values. It has many functions which initialize, access, and print its various attributes. There are also functions which control receiving and sending entities along the pipeline. This is where the initialize, process, and finalize functions of a filter module get called.

Supporting multiple outputs may make it possible to split the pipeline.

OutputMultiplexer looks like an attempt at sending output to multiple pipelines. However, it is clearly not complete. It attempts to call PipelineQueues (in queues.py), which is also incomplete. It also attempts to call a function in outputs.py, which is an empty file.


This is the first filter in every pipeline. Its input is actually the set of input sources of the pipeline. If there are multiple sources rather than just one, they are merged into one data stream for the pipeline. Each source’s entities are retrieved by the function fetch. fetch is called by run, which controls the execution of the whole pipeline and sends entities down the pipeline until there are none left.
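The fetch/run split can be sketched as follows. This is only an illustration, not the headfilter.py code: it assumes that sources are merged in timestamp order (the notes above do not say how the merge is done), that each source is already sorted by time, and that entities are plain dictionaries with a time["ns"] field. The send callback stands in for passing an entity to the next filter.

```python
import heapq


def fetch(source):
    """Yield the entities of one input source, one at a time."""
    for entity in source:
        yield entity


def run(sources, send):
    """Merge all sources into one stream and send every entity downstream.

    Assumption: each source is sorted by its "ns" time coordinate, so
    heapq.merge produces a single time-ordered stream.
    """
    merged = heapq.merge(*(fetch(s) for s in sources),
                         key=lambda entity: entity["time"]["ns"])
    count = 0
    for entity in merged:
        send(entity)
        count += 1
    return count  # number of entities pushed down the pipeline
```

With two sources holding times 1, 4 and 2, 3, run would call send four times, in the order 1, 2, 3, 4.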


This module describes each type of input source that can be passed into the head filter. The input source can be a connection to another pipeline, or one of several different types of files. Using the struct class, an identifier must be placed at the beginning of any file that will be input to a head filter. This identifier specifies the type of file: pickle or ‘raw’. Raw files are generally created by other postprocessing. An XML type also exists but is unimplemented.

The various classes (for example, the PickleInputSource class) specify functions which override the normal open, read, and close operations for files. This makes fetching entities from any type of file easy.
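The identifier-plus-reader idea might look roughly like the sketch below. It assumes that "struct" refers to Python’s standard struct module; the magic numbers, the 4-byte layout, and the helper names are all invented for illustration and are not the values used by inputs.py. Only the class name PickleInputSource comes from the notes above.

```python
import io
import pickle
import struct

MAGIC_FORMAT = "<I"          # hypothetical: one little-endian 32-bit identifier
PICKLE_MAGIC = 0x50494B4C    # hypothetical value
RAW_MAGIC = 0x52415721       # hypothetical value
FILE_TYPES = {PICKLE_MAGIC: "pickle", RAW_MAGIC: "raw"}


def write_pickle_stream(fileobj, entities):
    """Write the identifier, then each entity as a separate pickle."""
    fileobj.write(struct.pack(MAGIC_FORMAT, PICKLE_MAGIC))
    for entity in entities:
        pickle.dump(entity, fileobj)


def read_file_type(fileobj):
    """Read the identifier at the start of the file and name its type."""
    size = struct.calcsize(MAGIC_FORMAT)
    header = fileobj.read(size)
    if len(header) != size:
        raise ValueError("file too short to contain an identifier")
    (magic,) = struct.unpack(MAGIC_FORMAT, header)
    if magic not in FILE_TYPES:
        raise ValueError("unknown file identifier: %#x" % magic)
    return FILE_TYPES[magic]


class PickleInputSource:
    """Exposes open/read/close so callers need not know the file type."""

    def __init__(self, fileobj):
        self.fileobj = fileobj

    def open(self):
        if read_file_type(self.fileobj) != "pickle":
            raise ValueError("not a pickle file")

    def read(self):
        """Return the next entity, or None when the file is exhausted."""
        try:
            return pickle.load(self.fileobj)
        except EOFError:
            return None

    def close(self):
        self.fileobj.close()
```

A raw-file source would offer the same three functions with a different read, which is what lets the head filter treat all sources alike.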


Note: you can split the pipelines. See examples/unified-sigpipe/sigpipe_dski1.pipes for an example on that.

This module contains the class ProcessingNode, which holds one or more pipelines; Pipeline is another class within this module. ProcessingNode has functions for creating additional pipelines within the node, and several functions for running the whole processing node. The normal run() function runs all the pipelines concurrently, which creates difficulties when there are dependencies. For example, suppose the 5th pipeline’s head filter takes as input a file named temp, which is actually a pickled file written by the 2nd pipeline. Contrary to what you might expect, the 5th pipeline will execute first. Because all pipelines are initialized at the beginning, the 2nd pipeline has not yet created the file, so the 5th pipeline seems to create it and, seeing that it is empty, immediately runs and finishes processing. This is obviously not the desired behavior.

There is a function which claims it can run the pipelines sequentially. However, the name of this function is run_single_doesntworkanymore. This suggests that it may not work anymore, and indeed it does not seem to.

The reason it seems impossible to run the pipelines sequentially may be the class they inherit from, threading: each pipeline is run as a separate thread of control.

I thought another reason the pipelines could not run sequentially may have been because they are put in a dictionary, which doesn’t have a specific order. However, even when I implemented the pipeline dictionary as a list, they did not run in order. This definitely needs further exploration.
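For reference, the difference between the two run orders can be shown with plain Python threads. This is a toy sketch, not the pipeline.py code, and it has not been tried against the real framework: the Pipeline class here only wraps a callable, the names run_concurrently and run_sequentially are invented, and a shared dictionary stands in for the temp file. It assumes the pipelines inherit from threading.Thread.

```python
import threading


class Pipeline(threading.Thread):
    """Toy pipeline: a thread that runs one callable."""

    def __init__(self, name, work):
        super().__init__(name=name)
        self.work = work

    def run(self):
        self.work()


def run_concurrently(pipelines):
    """Start everything, then wait: no ordering between pipelines."""
    for pipeline in pipelines:
        pipeline.start()
    for pipeline in pipelines:
        pipeline.join()


def run_sequentially(pipelines):
    """Start one pipeline and wait for it before starting the next."""
    for pipeline in pipelines:
        pipeline.start()
        pipeline.join()


shared = {}    # stands in for the file named temp
results = []


def producer():
    shared["temp"] = [1, 2, 3]


def consumer():
    results.append(list(shared.get("temp", [])))


# The list order is the execution order: producer first, consumer second.
run_sequentially([Pipeline("second", producer), Pipeline("fifth", consumer)])
```

With run_concurrently the consumer could run before the producer and see nothing, which matches the behavior described above. Whether the real pipelines can be driven this way depends on what happens during their initialization, which this sketch does not model.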


This is a specification file for the configuration files. Using the name filters for the dictionary of pipelines seems a bit confusing. This problem also exists in the Pipeline class.

Basic Structure

When you run a postprocess command, you are running the file postprocess in the postprocess directory. It accepts a variety of input parameters, but generally you run it with the f option. In effect, it takes your configuration file, runs it through the parser to access the pipelines, and creates a PipelineNode object.

The PipelineNode class is contained in the pipeline.py file. Effectively, the PipelineNode class is a dictionary of pipelines. The name of the pipeline is the key, and the value is the pipeline itself. Each pipeline is an object of the class Pipeline, which contains a list of all the filters within the pipeline. The pipelines are then initialized by running the function create_pipeline on each one. This function instantiates the list of filters within the pipeline. Each pipeline must begin with a head filter, which is contained in the headfilter.py file.

Once all of these pipelines and filters are initialized, the postprocess file calls the run function in the pipeline.py file. This function is specifically designed to run all the pipelines concurrently. There are multiple functions to run the pipelines, but none currently seems capable of running them sequentially, which would be desirable. In any case, run first calls establish_connections, which (according to a comment) must be called before running pipelines. However, in a quick test, the pipelines did seem to run without this call. Interestingly, the establish_connections function in the actual headfilter.py file seems to be empty, but an inheriting class, in the file head.py in the postprocess/filters directory, does have a seemingly complete establish_connections function. While I do believe we use this file to some degree, I’m not sure how.

Then, after connections have been established, the current run function basically just lets the pipelines loose. The Pipeline class inherits from the Python threading class. The run function then just calls the threading functions start and join. The start function creates a separate thread of control and runs that object’s run function. The join function blocks the calling thread until the thread whose join function was called finishes.

Once a pipeline’s run function has been called, it simply calls the run function of the first filter in the pipeline. Yes, there are plenty of run functions. The first filter always has to be the head filter, so this particular run is in the headfilter.py file. This run function goes through the input sources and, using the fetch function, retrieves each entity. There are several possible types of input: raw file, pipeline, or pickled file. This in turn calls the read function from the inputs.py file, which determines the connection or file type and uses the corresponding method to retrieve a single entity from that file or pipeline. The entity is then passed along the pipeline. Each filter in the pipeline (including the head filter) is subclassed from the parent class in the filtering.py file. That class has a retrieve function, which takes the incoming entity, checks that it is not the last one (a PipelineEnd entity), and calls the process function. The process function is overridden by every filter in order to do different things to the datastream. The entities are then output with various send functions. Thus the entities travel along the pipelines and through each of the filters.
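The retrieve/process/send flow described above can be sketched in a few lines. This is a simplified model rather than the filtering.py code: it assumes a single output per filter, direct function calls between filters, and that finalize is triggered when PipelineEnd arrives. The names Filter, DropOdd, Collect, and run_chain are invented for the example.

```python
class PipelineEnd:
    """Sentinel entity marking the end of the datastream."""


class Filter:
    """Hypothetical parent class for filters."""

    def __init__(self):
        self.next_filter = None

    def initialize(self):
        pass

    def process(self, entity):
        # Default behavior: pass the entity through unchanged.
        self.send(entity)

    def finalize(self):
        pass

    def send(self, entity):
        if self.next_filter is not None:
            self.next_filter.retrieve(entity)

    def retrieve(self, entity):
        # Check for the end marker before handing the entity to process.
        if isinstance(entity, PipelineEnd):
            self.finalize()
            self.send(entity)
        else:
            self.process(entity)


class DropOdd(Filter):
    """Example filter: forwards only even integers."""

    def process(self, entity):
        if entity % 2 == 0:
            self.send(entity)


class Collect(Filter):
    """Example sink: remembers everything it receives."""

    def __init__(self):
        super().__init__()
        self.seen = []
        self.finished = False

    def process(self, entity):
        self.seen.append(entity)

    def finalize(self):
        self.finished = True


def run_chain(filters, entities):
    """Link the filters, then push every entity and the end marker through."""
    for upstream, downstream in zip(filters, filters[1:]):
        upstream.next_filter = downstream
    for f in filters:
        f.initialize()
    for entity in entities:
        filters[0].retrieve(entity)
    filters[0].retrieve(PipelineEnd())
```

For example, running the entities 1, 2, 3, 4 through DropOdd followed by Collect leaves 2 and 4 in the sink, with its finalize called once the end marker arrives.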

Main issues with the postprocessing framework:

  1. Lack of ability to run pipelines and filters sequentially
  2. Problems with pickling files
       • Specifically, the pickled files cannot be unpickled
       • This is connected with the first problem: if the filter that wants to unpickle starts running before the filter which creates the pickled file, then it either won’t find any entities to process, or it won’t find a file to open.
       • A rather separate issue is with the entities’ namespaces. These supposedly should be cleared before pickling, for efficiency, but in order to retrieve them after unpickling, apparently we need to be able to access the dictionary of family and entity names at the beginning of the datastream. It appears that this dictionary is currently not being pickled or unpickled.
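One possible shape for the namespace round trip is sketched below. It is a guess at how the pieces could fit together, not a description of existing code: it assumes the dictionary of names can be keyed by the entity’s cid, and the helper names dump_without_namespace and load_and_reattach are invented. Event and EventSpec here are bare stand-ins for the real classes.

```python
import pickle


class EventSpec:
    """Stand-in for the namespace information of an event."""

    def __init__(self, family, name):
        self.family = family
        self.name = name


class Event:
    """Stand-in for an entity carrying a namespace reference."""

    def __init__(self, cid, namespace=None):
        self.cid = cid
        self.namespace = namespace


def dump_without_namespace(entity):
    """Pickle the entity's attributes with the namespace cleared."""
    state = dict(vars(entity))
    state["namespace"] = None
    return pickle.dumps(state)


def load_and_reattach(data, namespace_table):
    """Unpickle the attributes and look the namespace up again by cid."""
    state = pickle.loads(data)
    entity = Event.__new__(Event)   # rebuild without calling __init__
    entity.__dict__.update(state)
    entity.namespace = namespace_table[entity.cid]
    return entity


# Hypothetical table; in practice it would have to be stored with the stream.
namespace_table = {7: EventSpec("FAMILY_A", "EVENT_B")}
original = Event(7, namespace_table[7])
restored = load_and_reattach(dump_without_namespace(original), namespace_table)
```

The sketch makes the dependency explicit: load_and_reattach cannot work unless the table itself is available on the reading side, which is exactly the piece that seems to be missing.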

DSUI header generation:

Subroutine which fills in the DSUI output binary file header values:
Location in DSUI code where the header is initially written:
datastreams/dsui/libdsui/logging_thread.c -> init_logging_thread()

Location in DSUI code where each DSUI IP is added to the dictionary of namespace events at the beginning of the DSUI output binary file:

datastreams/dsui/libdsui/dsui.c -> __dsui_register_ip()

DSKI header generation:

The header for the DSKI binary output file is generated in:



def write_dski_header(fout, ns):

The path of execution is:

  1. Fetching dski_context returns a dictionary of all IPs in the kernel
  2. The class dski_context also features a subroutine create_channel
  3. create_channel initializes an object of class dski_channel
  4. During initialization, dski_channel creates any needed reader threads for the output channels it is reading from. The reader threads are wrapped in the Python class reader_thread.
  5. During initialization, reader_thread invokes write_dski_header
  6. For each reader_thread, write_dski_header writes to the DSKI output file the unique header for this instance of DSKI use, which includes the complete dictionary of DSKI IP points, the system name, etc.

