Writing Custom DSPP Filters

Introduction

Datastreams provides a powerful way of collecting kernel- and user-level data generated by the computations on a system. For most analyses, however, simply gathering the data is not enough. Datastreams therefore provides both a library of commonly used filters and a framework, described here, for writing custom filter modules that filter and massage data in application-specific ways.

Configuration File Specification

In order to use your custom filter modules, you must declare them in the postprocessing configuration file. This is done by specifying a string or list of strings identifying your custom filter modules and how you wish to configure and use them in the post-processing pipeline.

A filter module is identified by its filename: for a Python source file listed in filter_modules, reference its custom filters in the ‘filters’ dictionary by the filename minus the .py extension:

...
filter_modules = "my_local_modules.py"
filters = {
  pipe1 = [
    head.input(...)
    my_local_modules.my_filter(...)
  ]
}
...

Imported modules are referenced in the ‘filters’ dictionary by their full module name:

...
filter_modules = "mypackage.myfilters"
filters = {
  pipe1 = [
    head.input(...)
    mypackage.myfilters.my_favorite_filter(...)
  ]
}
...

Subclassing the Filter Base Class

Filtering.Filter is the base class for all filters. It can be found at:

kusp/datastreams/src/datastreams/postprocess/filtering.py

There are four methods and several attributes you will override to create a custom filter. All of the filters in the standard library subclass the Filter base class and are good examples of how to use it.

Methods

Filter.initialize(self)

This is essentially the constructor for a custom filter, although the filter is already installed and ready in the pipeline before this initialization method is called. A reference to the pipeline in which this filter is installed is available in self.pipeline, and any namespace information and parameters you specified for this filter will already have been processed and will be available in self.namespace and self.params, respectively. The purpose of this method is to perform any extra initialization specific to your filter, such as opening files on disk or retrieving any namespace pointers associated with the filter (see the Namespace Pointers section).

Filter.process(self, entity)

This method performs the actual filtering of the entities it receives from the pipeline. Entities arrive one at a time, so if your filter needs to see the entire datastream before it can act, buffer the entities here and do the real work in Filter.finalize(self) (see the utility.sort() filter for an example). To propagate entities down the pipeline, call self.send(), which sends an entity to every output listed in the output_names attribute, or self.send_output(), which sends it to a single named output. By default, the inherited output_names attribute holds the string “default”, so self.send() forwards entities to the next filter in the pipeline.

Filter.finalize(self)

This method is called when there are no more entities for your filter to process. Use it to close any files that were opened, or, if you buffered entities in Filter.process(), to send the buffered entities down the pipeline.

Filter.abort(self)

This function is called if any fatal errors occurred upstream. Use this function to delete any output files created during initialize() or process(). If you created any output files during finalize(), there is no need to delete them here.
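Putting the four methods together, a buffering filter might look like the following sketch. The BufferingFilter name, the entity representation, and the minimal stub base class (included only so the example is self-contained) are all illustrative; in real use you would subclass the framework's Filtering.Filter, whose send() pipes entities downstream rather than collecting them in a list.

```python
# Minimal stand-in for the framework's Filtering.Filter base class,
# included only so this sketch runs on its own.
class Filter(object):
    def __init__(self):
        self.params = {}
        self.sent = []          # stub: collect output instead of piping it

    def send(self, entity):
        self.sent.append(entity)


# Hypothetical custom filter that buffers every entity it sees and
# releases them, sorted, only once the datastream is exhausted.
class BufferingFilter(Filter):
    expected_parameters = {
        "sort_key": {
            "types": "string",
            "doc": "entity attribute to sort by",
            "default": "ns",
        }
    }

    def initialize(self):
        # Called after the filter is installed in the pipeline;
        # self.params has already been validated by this point.
        self.buffer = []
        self.key = self.params.get("sort_key", "ns")

    def process(self, entity):
        # Buffer instead of forwarding; nothing goes downstream yet.
        self.buffer.append(entity)

    def finalize(self):
        # No more entities will arrive: sort the buffer and send
        # everything down the pipeline.
        for entity in sorted(self.buffer, key=lambda e: e[self.key]):
            self.send(entity)

    def abort(self):
        # A fatal error occurred upstream; discard buffered work.
        self.buffer = []
```

The sorting work lives in finalize() rather than process() precisely because a sort cannot be performed until every entity has been seen.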

Attributes

expected_parameters

To allow your custom filter to accept parameters from the postprocessing configuration file, you must provide a pattern against which those parameters can be verified. This pattern is stored in the expected_parameters variable of your Filter subclass. A simple example is the declaration in the utility.sort filter class:

expected_parameters = {
  "sort_key": {
    "types": "string",
    "doc": "Time units to sort by",
    "default": "ns"
  }
}

In the post-processing configuration file, the filter is listed inside a pipeline where it needs to be used:

utility.sort(
  sort_key = tsc
)

These parameters will have been parsed and will be accessible by the time post-processing calls your filter’s initialize() function.
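By way of illustration, a filter's initialize() might consume the validated sort_key parameter as in the sketch below. The SortParams class and the way params is passed in are hypothetical stand-ins, used only to keep the example self-contained; in the real framework self.params is populated from the configuration file before initialize() runs.

```python
class SortParams(object):
    # Sketch of how a filter might consume its validated parameters.
    expected_parameters = {
        "sort_key": {
            "types": "string",
            "doc": "Time units to sort by",
            "default": "ns",
        }
    }

    def __init__(self, params):
        # Stand-in for the framework populating self.params from the
        # postprocessing configuration file.
        self.params = params

    def initialize(self):
        # Fall back to the declared default when the configuration
        # file did not supply the parameter.
        default = self.expected_parameters["sort_key"]["default"]
        self.sort_key = self.params.get("sort_key", default)
```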

output_names

This is a list of the output names other pipelines in the filters dictionary can connect to. By default, this holds the string “default”, which will send output from this filter to the next filter in the pipeline. If other output_names are specified, other pipelines in the filters dictionary can connect to this output explicitly using the conn invocation in the head filter. For example, if you wanted a subset of the entities for this filter sent to a different output, you would first specify output_names as something like:

output_names = [ "default", "other_output" ]

Then, in the Filter.process(self, entity) function of this filter, you would explicitly specify to send these entities to the special output using the self.send_output() function. An example might be:

if entity is special_entity:
  self.send_output("other_output", entity)

Finally, use the conn invocation to connect this output to another pipeline. For example, if our custom filter was named “foo”, the postprocessing configuration file might have:

other_pipe = [
  head.input(
    conn = [ foo(output="other_output") ]
    ...
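The send_output() routing described above can be sketched end to end as follows. The SplittingFilter name, the "special" entity attribute, and the stub send()/send_output() plumbing are all illustrative; a real filter inherits these methods from the Filter base class, where send() forwards to every output named in output_names.

```python
class SplittingFilter(object):
    # Two named outputs: the implicit default plus a secondary one
    # that another pipeline can connect to with a conn invocation.
    output_names = ["default", "other_output"]

    def __init__(self):
        # Stub plumbing standing in for the framework's Filter base
        # class: collect entities per output instead of piping them.
        self.outputs = {name: [] for name in self.output_names}

    def send(self, entity):
        # For brevity this stub forwards only to "default".
        self.outputs["default"].append(entity)

    def send_output(self, name, entity):
        self.outputs[name].append(entity)

    def process(self, entity):
        # Route "special" entities to the secondary output and send
        # everything else down the default path.
        if entity.get("special"):
            self.send_output("other_output", entity)
        else:
            self.send(entity)
```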

builtin_namespace

This is a string that allows you to hardcode a built-in namespace for your filter. The syntax of specifying the namespace is exactly the same as specifying a namespace in the post-processing configuration file. Simply put the namespace information you wish to specify (what would usually go on the right hand side of namespace = in the post-processing configuration file), into this string. An example of this can be found in the “kernel.py” filter. However, use of this attribute is not recommended. If possible, place namespace information in the postprocessing configuration file.

debug_flag

This is a flag that specifies whether or not the self.debug() calls should print anything. By default, it is false.

process_admin

This is a flag that specifies whether or not your filter should process administrative events. By default, it is false. Normally, these events bypass the process() function to ensure they are not consumed. If you must use them, set this flag to true. However, be careful when turning this option on because not propagating administrative events correctly can cause many different problems later in the pipeline.

Namespace Pointers

One of the main techniques for filtering entities is namespace pointers. The NamespacePointer class allows you to check whether an entity is of a certain type during postprocessing. To use it in a custom filter, first retrieve the NamespacePointer associated with the namespace event you wish to use in Filter.initialize(self). If the group of entities you wanted to filter out were named “FOO/BAR”, you would write:

def initialize(self):
  self.foobar_ptr = self.get_ns_pointer("FOO/BAR")
  ...

Then, in the Filter.process(self, entity) function, you could simply check the cid of the given entity against the pointer:

def process(self, entity):
  if entity.get_cid() == self.foobar_ptr.get_cid():
    # Do not send this entity down the pipeline
    ...

This is much faster than a string comparison of the entity's group and family names.
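The pattern can be made concrete with stubs for get_ns_pointer() and the entity's get_cid(). Everything here (the NsPointer registry, the cid values, the FooBarDropper filter) is illustrative plumbing, not the framework's actual implementation; the point is that the name lookup happens once in initialize() and later checks reduce to integer comparisons.

```python
class NsPointer(object):
    # Stub for the framework's NamespacePointer: resolves a name to a
    # composite id once, so each later check is a cheap comparison.
    _registry = {"FOO/BAR": 42, "FOO/BAZ": 43}   # illustrative cids

    def __init__(self, name):
        self._cid = self._registry[name]

    def get_cid(self):
        return self._cid


class Entity(object):
    # Minimal stand-in for a datastream entity.
    def __init__(self, cid):
        self._cid = cid

    def get_cid(self):
        return self._cid


class FooBarDropper(object):
    # Hypothetical filter that drops FOO/BAR entities and keeps the rest.
    def __init__(self):
        self.sent = []

    def get_ns_pointer(self, name):
        # Stub for the method inherited from the Filter base class.
        return NsPointer(name)

    def initialize(self):
        self.foobar_ptr = self.get_ns_pointer("FOO/BAR")

    def process(self, entity):
        if entity.get_cid() == self.foobar_ptr.get_cid():
            return          # drop: do not send down the pipeline
        self.sent.append(entity)
```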
