Signal Pipeline DSUI Example

Before going through this example, please make sure you have checked out and installed the appropriate KUSP kernel and KUSP application source trees. Instructions for this can be found at Building the Kernel and Building and Installing KUSP.


This is the second example in the Initial Data Streams Tutorial Sequence. It provides a slightly more complicated, multi-threaded user program than the first example, illustrating the use of Data Streams events and how to write useful post-processing filters. If you have not gone through the recommended first example (the Simple DSUI Example), you should do so now. This example can be found in the examples directory.

Checking Out and Building

The sigpipe example is located at <KUSP_TRUNK>/examples/unified_sigpipe. You will need Datastreams installed on your testbed, as well as the dski module inserted into your kernel. Follow the instructions at Building the Kernel if you have not yet built and installed the KUSP kernel, and Building and Installing KUSP if you have not yet built and installed the KUSP software.


DSKI is packaged as a loadable module which is inserted automatically on boot up as long as you have the KUSP Udev Rules installed as per the KUSP user software instructions.


Make sure you have the RPM built and installed before you run this example. If you do not have the RPM built, the example will not build properly.

If you have not built the KUSP examples before, go to Building KUSP Examples

Structure of sigpipe.c

This program contains a slightly more purposeful, structured use of DSUI instrumentation points. Only instrumentation points of type DSTRM_EVENT are used, as an example of how even the simplest instrumentation points can provide useful, specific output. For the following explanation it may be helpful to open the sigpipe.c source file in an editor for reference. It can be found here:



In the main body of the example, note that we start with the DSUI_BEGIN macro, before we pass argc and argv to our process_options() subroutine. The next DSUI instrumentation point is an event which simply declares that the body of the experiment has begun:

DSUI_BEGIN(&argc, &argv);

In the next several lines we process command line options, set up signal masks, allocate memory for the threads we are intending to spawn, and spawn the threads.

Since this experiment is designed to have one thread per pipeline stage, and we wish to ensure that all pipeline stage threads are created and waiting to process pipeline messages concurrently, we make use of PThreads condition variables. Note at the top of sigpipe.c that the thread_count_control condition variable and its associated thread_count_lock mutex are created. In the body of the main routine, the statements that follow are a standard condition-variable pattern which checks whether the thread_count variable has reached the pipeline length. Each pipeline stage thread increments this variable as it is created and waits in a similar section of code testing the condition variable. The effect of using the condition variable in this way is that the main thread and all pipeline stage threads pause at the condition variable test until all threads are ready, and then all proceed.
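The rendezvous pattern described above can be sketched in a few lines of Python, as an analogue of the C pthreads code. The names mirror the variables in sigpipe.c, but this is an illustrative sketch, not the actual source:

```python
import threading

PIPELINE_LEN = 4  # stands in for the user-specified pipeline length

thread_count = 0
thread_count_lock = threading.Lock()
thread_count_control = threading.Condition(thread_count_lock)

started = []

def pipeline_stage(stage_id):
    global thread_count
    with thread_count_control:
        thread_count += 1
        if thread_count == PIPELINE_LEN:
            # The last thread to arrive wakes everyone instead of blocking.
            thread_count_control.notify_all()
        else:
            while thread_count < PIPELINE_LEN:
                thread_count_control.wait()
    # Every stage reaches this point only after all stages are ready.
    started.append(stage_id)

threads = [threading.Thread(target=pipeline_stage, args=(i,))
           for i in range(PIPELINE_LEN)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(started))  # 4
```

The while loop around wait() guards against spurious wakeups, just as the equivalent loop does in the C code.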

After the threads are spawned, the main thread enters a loop which is responsible for sending a user-defined number of signals to the first member of the newly created pipeline. Just before it sends a signal, we record two DSUI events that are useful during post-processing for different kinds of analysis:

unsigned int pipeline_stage = 0;
pipeline_stage <<= 27;
pipeline_stage |= sigs + 1;
DSTRM_EVENT(GAP_TEST, SIG_SENT, pipeline_stage);


See that we can differentiate our events using the (category-name, entity-name) pair as the event identifier. This will generate two different events, one designated (GAP_TEST,SIG_SENT) and one (PIPE_TEST,PIPE_START). Also note that we supply tag values to these two events. The (PIPE_TEST,PIPE_START) event will be used to measure the total time a signal spends in the pipeline, so the only data we need is the sequence number of each signal sent into the pipeline. At the end of the pipeline, a matching (PIPE_TEST,PIPE_END) event will also record the sequence number of each signal. Using these unique signal sequence numbers, we can differentiate among the (possibly) thousands of events, all sharing the same namespace, that will be generated describing how signals move through the pipeline. Note that these two events, while part of the DSUI namespace for this example, are defined in different categories. Recall that the Datastream entity namespace has two levels, with category names being the first component and entity names within the category being the second. Note also that there will be many instances of each of these events, and the tag data for each instance is what uniquely identifies it.

The other event recorded, (GAP_TEST,SIG_SENT), will be used to record the overhead of sending and receiving signals between pipeline stages. The post-processing problem we face is that we want to have an event representing when each message through the pipeline is sent or received at each pipeline stage. To help with this we have given each signal representing a message a unique sequence number, and each stage of the pipeline has a unique ID number. The DSUI encoding challenge is how to attach these two pieces of information to a single event. Recall that each event has a 32-bit tag value by default and that we can optionally attach arbitrary data to such events if we wish. In this case, we have chosen to show how a little creativity in using the 32-bit tag value can often save the additional run-time and programming overhead of using optional data: we use the top five bits to hold the pipeline stage ID and the lower 27 bits to hold the signal sequence number.
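The 5/27-bit split can be sketched as a pair of helper functions like those a post-processing filter could use to recover both values from a single tag. These helpers are illustrative, not part of the KUSP API:

```python
STAGE_BITS = 5   # pipeline stage ID in the top 5 bits
SEQ_BITS = 27    # signal sequence number in the low 27 bits

def encode_tag(stage_id, seq):
    # Mirrors the shift-and-or done in sigpipe.c before DSTRM_EVENT.
    assert stage_id < (1 << STAGE_BITS) and seq < (1 << SEQ_BITS)
    return ((stage_id << SEQ_BITS) | seq) & 0xFFFFFFFF

def decode_tag(tag):
    # Inverse operation, as a filter would apply during post-processing.
    return tag >> SEQ_BITS, tag & ((1 << SEQ_BITS) - 1)

print(decode_tag(encode_tag(3, 1042)))  # (3, 1042)
```

Note the trade-off this encoding implies: at most 32 pipeline stages and 2^27 sequence numbers fit in a single tag.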

After the parent thread has sent all of the signals the user requested it sends a final kill signal through the pipeline to tell each stage to terminate.

We use another set of events to record the TIDs of each thread created in this experiment:

for (i = 0; i < pipeline_len; i++){

It is easy to understate how useful something as simple as recording the unique ID’s (TID or thread ID) of threads in a program can be. If other events also include, either in their tag or their extra data, the TID of the executing thread that generated them, then data specific to the threads of a program can be separated from events generated by other threads on the system. No matter where the instrumentation points are generated or how many total events are generated by any number of total threads, the unique TIDs associated with each thread can be used later to filter the raw data from a program for just those events that pertain to the threads of interest. This is particularly significant, for example, when using context switch events to determine execution intervals for threads. These events are generated for every thread in the system. Context switch events are among the strongest motivations for a program to use Datastream Active Filters to reduce the effect of instrumentation load on system behavior during execution.
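The filtering this enables can be sketched as follows. The record layout here is hypothetical, chosen only to show how recorded TIDs let post-processing separate a program's events from the rest of the system's:

```python
# Hypothetical raw event records: (tid, category, entity, tag).
raw_events = [
    (1201, "GAP_TEST", "SIG_SENT", 7),
    (9999, "SCHED",    "SWITCH_TO", 0),   # some unrelated thread on the system
    (1202, "GAP_TEST", "SIG_RCVD", 7),
    (8888, "SCHED",    "SWITCH_TO", 0),
]

# TIDs recorded by the experiment's thread-registration events.
tids_of_interest = {1201, 1202}

ours = [e for e in raw_events if e[0] in tids_of_interest]
print(len(ours))  # 2
```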

In the final lines of the main body, the main thread waits for the pipeline stage threads to finish execution, and then records the end of its own execution by generating the (FUNC_MAIN, IN_MAIN_FUNC) event before cleaning up the DSUI framework:


Next we will discuss the code executed by each thread representing each stage of the pipeline.


This code is executed by each member of the signal pipeline.

Note that like the main body, we use an event to record when the execution of this particular thread begins:


The first few lines, not shown here, allow the thread to store its TID in the globally accessible array, tidlist[], so that other threads implementing other pipeline stages will know the TID of the threads to which they will send signals.

The next few lines are where each thread uses the thread_count_control condition variable and its associated mutex, thread_count_lock. In the case of each pipeline stage thread, the thread first obtains the mutex and then increments the shared thread_count variable. It then checks to see if thread_count has reached its target value, which guarantees that every pipeline stage thread is now ready to work. Note that the last thread to reach this point, instead of blocking, signals to the blocked threads, main thread and other pipeline stages, that they should re-check the condition because it is now satisfied.

The body of the thread code is a large loop. This loop will allow us to continually poll for incoming signals. When one is received, we check to see if it is the kill signal, at which point we exit the loop.

Otherwise, we generate a signal received event:

unsigned int unique_id = id+1;
unique_id <<= 27;
unique_id |= sigcnt;


This should look very familiar, as we did something similar in the main body of the experiment, when sending signals from the main thread into the first stage of the pipeline. As before, we encode the pipeline ID into the top five bits of the tag value, and the signal sequence number into the bottom 27 bits. Using just the tag value creates lower overhead when recording the event, and reduces the instrumentation effect compared to attaching extra data to an event. By carefully combining both values into our unique tag value, we can then separate them out in post-processing.

We use the same unique ID when we generate a sent event at the end of the loop:


This event illustrates an important, yet easily overlooked, aspect of Data Streams events: they can appear in more than one place in the code. Specifically, this event appears both in the parent code and in the code for the pipeline stage. Further, it is important to realize that, given the fundamental semantics of PThreads, the pipeline stage code is executed by multiple threads. While it is common, and fairly natural, for each event to appear in the instrumented code in only one location and be uniquely identified by its (category-name, entity-name) pair, this is not always the case, as illustrated here. Instead, since each event instance must be uniquely identified during post-processing, we actually use a quadruple rather than the (category-name, entity-name) pair alone: (category-name, entity-name, pipeline-stage, message-number).

We also generate an event if this is the last thread in the pipeline, indicating that a signal has traversed the entire pipeline:


Later, this event can be paired with (PIPE_TEST,PIPE_START) event generated in the main body of this experiment to determine the total time it takes a signal to traverse the pipeline. Note that we could have just as easily used a DSTRM_INTERVAL event type and saved ourselves some post-processing by generating a single event describing the interval between when a signal entered the pipeline and when it exited, rather than generating two events which must then be found and paired during post-processing. In this instance we chose to use a pair of standard events to explicitly demonstrate how such location and pairing of events can be done in a custom filter.
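The pairing a custom filter performs can be sketched as follows: match each (PIPE_TEST,PIPE_END) to the (PIPE_TEST,PIPE_START) with the same sequence number and compute the difference of their timestamps. The record layout and timestamp values are illustrative only:

```python
# Hypothetical (entity-name, sequence-number, timestamp) records,
# in the order they might appear in the entity stream.
events = [
    ("PIPE_START", 1, 100.0),
    ("PIPE_START", 2, 105.0),
    ("PIPE_END",   1, 180.0),
    ("PIPE_END",   2, 190.0),
]

starts = {}    # sequence number -> start timestamp, awaiting a match
latency = {}   # sequence number -> time spent traversing the pipeline

for name, seq, ts in events:
    if name == "PIPE_START":
        starts[seq] = ts
    elif name == "PIPE_END":
        # The sequence number is the key that pairs the two events.
        latency[seq] = ts - starts.pop(seq)

print(latency)  # {1: 80.0, 2: 85.0}
```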

Lastly, if the thread code receives its kill signal, it exits the loop, recording the end of its period of execution with yet another event:



The sigpipe experiment spawns a number of threads representing a pipeline and sends a number of signals down the thread pipeline while collecting various Datastream events.

To run the experiment, type:

bash$ cd $KUSPROOT/examples/build/unified_sigpipe
bash$ ./sigpipe --threads=<int> --stimuli=<int> [-g] [-c]

      --stimuli=      number of stimuli to send through pipeline
      --threads=      number of threads in pipeline
      --cpus=         the number of cpus to use
      -g              run under group scheduling
      -c              create a ccsm set representing the computation
      --help          this menu

Try this if nothing works:

bash$ ./sigpipe --threads=<int> --stimuli=<int> --dsui-config sigpipe.dsui

NOTE: Make sure sigpipe.dsui.bin has been generated in the current directory. Running the experiment with too few threads or stimuli might cause errors while running the post-processing scripts.


As discussed above, the sigpipe experiment logs a number of DSUI events. To run the included post-processing for those events, type:

bash$ postprocess f sigpipe.pipes

This processes the DS data file(s) produced by the sigpipe experiment according to the semantics specified in the configuration file “sigpipe.pipes”.


As with the Simple DSUI Example, the file with the “.pipes” extension is, by our convention, the post-processing configuration file. In it we specify sets of filters forming one or more pipelines which we use to extract meaningful data measuring application behaviour, and after processing the raw DSUI instrumentation point events we can create alternate views of the DSUI data. In this configuration file we specify several pipelines, and then link them together as an example of how multiple pipelines can be created and linked with one another to perform a wide variety of operations on the DS output data.

For the rest of this example it is strongly recommended that you read the detailed documentation found in the files as we cover them. The files can be found in:


Declaring multiple pipelines is simple. Additional pipelines are declared in the same way as the first pipeline, following all of the same rules, and are added as entries in the filters dictionary:

filter_modules = ""
filters = {
   dsui = [
   custom = [
   create_histograms = [
   create_graphs = [

Using Custom Filters

The Post Processing framework makes using custom filters easy. In order to use a custom filter, all you have to do is include the name of the Python file containing the filter implementation in the filter_modules list:

filter_modules = ""

Post-processing will automatically import code from this file. In this example, “” is the Python file which contains the custom filter written specifically for this experiment. However, it is recommended that you always check the generic filters when you feel the impulse to write a custom filter because it is easy to accidentally re-invent the wheel. A wiki describing the current set of generic post-processing filters can be found here: Post Processing Filter Library

The Python code for commonly used DS filters can be found here:


When developing a new filter, it may be useful to examine the methods of existing filters and see how certain types of filtering are done. Now, all that remains is to actually write the new filter.

Writing Custom Filters

The Datastreams post-processing framework attempts to simplify writing filters by providing an abstract filter class from which new filter types are created by inheritance. The abstract filter class defines the superset of methods which a filter can implement, and provides a default implementation of each. In many cases the default implementation is empty, leaving specific filters to provide substantive actions as appropriate. There are five methods for each post-processing filter: expected_parameters, initialize, process, finalize, and abort. The process method is the only one every filter must implement, as it is the only method with direct access to the entity stream. The other methods may or may not be implemented by a given filter, depending on each method's utility in implementing the filter's semantics.

Here, we cover the basic purposes of each method in the course of creating the pipeline_intervals filter. Note that there are more specific comments located in the Python file containing this example's custom filter code:


For a more in-depth explanation of writing custom filters, including higher-level functionality not used in this example, please see: How to Write a Custom Filter


This is a dictionary which allows the developer to define the set of filter parameters whose values can be specified for an instance of this filter in a pipeline configuration file. A filter is not required to implement this dictionary, but it is currently the only way to change a filter's configuration from its default. The values of the parameters specified are used to modify the default values of the state variables controlling filter behaviour. If a set of parameter values is supplied to an instance of a filter which does not use this dictionary, a fatal configuration parsing error occurs. Note that in the configuration file, sigpipe.pipes, the custom filter declaration, pipeline_intervals, specifies non-default values for four parameters.

Note that various combinations of optional, required, and default value are possible. For example, we can specify that a given parameter, Fred, is required. In this case, failure to provide a value for Fred in the filter instance creation statement will result in a parsing error. Alternatively, if Fred is optional we might or might not specify a default value.
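The required/optional/default combinations can be sketched with a small resolver. The spec layout below is hypothetical, not the actual KUSP expected_parameters schema; it only illustrates the semantics just described:

```python
# Hypothetical parameter spec: "fred" is required, "bins" is optional
# with a default value.
expected_parameters = {
    "fred": {"required": True},
    "bins": {"required": False, "default": 50},
}

def resolve(spec, supplied):
    # Merge the values supplied in the configuration file with the
    # defaults, raising a parsing-style error for a missing required one.
    config = {}
    for name, rules in spec.items():
        if name in supplied:
            config[name] = supplied[name]
        elif rules.get("required"):
            raise ValueError("missing required parameter: %s" % name)
        else:
            config[name] = rules.get("default")
    return config

print(resolve(expected_parameters, {"fred": 7}))  # {'fred': 7, 'bins': 50}
```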


The initialize method is, by definition, where the initial environment for filter execution is established. This is where values for all filter state variables are established either by direct assignment, or by using expected_parameters specified by the filter instance declaration in the configuration file.


This is the main body of the filter. The process method is called for each entity which flows through the filter and the method can take any action in response that it desires. Often the action can involve deleting or modifying the event. In other cases, state variables of the filter may be updated in response to various entities, but the entities themselves are unmodified. For example, a simple counting filter might increment a state variable in response to a particular type of entity as they pass by. Note that in such a situation it would be common for this counting filter to emit a special event representing the count accumulated at the end of post-processing, which would be done in the finalize method. In another case, a filter might accumulate the set of entities entering it, but emit entities only in specific circumstances. For example, such a filter might emit entities in batches when they had been certified to comply with a specific set of constraints or a filter might retain all entities until the end and emit them only in the finalize method.


This is the last method of any filter to execute. Specifically, this means that any and all events seen by this filter will have passed through the process method before the finalize subroutine is called. Filters do not have to do anything in finalize, but it can be convenient or necessary to do so. For example, in the create_histograms pipeline of this example there are three such filters accumulating different histograms. Each of these consumes the entire set of entities given to it and then, in its finalize method, emits an entity containing the accumulated histogram. Many other filters accumulating statistics would have a similar structure, since they would update state variables as they see each entity flowing by, but would report their results at the end using finalize. However, note that such a filter might also emit snapshots of its accumulated statistics during entity processing and would thus emit them from its process method, while the final record would still be emitted by the finalize method.
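The counting-filter shape described above can be sketched as follows. The Filter base class here is a stand-in for the framework's abstract filter class, and the entity representation is simplified to tuples; both are assumptions for illustration only:

```python
class Filter:
    """Stand-in for the framework's abstract filter class (sketch only)."""
    def initialize(self): pass
    def process(self, entity): pass
    def finalize(self): pass
    def abort(self): pass

class CountingFilter(Filter):
    def initialize(self):
        self.count = 0
        self.emitted = []   # stands in for the filter's output stream

    def process(self, entity):
        # Pass every entity through unchanged, counting the ones of interest.
        if entity == ("GAP_TEST", "SIG_SENT"):
            self.count += 1
        self.emitted.append(entity)

    def finalize(self):
        # Emit one summary entity after every event has been processed.
        self.emitted.append(("SUMMARY", "SIG_SENT_COUNT", self.count))

f = CountingFilter()
f.initialize()
for e in [("GAP_TEST", "SIG_SENT"),
          ("PIPE_TEST", "PIPE_START"),
          ("GAP_TEST", "SIG_SENT")]:
    f.process(e)
f.finalize()
print(f.emitted[-1])  # ('SUMMARY', 'SIG_SENT_COUNT', 2)
```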


This method provides an opportunity for clean-up code to execute in the case of a fatal error during post-processing which shuts down further pipeline execution. It is not used in this example.


#FIXME.D - discuss the results from the pipeline

See Also

It is instructive to read through the code associated with this example to see how its output is produced. These files are especially important:

