A distributed pipe is a single construct that can be used for map-reduce and stream transformation. It is a further development of the DAQ.

A dpipe is an object which accepts a series of input rows and generates an equal amount of output rows. It may or may not preserve order and it may or may not be transactional. The input row of a dpipe is a tuple of values. To each element of the tuple corresponds a transformation. The transformation is expressed as a partitioned SQL function, basically a function callable by daq_call, with arguments specifying the partition where it is to be run. The output row is formed by gathering together the transformation results of each element of the input tuple.

Conceptually, this is like a map operation, like running several DAQ's, one for each column of the dpipe. A transformation function does not always need to produce a value. It may also produce a second partitioned function call with new arguments which will be partitioned and scheduled by the dpipe. Since the second function is independently partitioned, this may be used for implementing a reduce phase. This phase may then return a value and/or further functions to be called.

Ultimately, the dpipe functions are all expected to return a value. When each function of the input row has returned a value, the output row is ready and can be retrieved. Returning a value is a way for the application to synchronize with the operation of the dpipe. The dpipe can be used for mapping values or side effects or both.

The dpipe object is created with the dpipe SQL function. This takes a set of flags and a list of previously declared dpipe transformation names. A dpipe transformation is declared by specifying a SQL procedure, a partitioning key and a set of flags.

The dpipe is fed data with the dpipe_input function. This takes the dpipe object and one value per transformation specified when the dpipe was made. The values can be arrays, so that a transformation function can logically have multiple arguments. The results of the dpipe are retrieved with the dpipe_next function,. This returns a new result row at each subsequent call until one row has been returned for each input row. Depending on flags, these may be returned as they become available or in the original order. Each input is substituted with the transformation result in the output row. Some transformation can be identity so as to preserve a row id for correlating inputs and outputs if they are not processed in order.

After this, the dpipe can be reused. The dpipe is freed when there are no longer references to it.

Finally, since transformations may, in addition to producing values, also produce other functions to be called for their effects, dpipe_reuse is called to make sure that all these functions are run until no more operations are left.