2013-05-01 Mēhua Design Meeting Minutes

A Processing Point (PP) takes one or more Measurement Streams (MS) from upstream Measurement Points (MP) or PP, and outputs one MS, sent to Collection Points (CP) or downstream PPs. A Processing Container (PC) is the software construct containing one or more PPs, implementing event loops and providing all the infrastructure to interact with the outside world (marshalling, ...).

Generic Use Cases

A number of core use cases have been defined:

  • Filtering of a single measurement stream using a predicate.
  • Buffering of a stream.
  • Joining two streams with the same schema.
  • Joining two streams with different schemas.
  • Implementing a feedback loop.

In the latter case we can provide feedback using the normal measurement stream abstraction. The core abstraction here is that of a tuple which is [defined when creating the upstream MS (line 1 in the example SQL-ish language above)].

Design (PP/PC)

Refining the control language from the design document, [particularly replacing WINDOW by OVER (what size of column to process), and PERIOD by EVERY (when to trigger the aggregate functions); both can be expressed in SAMPLES or SECONDS], we get:

CREATE STREAM S3 (ws:int, collect:string) AS
  SELECT fun1(s1.A), fun2(s2.B) 
  FROM S1[pred1(A,x)], S2[pred2(B,y)]
  WHERE s1.C = s2.C
  COLLECT $collect

pred1 and pred2 are filetring predicates [(F) or, better (P)] allowing to decide whether a given input tuple should be entered in the intermediate table to be joined (J) into one table which columns are then processed by aggregating functions (A) fun1 and fun2 over a window (W).

OM NOTE: Shall we use "aggregate" or "process"?

This allows to create a structure as follows.

All created output streams (Injection/Processing Points) will get a sequence number added, which will be available for selection/processing in the downtream PP. Additionally, IP will add a Timestamp field to each of their tuples. It is up to the following PPs to select/transform this information for their output MS, but timestamps will not be automatically added at the processing stages.

NB: The client injection API should provide methods to override/set the timestamp, in addition of just setting it automatically.


The Meehua API should be stateless and passive; all state information is passed around in contexts.

  • Stateless in that the transforms that we support at a processing point should be pure functions (to the extent that C allows).
  • Transforms are passive in that we employ a push model for stream processing.

A PP transform one or more input stream(s) of Tuples (array of differently-typed data elements; coming from the unmarshalling/deserialisation system), into one output stream of tuples, to be marshalled/serialised. The filtering predicates (P), or a logic combination thereof, are applied separately to each input stream ([predn(x,y)]), letting only those tuples which are relevant to the current query into Tables. EVERY relevant interval (number of samples in the tables, or amount of time) tables are joined together (J) according to normal SQL joining logic (WHERE clause).

OM note: It might be tricky to trigger the aggregation based on the number of samples in the tableS, as they might have different length. We might need to specify in which table(s) we count the samples, e.g., EVERY(clock) 2 OR EVERY(table1) 3 (syntax to be discussed further) could mean every two seconds or every time table1 has received three new samples from the predicate-filtering step.

The output of this JOIN is one table, which gets segmented in Columns (array of similarly-typed data elements), which are then passed to the aggregation functions (A) depending on the SELECT clause. Their output is then put back into an output Table, for later breaking-down into Tuples for serialisation.

NB: Some sane default as to the number of rows that are output by the aggregating functions needs to be discussed. How many rows does SELECT AVG(A), A gives?

The base of a PP/PC is architectured around an event loop, calling callback functions whenever relevant (filtering predicates when new tuples arrive, JOIN tet aggregating functions when the EVERY clause is satisfied, ...).

Objective Use Case

The API will be developed in a use-case driven manner, first focusing on building blocks allowing to hardcode PP, before introducing a query language. Nonetheless, the first use case is a PP which takes one MS with a delay field as input, and produces one sample of Tukey's five-number summary for the valid input (i.e., delay <> NaN or, in R syntax, na.rm=True) every 10 input samples.

In pseudo-SQL, this PP can be summarised as follows.

SELECT fivenum(delay), from S1[diff(delay,NaN)] \

Example Pseudo Implementation

  • Coding style: we use C11 and doxument the code in place. (CFLAGS="-std=c11 -pedantic -Wall -Werror")
  • All types are opaque
  • OM note: I however prefer if all variables are declared at the top of the function blocks, rather than on first use (this probably needs to be discussed further).

Memory is allocated by the framework.

Pseudo API

See also the discussion of MeehuaTypes.

#include <stdbool.h>

/** Generic value container.
 * Similar to an OmlValueU */
typedef MeehuaValue;
/** One type. */
typedef MeehuaType;
/** Field-type information.
 * Array of types and, maybe, names */
typedef MeehuaSchema;
/* A key/value pair*/
typedef MeehuaKeyValue;

