Signal Pipeline DSKI Example


This example is an extension of the previous one in the Initial Data Streams Tutorial Sequence, which used the same multi-threaded pipeline computation exchanging signals but gathered Data Streams events only at the user level using DSUI. The most important point for the analysis of the data is that while the DSUI instrumentation points note the time at which events take place in the user code, they cannot indicate any interrupt processing or context switches to unrelated threads on the system that may take place between sending a signal at pipeline stage N and receiving it at pipeline stage N+1. This experiment adds DSKI instrumentation to gain a view of context switching, and thus of each thread's execution intervals, but it still ignores interrupt processing.

It is also worth noting in this example that the DSKI context switch events are not filtered and therefore represent context switches by all processes on the machine. We can use the DSUI events in which each pipeline stage reports its PID to select the pipeline threads' context switch events from the raw DSKI data, which lets us calculate the execution intervals of each pipeline stage. Further post-processing can then enable us to determine which send/receive pairs at the DSUI level are interrupted by context switches to other, unrelated processes. A variation on this is to use Active Filtering of the DSKI events to gather only those related to the pipeline threads. This would illustrate a commonly used technique for reducing the instrumentation effect and is discussed in another example: Active Filtering.
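The post-processing idea described above can be sketched in a few lines. This is an illustrative fragment, not part of the experiment's code: the plain tuples standing in for datastreams entities, and the function and parameter names, are assumptions made for the sketch.

```python
# Illustrative sketch: given the DSUI-level send/receive timestamps for each
# signal and the DSKI SCHEDULER/SWITCH_TO events, flag the send/receive pairs
# during which the CPU switched to a process outside the pipeline.
# Plain tuples stand in for real datastreams entities.

def interrupted_pairs(pairs, switch_to_events, pipeline_pids):
    """pairs: list of (send_ns, recv_ns) timestamps.
    switch_to_events: list of (ns, pid) for SCHEDULER/SWITCH_TO events.
    pipeline_pids: set of PIDs belonging to the pipeline threads.
    Returns the indices of pairs interrupted by an unrelated process."""
    flagged = []
    for i, (send_ns, recv_ns) in enumerate(pairs):
        for ns, pid in switch_to_events:
            if send_ns < ns < recv_ns and pid not in pipeline_pids:
                flagged.append(i)
                break
    return flagged
```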


This example shows how to collect kernel events of a running process and how these events can be analyzed during postprocessing. It covers a number of KUSP tools and features including: DSKI instrumentation, DSKI filtering, and writing custom postprocessing filters. It is intended to be the third example in the Initial Data Streams Tutorial Sequence. If you have not gone through the recommended first and second examples, you should do so now.


The sigpipe experiment spawns a number of threads representing a pipeline, and sends a number of signals down the thread pipeline. While the experiment runs, a DSKI datastream is opened. This datastream collects all enabled kernel events. It only has the SCHEDULER/SWITCH_TO and SCHEDULER/SWITCH_FROM events enabled. These events are logged on either side of the scheduler’s context_switch() call, and during postprocessing, they are used to determine how long each thread in the pipeline actually used the CPU.

Checking Out and Building

The sigpipe experiment is in the kusp examples tree under unified_sigpipe. You will need datastreams installed on your testbed as well as the dski module inserted into your kernel. Follow the instructions for Building the Kernel if you have not yet built and installed the KUSP kernel, and Building and Installing KUSP if you have not yet built and installed the KUSP software. DSKI is packaged as a loadable module and you must insert the dski module into your kernel if you have not already done so:

#FIXME.D - should we do it this way, or run Tyrian's udev rule script?
sudo chmod 666 /dev/dski

If you have not built the KUSP examples before, go to Building KUSP Examples.

After this is done, go to the DSKI signal pipeline example directory:

bash> cd $KUSPROOT/examples/build/unified_sigpipe/

Running the Example

To run the example, type:

bash> cd unified_sigpipe
bash> make -f sigpipe-dski1

The sigpipe-dski1 makefile both runs the experiment and runs the postprocessing command. You can look at the makefile and see that the command that actually runs the experiment is:

dskictrl -c sigpipe1.dski

Consider what the sigpipe1.dski file is specifying. First, it specifies that the data collected will be output in files in the sigpipe1.dski.bin* directory. It also tells dskictrl to spawn a child process while it logs data. By default, dskictrl will stop logging dski events after the child process has finished execution:

output_base = "sigpipe1.dski.bin"
child = "./sigpipe --dsui-config sigpipe.dsui --threads=5 --stimuli=1000"

chan = {
    buffer_size = 500000,
    buffer_count = 50

The next section configures the channel for the datastream used in this example. Specifically, it tells DSKI to open a relay channel with the specified number of buffers of the specified size to use in logging the kernel events for each stream to a file in user space:

dski = {
    channel = chan
        filters = []
        enabled = {
            SCHEDULER = [

This section specifies the properties of the DSKI datastream. The dski datastream logs using chan. It attaches no filters and enables the SCHEDULER/SWITCH_TO and SCHEDULER/SWITCH_FROM events in the kernel.

Running the Example More Realistically

FIXME: Currently running the kernel compile while running the experiment has no effect on the number of context switches, or the proportion. This needs to be fixed before this portion of the experiment has any meaning. That goes for the Active Filtering example as well.

Possibly, add sleep time in the sigpipe example to extend the duration of the example, so that it is more comparable to the duration of the kernel compile. This would give the kernel compile's competing load much more impact on the number of context switch events.

When we ran this example just now, it was not completely realistic, because in most circumstances the system will not be idle. Unless you were doing something intensive in the background, the number of context switch events filtered out would not be very large. We want to create competing load on the machine in order to demonstrate more fully the number of irrelevant context switch events that can occur. In our case, we decided to use a kernel compile, which places one of the heaviest loads on the system: a kernel compile forks many times, creating a significant number of context switches.

In order to run this example while running a kernel compile, execute the command:

make -f sigpipe-dski1-compete-load

This checks out a kernel (2.6.24) to your ~/tmp directory and builds it, and then runs the example. However, the tree is not actually saved in ~/tmp; it is saved under /projects/kurt, and a symbolic link to it is created in ~/tmp. You can look in the makefile to see exactly how this is done.

Running this will take quite a long time, so you may want to take a break.

Once it is finished, you will notice that the proportion of events that are filtered out is much higher this time. This demonstrates why the Active Filtering used in the next example is that much more important.


Postprocessing

The postprocessing has already been run when you ran the makefile target. However, the command to run the postprocessing alone is:

postprocess -f sigpipe_dski1.pipes

The command will print out the number of events in the unfiltered datastream, the number in the filtered datastream, and the number of context switch events in a single gap. It will also generate a text file showing every context switch event during that gap (note that none of the PIDs are from the sigpipe example). Several histograms are generated as well. Three are the same as in the previous example (sigpipe DSUI), but the two new ones represent the CPU time of the filtered and unfiltered datastreams, respectively. In order to understand this output, it is essential to understand the postprocessing configuration file and the filters that it uses.

DSPP Filter Pipeline: sigpipe_dski1.pipes

The processing of the raw experimental data is specified in the sigpipe_dski1.pipes configuration file. This process can be viewed as a set of related but distinguishable logical sections.

Merging Datastreams

You should be familiar with the basics of how postprocessing configuration files work from the Signal Pipeline DSUI example, but there are some new features in this file that were not present there. The first thing to notice is the actual flow of the pipeline. Because there are different files representing different datastreams, several different head filters are used to set up the datastreams we wish to operate on. The datastreams represented in the sigpipe1.dski.bin/chan/cpu.bin* and sigpipe.dsui.bin files are read in and merged in the pids pipeline using the conn function:

pids = [
    conn = [custom, dski]

At this point the datastream in the pids pipeline is a merged version of the custom and dski pipelines.

Splitting Datastreams

Another capability of postprocessing is splitting datastreams. By specifying the parameter output_names within a filter, you can send the datastream to multiple pipelines. Using the split_outputs filter, you can generalize this, passing in the names of the pipelines you want to send output to, as you can see at the bottom of the dsui pipeline:

          outputs = ["count_uf", "count_f", "custom"]

This sends the datastream to three different pipelines: count_uf, count_f, and custom.

Using Custom Filters

The next thing to notice is the use of the custom ''sigpipe_dsui_filter'' and ''custom_PIDs'' filters. These are Python files which contain the custom filters written specifically for this experiment. The purpose of this code is to perform filtering on the data generated by the sigpipe experiment that is not possible using the generic filters installed with postprocessing (these are located in $KUSPROOT/subsystems/datastreams/postprocess/filters/). Before writing a custom filter, you should always check the generic filters to make sure the filtering you need is not already implemented by one of them. You can see the documentation for this filter library here: DSPP Filter Library

The Post-Processing Framework makes using custom filters easy. In order to use a custom filter, all you have to do is list the name of the Python file in the filter_modules list. In this example, this is done here:

filter_modules = ["", "custom_PIDs"]

Postprocess will automatically import this code when it is invoked. The only thing that is left to do is to actually write the custom filter itself.

Writing Custom Filters

The new custom filter in this example is the ''custom_PIDs'' filter. Using this filter, we are able to access the PID values of the sigpipe threads, which are located in the tag values of the THREAD/THREAD_ID events. Once we have these, we filter the context switch events by those PID values. As you can see, this is a very specific filter, which is why it is not included in the filter library. The PIDs are saved to a file, which is then read by another filter in order to filter by PID.

It should not surprise you that in order to create a custom filter, some classes from the post-processing framework will need to be imported. The first few lines of this file are essential for most custom filters for reasons that will become clearer below:

from postprocess import filtering
from postprocess import entities
from datastreams import namespaces

Every filter used in postprocessing must be derived through inheritance from the abstract base class Filter (located in kusp/trunk/subsystems/datastreams/postprocess/). If you are not familiar with Python, this is accomplished by putting the class you are inheriting from in parentheses when you define your class:

class filter_context_events(filtering.Filter):

There is documentation provided in the file, as well as a dedicated document: How to Write a Custom Filter. This describes in some detail which methods to subclass and how particular variables can be used, but the following sections will describe these issues, for the most important methods and variables, in the context of this example.


The expected_parameters Variable

The expected_parameters variable is a string written in the Config File Language that lays out which parameters your filter can accept. If you lay this section out correctly, the parameters given by the user will be accessible in the self.params dictionary. For example, a common option of many filters is to discard the specified events rather than only retaining them. In this example, the desired functionality is that if the user sets discard to true, context switch events with matching PIDs should not be passed on in the pipeline. This is accomplished by first listing discard as a boolean in your expected_parameters string:

"discard" : {
     "types" : "boolean",
     "doc" : "If True, discard events with matching PIDs. If False, \
           retain events with matching PIDs.",
     "default" : False

Second, recording the value of the parameter in initialize:

self.discard = self.params["discard"]

And third, using this parameter to determine which events are allowed to proceed in the pipeline. In the following Python conditional, which is in the finalize function, if discard is false and the entity’s PID matches, the entity is sent along the pipeline. If discard is true, the entity must not match in order for it to be sent:

if (not self.discard) and entity.get_pid() in self.real_pids:
    self.send_output("default", entity)
elif self.discard and entity.get_pid() not in self.real_pids:
    self.send_output("default", entity)


The initialize Method

In addition to the expected_parameters variable, there are a few important methods you should subclass when writing a custom filter. The first method is called initialize and it can be thought of as the filter's constructor method. By the time this method is called, however, the filter is already installed and ready in the pipeline. Any parameters you've specified in expected_parameters will be accessible in self.params at this point. The purpose of this method is to do any extra initialization specific to your filter. In this example, we use the method to copy parameters out of self.params into instance variables, as well as to initialize the data structures we will be using. Another thing you'll notice in this function is the use of namespace pointers to get references to the instrumentation points we want to match:

self.to_ptr = self.get_ns_pointer("SCHEDULER/SWITCH_TO")
self.from_ptr = self.get_ns_pointer("SCHEDULER/SWITCH_FROM")

This is a common practice in almost all of the postprocessing filters because it is much faster than doing a string comparison of the entire event name. Now, to determine whether a given entity (an entity refers to a logged event) is a SCHEDULER/SWITCH_TO entity, we just perform the following check in the process method:

if entity.get_cid() == self.to_ptr.get_cid():


The process Method

The method in postprocessing filters that actually has access to the stream of logged events is the process method. The entities are received one at a time as an argument to process, which is simply called in a loop until the end of the datastream is reached. In order to advance entities down the pipeline, filter authors must use the self.send_output method:

self.send_output("default", entity)

The default output is the next filter in the pipeline. See the How to Write a Custom Filter document for more information on customizing filters, including specifying output file names.

The purpose of this filter is to compute the total amount of time each task represented in the datastream spent on the CPU. While the sigpipe program was executing, DSKI logged SWITCH_TO and SWITCH_FROM events (with the pid of the running task attached) every time a task was switched to or switched from by the scheduler. In addition to this, every DSKI event that is logged will also attach a timestamp value, which can be converted into nanoseconds by the utility2.TSC_conversion() filter. With this information, we can compute the total time each task spent on the CPU (in nanoseconds) by processing the datastream:

id = entity.get_tag()
log_time = entity.get_log_time()
machine = entity.get_machine()

The machine variable represents the clock source that was used when the entity was originally logged. In this way, if data was collected from multiple CPUs, we can correctly distinguish between tasks running on one CPU and tasks running on another. This is also a good time to note that Python is a dynamically typed language. Type mismatch errors occur at runtime, and they are common when you are using unfamiliar code. For example, you might reasonably assume that the entity.get_log_time() method returns a reference to a long or int object. However, this is simply not the case. The code that actually computes the difference between the times logged by the SWITCH_FROM and SWITCH_TO events looks like:

self.task_times[id].append(log_time["ns"].get_value() - self.start_time[id]["ns"].get_value())

The entity.get_log_time() method actually returns a reference to an object of type dict. The log_time["ns"].get_value() call returns the nanosecond value of type long that we need to compute the interval of time that a task spent on the CPU. The only way to debug these errors is to actually read through some of the code you wish to use.
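The accumulation logic can be sketched in isolation. In this sketch, plain dicts stand in for datastreams entities, so the method calls above are reduced to key lookups; the dict keys and function name are assumptions made for illustration, not the real API.

```python
# A self-contained sketch of the CPU-time computation described above.
# Plain dicts stand in for datastreams entities; in the real filter the
# values come from entity.get_tag(), entity.get_log_time()["ns"], etc.

def total_cpu_times(events):
    """Sum the nanoseconds each PID spent on the CPU from alternating
    SWITCH_TO/SWITCH_FROM events."""
    start_time = {}   # pid -> ns of the most recent SWITCH_TO
    totals = {}       # pid -> accumulated ns on the CPU
    for ev in events:
        pid, ns = ev["pid"], ev["ns"]
        if ev["name"] == "SCHEDULER/SWITCH_TO":
            start_time[pid] = ns
        elif ev["name"] == "SCHEDULER/SWITCH_FROM" and pid in start_time:
            # Close the interval opened by the matching SWITCH_TO.
            totals[pid] = totals.get(pid, 0) + (ns - start_time.pop(pid))
    return totals
```

A per-interval list rather than a running total (as in self.task_times above) follows the same pattern, appending each difference instead of summing it.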


The finalize Method

The finalize method is called after all of the entities in the pipeline have been processed. It should be used for closing any opened files or handling any buffered data. In this example, because of the manner in which postprocessing was designed, we do not have access to the PIDs until some time after the process function has started. We therefore cannot do our filtering within process without losing some data, so we filter the entities within finalize instead.

After you have finished writing these methods, the filter is ready to use. Simply refer to it in the postprocessing configuration file, and Postprocess will import it automatically.
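To see how the pieces fit together, here is a condensed, framework-free sketch of a custom_PIDs-style filter. The Filter base class below is a tiny stand-in invented for this sketch (it is NOT the real postprocess.filtering.Filter), and plain dicts stand in for entities; only the initialize/process/finalize calling convention mirrors the framework described above.

```python
# Minimal stand-in base class, NOT the real postprocess.filtering.Filter.
class Filter:
    def __init__(self, **params):
        self.params = params      # the real framework checks these against
        self.output = []          # expected_parameters
        self.initialize()

    def send_output(self, port, entity):
        # The real method routes to the named output pipeline; here we
        # simply collect everything sent on any port.
        self.output.append(entity)

    def run(self, entities):
        # The framework calls process() once per entity, then finalize().
        for entity in entities:
            self.process(entity)
        self.finalize()
        return self.output


class FilterByPID(Filter):
    """Learn the pipeline PIDs from THREAD/THREAD_ID events, then pass on
    only context switch events whose PID matches (or, with discard=True,
    does not match)."""

    def initialize(self):
        self.discard = self.params.get("discard", False)
        self.real_pids = set()    # PIDs of the sigpipe threads
        self.buffered = []        # events held until the PIDs are known

    def process(self, entity):
        if entity["name"] == "THREAD/THREAD_ID":
            self.real_pids.add(entity["tag"])   # DSUI event carrying a PID
        else:
            self.buffered.append(entity)        # PIDs may arrive later

    def finalize(self):
        # Filtering happens here because, as described above, the PIDs are
        # not all known until after processing has started.
        for entity in self.buffered:
            if (not self.discard) and entity["pid"] in self.real_pids:
                self.send_output("default", entity)
            elif self.discard and entity["pid"] not in self.real_pids:
                self.send_output("default", entity)
```

The two-phase structure (buffer in process, filter in finalize) is the same design the real filter uses to cope with PIDs arriving after the context switch events they match.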

The Custom Filter in Context

Now that you understand how custom_PIDs works, you can understand its role in each of the pipelines. The pids pipeline uses the unfiltered DSKI data as well as the DSUI data logged by the experiment itself (which has been modified by the custom pipeline to generate three different intervals). The pipeline filters out all events that do not occur within one of these intervals: specifically, the interval representing a single gap between the 2nd and 3rd threads. The DSUI data contains THREAD/THREAD_ID events that have the PIDs of the tasks associated with the sigpipe example attached as their tags. The custom filter uses this data to discard all DSKI events that do not have a matching PID. The resulting events are then narrated to the gap_events file and counted.

The next two pipelines, count_uf and count_f, merely count the number of entities in the datastream: count_uf counts the unfiltered DSKI events, while count_f counts the filtered datastream. In the final two pipelines, we examine another difference between the unfiltered and filtered datastreams. While we saw that the volume of events is significantly diminished by filtering by PID, we did not see the distribution. These final two pipelines each create a histogram based on the CPU time of the running tasks.

Analyzing the Results

FIXME: The results need to be analyzed!

Finally, open up the histograms that were created from the CPU time intervals. There is not much to note here other than the fact that you can create these types of histograms. Notice that the y-axis uses a logarithmic scale, which makes it easier to see rare outliers. Compare what you know about each task with the distribution and frequency of the CPU time intervals it created. Is it what you expected?

See Also

It is instructive to read through the code associated with this experiment to see how its output is produced. These files are especially important:

