pipeline

Pipeline is a foundational service in Restspace. It is the main service that allows you to create services out of other services by composition. Pipeline allows the use of the pipes and filters pattern with any HTTP web service, but usually those within Restspace.

Pipeline works by forwarding the original request made to the Pipeline service to the first service in the pipeline. The response from that service is then passed to the second service, and so on, until the last service, whose response is returned as the response of the Pipeline service.

Pipeline has two service parameters:

  • pipeline: This is a JSON array which specifies the pipeline. How this works is described below.
  • onlyLowerServices: This is a true/false value (default false). If true, when the pipeline is resolving a URL on this Restspace host, it searches the service list starting from the position of this service. This means you can use a Pipeline service to manage access to another service by mounting it on a service path which overlaps that of the other service (see the sketch below).
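
As a sketch of how these two parameters fit together (only the parameter names pipeline and onlyLowerServices are documented above; the surrounding configuration format is not shown here), a Pipeline service mounted on /json/transactions itself could use onlyLowerServices to reach the underlying JSON service on the same path, filtering everything it returns:

{
  "pipeline": [
    "GET /json/transactions/$>0",
    "POST /transform/filter-insecure-properties"
  ],
  "onlyLowerServices": true
}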

The items in the pipeline array can be any of the following:

  • A pipeline step (a string) which tells the pipeline to include a service in the flow. It specifies the URL the service is on (possibly as a URL pattern, possibly with a method). Optionally there can be a condition for executing this step, and optionally there can be a specification of a new name for the message emitted from this step.
  • A split or join operator. A split operator processes a message and turns it into a number of messages (which are then processed in parallel along the rest of the pipeline). A join operator takes all the messages currently being processed in parallel by the pipeline and in some way joins them into one message.
  • A mode operator. This tells the pipeline how to process messages with the pipeline elements. Generally this is at the beginning of a pipeline or subpipeline, but it can sometimes be used in the middle.
  • A subpipeline. This is a sublist of items which defines a context in which the pipeline operates in a particular mode. By default the top level pipeline is in serial next next mode. By default subpipelines are parallel where the parent is serial, and serial next next where the parent is parallel.

The parts of a pipeline step specification are: a conditional mode prefix (? followed by a space), a condition, an HTTP method, a URL or URL pattern, and a rename. Every part is optional but there must be at least a request specification or a rename.

Simple Examples

[ "GET /json/transactions/$>0", "POST /transform/filter-insecure-properties" ]

If this pipeline was mounted on /secure-transactions, a call to /secure-transactions/trans-1 would get the transaction data from /json/transactions/trans-1 and send it through the transform at /transform/filter-insecure-properties. This pipeline could be set to allow partial access to transactions JSON files by piping the JSON through a transform that removed the sensitive data. The pipeline could then be configured with more public permissions than the service on /json/transactions.

[ "GET /json/$>0:(home)/$>1<0:(index)", "POST /template/$>0:(home)" ]

This pipeline is for creating a website out of JSON files rendered through templates. If the path of the request made to this pipeline is e.g. /contact-us, this would get the JSON file at /json/contact-us/index.json and render it with /template/contact-us.html. The (home) and (index) parts of the URL patterns are defaults, so a request to the root path would fall back to /json/home/index.json and /template/home.html.

Pipeline modes: serial and parallel

Pipeline can also manage more complex graphs of services, running services in parallel, splitting compound messages, processing the parts and merging them. A pipeline can have subpipelines, which are configured by creating a subarray in the array representing a pipeline. Each pipeline or subpipeline is a context in which execution proceeds in a given mode. A mode can be parallel or serial, and in a serial mode you can specify how the pipeline behaves when it encounters a step whose condition succeeds or fails.

The mode specification follows the pattern <parallel or serial or conditional> <fail behaviour> <succeed behaviour>. The fail behaviour says what happens when a step's condition fails and the succeed behaviour what happens when it succeeds: next moves execution on to the next step, while end stops execution of the rest of the (sub)pipeline. For example, serial next next runs each step in turn whatever the outcome of its conditions, while serial next end executes only the first step whose condition succeeds and then ends.

If you consider a pipeline with two nested subpipelines like this:

[ <serial url>, <serial url>, [ <parallel url>, <parallel url>, [ <serial url>, <serial url>, ... ], ... ], ... ]

By default the mode of the main pipeline is serial next next, any subpipeline within a serial pipeline is by default parallel, and any subpipeline within a parallel pipeline is by default serial next next.
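
The exact syntax for writing a mode operator is not shown in this section; assuming it is given as its specification string at the start of the (sub)pipeline array, and using hypothetical service paths, a serial next end pipeline acting as an if/else might look like this:

[ "serial next end", ":json GET /json/special-handler", "GET /json/default-handler" ]

If the incoming message is JSON the first step runs and the pipeline ends; if the condition fails, execution moves to the next step and only the fallback runs.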

The serial mode operates as already described. When parallel mode starts, each message in the pipeline is copied once for each step in the parallel subpipeline and passed to those steps in parallel. All the resulting messages then proceed along the pipeline in parallel. When multiple messages are being processed by the pipeline at once, each one is labelled with a name (described in more detail later). In this case, the default name is the zero-based position index of the step in the parallel subpipeline. If the step has a rename specified, that name is used instead.
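
For example (with hypothetical service paths), this pipeline fetches a JSON document and passes a copy through two transforms in parallel; by default the two resulting messages are named 0 and 1:

[ "GET /json/orders/today", [ "POST /transform/order-totals", "POST /transform/order-lines" ] ]

The named messages can then be recombined using one of the merge operators described later.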

Conditions: optional execution

A condition on a step in a pipeline tests a message before the step is executed. In a parallel subpipeline, if the condition fails, the message copy for that step does not continue down the pipeline. In a serial subpipeline, failure or success results in the action specified by the current mode.

Where a pipeline contains a subpipeline, the first condition in the subpipeline determines whether the subpipeline is executed at all, rather than just whether that conditional step is executed.

A condition looks like:

  • :<mime type> The mime type of the message must be as given. There are 3 special words: json indicates any JSON mime type, text indicates any text mime type, and binary any other mime type.
  • #<name> The name of the message must be as given.
  • !<error code range>:<error message check> The message represents a failure (in conditional mode) and the error code is in the specified range or list (e.g. 404,500-599); the optional message check is a string which must match some part of the error message (the check string is URL encoded, so it contains no spaces).

So for example:

:json

fails if the content type is not some JSON type

:text/html

fails if the content type is not text/html

#body

fails if the name of the message is not 'body'

!404:No+such+user

fails if the message was not a 404 error where the error message contains 'No such user'
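
As an example of a condition attached to a step (the paths are hypothetical):

[ "GET /json/pages/$>0", ":json POST /transform/add-metadata", "POST /template/page" ]

In the default serial next next mode, the transform only runs when the fetched content is JSON; if the condition fails, the message simply passes on to the template step.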

Conditional mode

A pipeline step (and a pipeline) can operate in conditional mode. This sets the pipeline up to execute conditions against the result of the pipeline step. A step is run in conditional mode if it is prefixed with ? followed by a space. Where a request returning an error status would usually terminate the pipeline, a conditional mode step does not: instead the error response is converted into a message with a 200 status and an application/json body that looks like this:

{
  "_errorStatus": <error status code number>,
  "_errorMessage": <error message string>
}

The pipeline in conditional mode operates as if it were serial next end mode. This means the first step or subpipeline with a succeeding condition is the only one which is executed in the rest of the pipeline.

When the conditional mode pipeline has a subpipeline, its default mode is not parallel as it would normally be but serial. This supports subpipelines representing chains of conditional actions.

When the conditional mode pipeline ends, if the message was converted from a failure into a success carrying error data, it is now converted back to a failed message before being passed back to any parent pipeline. This means the conditional steps in the pipeline act like exception handlers, and if none is triggered, the failure propagates as normal.

As an example:

[
  "? GET /json/data0",
  [ "!404 GET /json/default", "/transform/transf-default" ]
]

This pipeline attempts to get JSON from /json/data0, but if there is no data there (404 error), it will get JSON from /json/default and pass it through the transf-default transform.
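
Since a conditional mode pipeline behaves like serial next end, further handlers can be chained so that only the first matching one runs. A sketch (the added paths are hypothetical):

[
  "? GET /json/data0",
  [ "!404 GET /json/default", "/transform/transf-default" ],
  [ "!500-599 GET /json/maintenance-message" ]
]

If neither error condition matches, the error data is converted back into a failed message and passed on as normal.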

Pipeline operators: splitting and merging

Pipelines can also include pipeline operators, which fall into the categories of split and merge operators. Split operators specify a way of splitting a message which in some way has parts into multiple messages, one for each of those parts, whilst merge operators specify a way of combining all the messages in the pipeline into one. Split operators can be used at any point in a pipeline: the resulting messages are all run through the subsequent pipeline elements in parallel. A split operator assigns a name to each part (by setting the header Content-Disposition: form-data; name=<name>) and merge operators use the names when combining the parts. An end-to-end example follows the operator lists below.

Split operators are:

  • split: takes a 'multipart/form-data' message and splits it into a message for each part, giving each message the field name as its name.
  • unzip: takes an 'application/zip' message and unzips it into a message for each file, giving each message its path in the zipped structure as its name.
  • jsonSplit: takes a JSON message and, if it is a list or object, splits it into a message for each list item or property, named with the list index or property name.

Merge operators are:

  • multipart: a 'multipart/form-data' formatted message is created. Each constituent message's data is combined in as a field value whose name is the name of the message. Streams are preserved by creating a new stream out of the streams of the parts.
  • zip: an 'application/zip' message is created. Each constituent message is added to the zip archive with its name as the filename and its data as the file. Streaming is preserved by combining streams.
  • jsonObject: creates an 'application/json' message containing a JSON object built by adding a property for each message, whose key is the message's name and whose value is the JSON object in the message for JSON data, the text as a string for text data, or the binary data as a base 64 string for binary data.
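
For example (with hypothetical paths), this pipeline splits a JSON list into a message per item, runs each item through a transform in parallel, then reassembles the results into a single JSON object keyed by list index:

[ "GET /json/customers", "jsonSplit", "POST /transform/redact-personal-data", "jsonObject" ]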

Efficiency, streaming and async

The pipeline implementation is designed to maximise the efficiency of all operations. Operations following on from operations being run in parallel are started as soon as the previous operation returns, rather than waiting for all previous parallel elements to complete. Data is held as streams (which are forked if necessary) for as long as possible, so that data can start being returned from a pipeline as soon as possible, before pipeline operations have completed.