/** A tuple with some type information.
 * Array of MeehuaValue, and pointer to MeehuaSchema */
typedef MeehuaTuple;

/** An array of tuples */
typedef MeehuaTable;

/** An array of MeehuaValue of a given MeehuaType
 * XXX: Probably with a name */
typedef MeehuaColumn;

/** The context for a function */
typedef MeehuaContext;

/** An instance of a processing point */
typedef MeehuaProcessingPoint;

/** A filter predicate
 * XXX: This verbose way of documenting code might only be relevant in actual
 * implementations, \param-based documentation should be good enough for 
 * headers, and make them more easily readable. */
typedef bool (*MehuaPredicate) (
        MeehuaContext *ctx,    /**< Context in which to run the predicate */
        MeehuaTuple t        /**< Tuple on which to run the predicat */

typedef bool (*MehuaPredicate) (MeehuaContext *ctx, MeehuaTuple t);

/** An aggregation function
 * \param[inout] ctx, Context in which to run the predicate
 * \param in Array of input columns
 * \param out Array of output columns
 * \return 0 on success, <0 on error
 * XXX: This is probably how we want to document headers */
typedef int (*MehuaAggregate) (MeehuaContext *ctx, MeehuaColumn in[], MeehuaColumn out[]); /* XXX: We might need specific tyes for arrays, to hold the lengths */

/** Generic description of functions (predicate or aggregation), for dlopen(3)ing */
typedef MeehuaPPComponent;

/** Generic context-setup function, based on a key/value pair list */
typedef int (*MehuaSetup) (MeehuaContext *ctx, MeehuaColumn in[], MeehuaColumn out[]); /* XXX: We might need specific tyes for arrays, to hold the lengths */

Pseudo Implementation

typedef struct {
  char *name;                    /**< Name of the component */

  MehuaPredicate    predicate;        /**< Predicate function provided by the component, can be NULL */
  MehuaSetup        predicate_setup;    /**< Setup function for the predicate */

  MehuaAggregate    aggregate;        /**< Aggregate function provided by the component, can be NULL */
  MehuaSetup        aggregate_setup;    /**< Setup function for the aggregate */

} MeehuaPPComponent;

typedef struct {
  char *key;        /**< Key */

  MeehuaValue value;    /**< Value */
  MeehuaType type;    /**< Type of value */

} MeehuaKeyValue;

typedef struct MeehuaContext {
    char *name;

    struct MeehuaContext *container;

    /* XXX: This should probably be subclassed rather than all put in one */

    /* PP */
    int n_streams;
    MeehuaStream *streams;

    /* Predicate/Aggregate */
    MeehuaValue arguments[];

    /* Aggregate */
    int n_inputs;
    int n_outputs; /* XXX: Or should it be part ofr output below? */
    MeehuaColumns output[];

Pseudo PP Code

/** \copydoc MeehuaPredicate
 * Test is v1 != v2
bool diff_predicate(MeehuaContext *ctx,    MeehuaTuple t)
    /* ctx should contain column references or intrinsic values to compare*/

/** \copydoc MeehuaComponentSetup
 * Specify values to use when testing difference.
 * XXX: How to differentiate element name from intrinsic value?
int diff_predicate_setup(MeehuaContext *ctx, MeehuaKeyValue kv)
    /* parse kv to find v1 and v2, and store it into ctx */

MeehuaPPComponent diff_component = {
    .name = "diff",
    .predicate = diff_predicate,
    .predicate_setup diff_predicate_setup,

/** \copydoc MeehuaPredicate
 * Compute Tukey's five-number statistics
bool fivenum_aggregate(MeehuaContext *ctx, MeehuaColumn in[], MeehuaColumn[] out)
    /* process in, and populate out */

/** \copydoc MeehuaComponentSetup
 * Specify columns to apply the aggregation function on.
 * XXX: Also, required output columns, if not all
int fivenum_aggregate_setup(MeehuaContext *ctx, MeehuaKeyValue kv)
    /* parse kv to find v1 and store it into ctx
     also store the number of input/output columns required, for allocation by the system */
        ctx->n_inputs = 1; /* One input vector */
        ctx->n_outputs = 5; /* Five output columns */

MeehuaPPComponent fivenum_component = {
    .name = "fivenum",
    .aggregate = fivenum_predicate,
    .aggregate_setup fivenum_predicate_setup,

 * XXX: Doxumentation in this pseudo code does NOT follow the coding guidelines

/** Setup a fivenum-computing PP for an MS with a 'delay' field, every 10 valid tuples */
int fivenum_pp_setup(MeehuaContext *ctx)
    int every = 10;

    MeehuaKeyValue diff_param[] = {
        {"v1", MeehuaValueString("delay"), MH_COLUMN },
        {"v2", MeehuaValueDouble("NaN"), MH_INTRINSIC },
        { NULL, NULL, MH_END},
    MeehuaKeyValue fivenum_param[] = {
        {"v1", MeehuaValueString("delay"), MH_COLUMN },
        { NULL, NULL, MH_END},

    /* Find needed components by name */
    ctx->predicate[0] = mh_component_lookup(OML_PREDICATE, "diff");
    ctx->aggregate[0] = mh_component_lookup(OML_AGGREGATE, "fivenum");

    /* Create input table */
    ctx->input_table[0] = mh_table_create(every);
    /* No join in this PP, use the ftable (could also be NULL?) */
    ctx->join_table = ctx-ftable;
    ctx->out_table = mh_table_create(every);

    ctx->count = 0; /* Could probably be done upstream */

    ctx->predicate_ctx[0] = mh_malloc(...); /* This should be done elsewhere */
    ctx->predicate_ctx[0]->container = ctx;
    ctx->aggregate_ctx[0] = mh_malloc(...); /* Ditto */
    ctx->aggregate_ctx[0]->container = ctx;

    ctx->predicate->predicate_setup[0](ctx->predicate_ctx[0], diff_param);
    ctx->aggregate[0]->aggregate_setup(ctx->aggregate_ctx, fivenum_param);
    ctx->aggregate[0]->output = mh_malloc(ctx->aggregate_ctx->n_outputs); /* Output of the first

    /* Register a callback for new streams with the proccesing container,
     * based on name (here: "MS1" */
    ctx->container->on_stream("MS1", fivenum_stream_cbk, (void*)ctx); /* XXX: We need a better way to address streams */

    /* XXX: We would need one fresh context per stream here; all the
     * initialisation above is probably better done in the
     * fivenum_stream_cbk callback below

/** Stream handler.
 * Called when a new stream matching the requirements is received. */
void fivenum_stream_cbk(MeehuaStream *stream, MeehuaEvent *event, void *data)
    /* Called when a new tuple arrives */
    stream->on_tuple(fivenum_tuple_cbk, data); /* XXX: The predicate could
                              be put directly in the
                              arguments, and checked by
                              the system; would it
                              still be relevant to have
                              this callback then? */

    /* Called according to the EVERY clause, after the join has happened */
    stream->on_trigger(fivenum_trigger_cbk, data);

/** Tuple handler.
 * Called when a new tuple is received in a stream. */
void fivenum_tuple_cbk(MeehuaTuple *tuple, MeehuaEvent *event, void *data)
    MeehuaContext *ctx = data;

    if(!ctx->predicate(ctx->predicate_ctx[0], tuple)) {

    mh_table_add_tuple(ctx->ftable, tuple);

    if(++ctx->count >= 10) {
        tuple->stream->trigger(); /* XXX: Should probably be done upstream */

/** Aggregation handler,
 * Called when the every clause triggers */
void fivenum_trigger_cbk(MeehuaXXX *XXX, MeehuaEvent *event, void *data)
    int i;
    MeehuaContext *ctx = data;

    /* This should return a MeehuaColumn[] with the "delay" data */
    MeehuaColumn incols[] = mh_table_get_columns(ctx->jtable,
            mh_component_get_input_names(ctx->aggregate_ctx[0])); /* This should return { "delay", } */

    ctx->aggregate[0](ctx->aggregate_ctx[0], incols, ctx->aggregate_ctx[0]->output); /* XXX: Should output always be assumed to be in ctx? */

    /* XXX: Should probably be part of another data_ready-like callback */
    table_fill_columns(ctx->otable, 0, ctx->aggregate_ctx[0]->output); /* 0 is used as the offset of the first columns to fill */

    for(i = 0; i<ctx->n_streams; i++) {

Olivier's Afterthought Notes

After reading parts of [1], I am unsure about two things. Do we really need first level filters implemented differently? They are just upstream SELECT statements, but seem to be overly specified (and limited) here with the predicates; these are really just optimisation. A concrete example of multirow output would be GROUP BY clauses, e.g., getting the average speeds of each vehicle in a window would output one row per vehicle; or should the GROUP BY clause result in row-split output tables separately fed to column aggregation functions limited to one-row output.

  1. Jain, S. Mishra, A. Srinivasan, J. Gehrke, J. Widom, H. Balakrishnan, U. Çetintemel, M. Cherniack, R. Tibbetts, and S. B. Zdonik, "Towards a streaming SQL standard," Proceedings of the VLDB Endowment, vol. 1, no. 2, pp. 1379-1390, Aug. 2008. [Online]. Available:

IMG_20130501_134729.jpg (159 KB) Olivier Mehani, 02/05/2013 06:50 PM

Powered by Redmine © 2006-2014 Jean-Philippe Lang

Last modified 10 months ago Last modified on Jul 15, 2020, 4:49:48 PM
Note: See TracWiki for help on using the wiki.