Changes between Initial Version and Version 1 of Developer/6aFuture/Meehua/Minutes


Timestamp: Jul 15, 2020, 4:49:48 PM
Author: seskar
= 2013-05-01 Mēhua Design Meeting Minutes =

A Processing Point (PP) takes one or more Measurement Streams (MS) from upstream Measurement Points (MP) or PPs, and outputs one MS, sent to Collection Points (CP) or downstream PPs. A Processing Container (PC) is the software construct containing one or more PPs, implementing the event loops and providing all the infrastructure to interact with the outside world (marshalling, ...).
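As a minimal sketch of this structure (all type and function names here are illustrative, not the Meehua API): a PP bundles its upstream streams and single output, and a PC hosts several PPs.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names throughout; the real Meehua types are sketched later. */
typedef struct { const char *name; } MeasurementStream;

typedef struct {
    MeasurementStream *inputs[4];  /* upstream MPs or PPs (one or more) */
    size_t n_inputs;
    MeasurementStream output;      /* exactly one output MS */
} ProcessingPoint;

typedef struct {
    ProcessingPoint *pps[8];       /* PPs hosted by this container */
    size_t n_pps;
} ProcessingContainer;

/* Attach an upstream stream to a PP; returns 0 on success, -1 when full. */
static int pp_add_input(ProcessingPoint *pp, MeasurementStream *ms)
{
    if (pp->n_inputs >= 4) return -1;
    pp->inputs[pp->n_inputs++] = ms;
    return 0;
}
```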

== Generic Use Cases ==
A number of core use cases have been defined:
* Filtering of a single measurement stream using a predicate.
* Buffering of a stream.
* Joining two streams with the same schema.
* Joining two streams with different schemas.
* Implementing a feedback loop.

In the latter case, we can provide feedback using the normal measurement stream abstraction. The core abstraction here is that of a tuple, which is [defined when creating the upstream MS (line 1 in the example SQL-ish language below)].
     14
     15== Design (PP/PC) ==
     16Refining the control language from the design document, [particularly replacing WINDOW by OVER (what size of column to process), and PERIOD by EVERY (when to trigger the aggregate functions); both can be expressed in SAMPLES or SECONDS], we get:
     17{{{
     18CREATE STREAM S3 (ws:int, collect:string) AS
     19  SELECT fun1(s1.A), fun2(s2.B)
     20  FROM S1[pred1(A,x)], S2[pred2(B,y)]
     21  WHERE s1.C = s2.C
     22  OVER $ws SAMPLES
     23  EVERY 1 SECONDS
     24  COLLECT $collect
     25  ;
     26}}}
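A minimal sketch of the OVER/EVERY semantics in SAMPLES units (names and the fixed-size buffer are assumptions, not the actual design): a ring buffer keeps the last OVER samples while a counter fires the EVERY clause.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sketch of OVER/EVERY in SAMPLES: keep the last `over`
 * samples in a ring buffer, and report a trigger every `every` pushes.
 * Assumes over <= WIN_MAX and every > 0. */
#define WIN_MAX 64

typedef struct {
    double buf[WIN_MAX];
    size_t over;   /* OVER n SAMPLES: window size */
    size_t every;  /* EVERY n SAMPLES: trigger cadence */
    size_t head;   /* next write position */
    size_t count;  /* samples seen so far */
} Window;

/* Push one sample; return 1 when the EVERY clause fires, 0 otherwise. */
static int window_push(Window *w, double v)
{
    w->buf[w->head] = v;
    w->head = (w->head + 1) % w->over;
    w->count++;
    return (w->count % w->every) == 0;
}
```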

pred1 and pred2 are filtering predicates [(F) or, better, (P)] which decide whether a given input tuple should be entered into the intermediate tables, to be joined (J) into one table whose columns are then processed by aggregating functions (A) fun1 and fun2 over a window (W).
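The P and J stages can be sketched as follows, with hypothetical one-key tuples (a nested-loop equi-join stands in for the real SQL joining logic):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical tuples with one join key (C) and one value column. */
typedef struct { int c; double v; } Row;

/* Predicate stage (P): copy rows satisfying pred into an intermediate table. */
static size_t filter(const Row *in, size_t n, int (*pred)(const Row *),
                     Row *out)
{
    size_t m = 0;
    for (size_t i = 0; i < n; i++)
        if (pred(&in[i])) out[m++] = in[i];
    return m;
}

/* Join stage (J): nested-loop equi-join on the C column (WHERE s1.C = s2.C);
 * returns the number of matched pairs. */
static size_t join_count(const Row *t1, size_t n1, const Row *t2, size_t n2)
{
    size_t m = 0;
    for (size_t i = 0; i < n1; i++)
        for (size_t j = 0; j < n2; j++)
            if (t1[i].c == t2[j].c) m++;
    return m;
}

/* Example predicate: keep only rows with a positive value. */
static int positive(const Row *r) { return r->v > 0; }
```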

'''OM NOTE:''' Shall we use "aggregate" or "process"?

This allows creating a structure as follows.

All created output streams (Injection/Processing Points) will get a sequence number added, which will be available for selection/processing in the downstream PP. Additionally, IPs will add a Timestamp field to each of their tuples. It is up to the following PPs to select/transform this information for their output MS, but timestamps will not be automatically added at the processing stages.

'''NB:''' The client injection API should provide methods to override/set the timestamp, in addition to setting it automatically.
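A sketch of the sequence-number and timestamp behaviour described above, assuming a simple injection-point structure (all names hypothetical); the timestamp is stamped automatically unless the client overrides it.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical injection-point sketch: every injected tuple gets a sequence
 * number; the timestamp is set automatically unless the client overrides it. */
typedef struct { uint64_t seqno; double ts; double value; } InjTuple;

typedef struct { uint64_t next_seqno; } InjectionPoint;

/* Pass ts_override < 0 to let the IP stamp the tuple with `now`. */
static InjTuple inject(InjectionPoint *ip, double value,
                       double now, double ts_override)
{
    InjTuple t;
    t.seqno = ip->next_seqno++;  /* sequence number always added */
    t.ts = (ts_override >= 0) ? ts_override : now;
    t.value = value;
    return t;
}
```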

== API ==
The Meehua API should be stateless and passive; all state information is passed around in contexts.
* Stateless in that the transforms that we support at a processing point should be pure functions (to the extent that C allows).
* Transforms are passive in that we employ a push model for stream processing.
A PP transforms one or more input streams of Tuples (arrays of differently-typed data elements, coming from the unmarshalling/deserialisation system) into one output stream of tuples, to be marshalled/serialised. The filtering predicates (P), or a logical combination thereof, are applied separately to each input stream ([predn(x,y)]), letting only those tuples which are relevant to the current query into Tables. EVERY relevant interval (number of samples in the tables, or amount of time), the tables are joined together (J) according to normal SQL joining logic (WHERE clause).
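The push model can be sketched as a stream that owns a tuple callback (hypothetical names); the transform is passive and only runs when a tuple is pushed at it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical push-model sketch: a stream holds a tuple callback, and
 * pushing a tuple invokes it; the transform never polls for input. */
typedef struct { double value; } Tuple;

typedef void (*TupleCbk)(const Tuple *t, void *data);

typedef struct {
    TupleCbk on_tuple;  /* registered transform entry point */
    void *data;         /* opaque per-transform state (context) */
} Stream;

static void stream_push(Stream *s, const Tuple *t)
{
    if (s->on_tuple) s->on_tuple(t, s->data);
}

/* Example callback: count the tuples exceeding a fixed threshold. */
static void count_over_one(const Tuple *t, void *data)
{
    if (t->value > 1.0) (*(int *)data)++;
}
```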

'''OM note:''' It might be tricky to trigger the aggregation based on the number of samples in the tableS, as they might have different lengths. We might need to specify in which table(s) we count the samples, e.g., EVERY(clock) 2 OR EVERY(table1) 3 (syntax to be discussed further) could mean every two seconds, or every time table1 has received three new samples from the predicate-filtering step.
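The per-table counting suggested in the note could be sketched like this (illustrative only): each table keeps its own fresh-sample counter, and only tables given a threshold may trigger.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sketch of the note's EVERY(table1) 3 idea: each table keeps
 * its own new-sample counter, and only the designated table can trigger. */
typedef struct {
    size_t fresh;      /* samples received since the last trigger */
    size_t threshold;  /* 0: this table never triggers */
} TableCounter;

/* Record one new sample; return 1 when this table's EVERY clause fires. */
static int table_add_sample(TableCounter *t)
{
    t->fresh++;
    if (t->threshold && t->fresh >= t->threshold) {
        t->fresh = 0;
        return 1;
    }
    return 0;
}
```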

The output of this JOIN is one table, which gets segmented into Columns (arrays of similarly-typed data elements), which are then passed to the aggregation functions (A) depending on the SELECT clause. Their output is then put back into an output Table, for later breaking down into Tuples for serialisation.

'''NB:''' Some sane default as to the number of rows that are output by the aggregating functions needs to be discussed. How many rows does SELECT AVG(A), A give?

The base of a PP/PC is architected around an event loop, calling callback functions whenever relevant (filtering predicates when new tuples arrive, JOIN and aggregating functions when the EVERY clause is satisfied, ...).

== Objective Use Case ==
The API will be developed in a use-case-driven manner, first focusing on building blocks allowing PPs to be hardcoded, before introducing a query language. Nonetheless, the first use case is a PP which takes one MS with a delay field as input, and produces one sample of [http://stat.ethz.ch/R-manual/R-patched/library/stats/html/fivenum.html Tukey's five-number summary] for the valid input (i.e., delay <> NaN or, in R syntax, na.rm=True) every 10 input samples.

In pseudo-SQL, this PP can be summarised as follows.
{{{
CREATE STREAM SUMMARY AS \
SELECT fivenum(delay) FROM S1[diff(delay,NaN)] \
OVER 10 SAMPLES
}}}

== Example Pseudo Implementation ==
* '''Coding style:''' we use C11 and doxument the code in place (CFLAGS="-std=c11 -pedantic -Wall -Werror").
* All types are opaque.
* '''OM note:''' I would however prefer that all variables be declared at the top of function blocks, rather than on first use (this probably needs to be discussed further).
* Memory is allocated by the framework.

=== Pseudo API ===
See also the discussion of [wiki:6aFuture/Meehua/Types MeehuaTypes].
{{{
#include <stdbool.h>

/** Generic value container.
 * Similar to an OmlValueU */
typedef MeehuaValue;
/** One type. */
typedef MeehuaType;
/** Field-type information.
 * Array of types and, maybe, names */
typedef MeehuaSchema;
/** A key/value pair */
typedef MeehuaKeyValue;

/** A tuple with some type information.
 * Array of MeehuaValue, and pointer to MeehuaSchema */
typedef MeehuaTuple;

/** An array of tuples */
typedef MeehuaTable;

/** An array of MeehuaValue of a given MeehuaType
 * XXX: Probably with a name */
typedef MeehuaColumn;

/** The context for a function */
typedef MeehuaContext;

/** An instance of a processing point */
typedef MeehuaProcessingPoint;

/** A filter predicate
 * XXX: This verbose way of documenting code might only be relevant in actual
 * implementations; \param-based documentation should be good enough for
 * headers, and make them more easily readable. */
typedef bool (*MehuaPredicate) (
        MeehuaContext *ctx,  /**< Context in which to run the predicate */
        MeehuaTuple t        /**< Tuple on which to run the predicate */
        );

typedef bool (*MehuaPredicate) (MeehuaContext *ctx, MeehuaTuple t);

/** An aggregation function
 * \param[inout] ctx Context in which to run the aggregate
 * \param in Array of input columns
 * \param out Array of output columns
 * \return 0 on success, <0 on error
 * XXX: This is probably how we want to document headers */
typedef int (*MehuaAggregate) (MeehuaContext *ctx, MeehuaColumn in[], MeehuaColumn out[]); /* XXX: We might need specific types for arrays, to hold the lengths */

/** Generic description of functions (predicate or aggregation), for dlopen(3)ing */
typedef MeehuaPPComponent;

/** Generic context-setup function, based on a key/value pair list */
typedef int (*MehuaSetup) (MeehuaContext *ctx, MeehuaKeyValue kv[]);
}}}

=== Pseudo Implementation ===
{{{
typedef struct {
  char *name;                        /**< Name of the component */

  MehuaPredicate    predicate;       /**< Predicate function provided by the component, can be NULL */
  MehuaSetup        predicate_setup; /**< Setup function for the predicate */

  MehuaAggregate    aggregate;       /**< Aggregate function provided by the component, can be NULL */
  MehuaSetup        aggregate_setup; /**< Setup function for the aggregate */

} MeehuaPPComponent;

typedef struct {
  char *key;            /**< Key */

  MeehuaValue value;    /**< Value */
  MeehuaType type;      /**< Type of value */

} MeehuaKeyValue;

typedef struct MeehuaContext {
    char *name;

    struct MeehuaContext *container;

    /* XXX: This should probably be subclassed rather than all put in one */

    /* PP */
    int n_streams;
    MeehuaStream *streams;

    /* Predicate/Aggregate */
    MeehuaValue arguments[];

    /* Aggregate */
    int n_inputs;
    int n_outputs; /* XXX: Or should it be part of output below? */
    MeehuaColumn output[];
} MeehuaContext;
}}}

=== Pseudo PP Code ===
{{{
/** \copydoc MeehuaPredicate
 * Test whether v1 != v2
 */
bool diff_predicate(MeehuaContext *ctx, MeehuaTuple t)
{
    /* ctx should contain column references or intrinsic values to compare */
}

/** \copydoc MeehuaComponentSetup
 * Specify values to use when testing difference.
 * XXX: How to differentiate element name from intrinsic value?
 */
int diff_predicate_setup(MeehuaContext *ctx, MeehuaKeyValue kv)
{
    /* parse kv to find v1 and v2, and store them into ctx */
}

MeehuaPPComponent diff_component = {
    .name = "diff",
    .predicate = diff_predicate,
    .predicate_setup = diff_predicate_setup,
};

/** \copydoc MeehuaAggregate
 * Compute Tukey's five-number statistics
 */
int fivenum_aggregate(MeehuaContext *ctx, MeehuaColumn in[], MeehuaColumn out[])
{
    /* process in, and populate out */
}

/** \copydoc MeehuaComponentSetup
 * Specify columns to apply the aggregation function on.
 * XXX: Also, required output columns, if not all
 */
int fivenum_aggregate_setup(MeehuaContext *ctx, MeehuaKeyValue kv)
{
    /* parse kv to find v1 and store it into ctx;
     * also store the number of input/output columns required, for allocation by the system */
    ctx->n_inputs = 1;  /* One input vector */
    ctx->n_outputs = 5; /* Five output columns */
}

MeehuaPPComponent fivenum_component = {
    .name = "fivenum",
    .aggregate = fivenum_aggregate,
    .aggregate_setup = fivenum_aggregate_setup,
};

/*
 * XXX: Doxumentation in this pseudo code does NOT follow the coding guidelines
 */

/** Set up a fivenum-computing PP for an MS with a 'delay' field, every 10 valid tuples */
int fivenum_pp_setup(MeehuaContext *ctx)
{
    int every = 10;

    MeehuaKeyValue diff_param[] = {
        { "v1", MeehuaValueString("delay"), MH_COLUMN },
        { "v2", MeehuaValueDouble("NaN"), MH_INTRINSIC },
        { NULL, NULL, MH_END },
    };
    MeehuaKeyValue fivenum_param[] = {
        { "v1", MeehuaValueString("delay"), MH_COLUMN },
        { NULL, NULL, MH_END },
    };

    /* Find needed components by name */
    ctx->predicate[0] = mh_component_lookup(OML_PREDICATE, "diff");
    ctx->aggregate[0] = mh_component_lookup(OML_AGGREGATE, "fivenum");

    /* Create input table */
    ctx->input_table[0] = mh_table_create(every);
    /* No join in this PP, use the ftable (could also be NULL?) */
    ctx->join_table = ctx->ftable;
    ctx->out_table = mh_table_create(every);

    ctx->count = 0; /* Could probably be done upstream */

    ctx->predicate_ctx[0] = mh_malloc(...); /* This should be done elsewhere */
    ctx->predicate_ctx[0]->container = ctx;
    ctx->aggregate_ctx[0] = mh_malloc(...); /* Ditto */
    ctx->aggregate_ctx[0]->container = ctx;

    ctx->predicate[0]->predicate_setup(ctx->predicate_ctx[0], diff_param);
    ctx->aggregate[0]->aggregate_setup(ctx->aggregate_ctx[0], fivenum_param);
    ctx->aggregate[0]->output = mh_malloc(ctx->aggregate_ctx[0]->n_outputs); /* Output of the first aggregate */

    /* Register a callback for new streams with the processing container,
     * based on name (here: "MS1") */
    ctx->container->on_stream("MS1", fivenum_stream_cbk, (void*)ctx); /* XXX: We need a better way to address streams */

    /* XXX: We would need one fresh context per stream here; all the
     * initialisation above is probably better done in the
     * fivenum_stream_cbk callback below
     */
}

/** Stream handler.
 * Called when a new stream matching the requirements is received. */
void fivenum_stream_cbk(MeehuaStream *stream, MeehuaEvent *event, void *data)
{
    /* Called when a new tuple arrives */
    stream->on_tuple(fivenum_tuple_cbk, data); /* XXX: The predicate could
                                                * be put directly in the
                                                * arguments, and checked by
                                                * the system; would it
                                                * still be relevant to have
                                                * this callback then? */

    /* Called according to the EVERY clause, after the join has happened */
    stream->on_trigger(fivenum_trigger_cbk, data);
}

/** Tuple handler.
 * Called when a new tuple is received in a stream. */
void fivenum_tuple_cbk(MeehuaTuple *tuple, MeehuaEvent *event, void *data)
{
    MeehuaContext *ctx = data;

    if (!ctx->predicate[0]->predicate(ctx->predicate_ctx[0], tuple)) {
        return;
    }

    mh_table_add_tuple(ctx->ftable, tuple);

    if (++ctx->count >= 10) {
        tuple->stream->trigger(); /* XXX: Should probably be done upstream */
    }
}

/** Aggregation handler.
 * Called when the EVERY clause triggers */
void fivenum_trigger_cbk(MeehuaXXX *XXX, MeehuaEvent *event, void *data)
{
    int i;
    MeehuaContext *ctx = data;

    /* This should return a MeehuaColumn[] with the "delay" data */
    MeehuaColumn *incols = mh_table_get_columns(ctx->jtable,
            mh_component_get_input_names(ctx->aggregate_ctx[0])); /* This should return { "delay", } */

    ctx->aggregate[0]->aggregate(ctx->aggregate_ctx[0], incols, ctx->aggregate_ctx[0]->output); /* XXX: Should output always be assumed to be in ctx? */

    /* XXX: Should probably be part of another data_ready-like callback */
    table_fill_columns(ctx->otable, 0, ctx->aggregate_ctx[0]->output); /* 0 is used as the offset of the first columns to fill */

    for (i = 0; i < ctx->n_streams; i++) {
        ctx->streams[i]->new_data(ctx->otable);
    }
}
}}}

=== Olivier's Afterthought Notes ===
After reading parts of [1], I am unsure about two things.

Do we really need first-level filters implemented differently? They are just upstream SELECT statements, but seem to be overly specified (and limited) here with the predicates; these are really just optimisations.

A concrete example of multirow output would be GROUP BY clauses, e.g., getting the average speeds of each vehicle in a window would output one row per vehicle; or should the GROUP BY clause result in row-split output tables, separately fed to column aggregation functions limited to one-row output?
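To make the GROUP BY question concrete (illustrative types only): a per-vehicle average over a window naturally emits one row per distinct vehicle.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sketch: GROUP BY vehicle over a window emits one averaged
 * row per distinct vehicle, i.e., a genuinely multirow aggregate output. */
typedef struct { int vehicle; double speed; } Sample;
typedef struct { int vehicle; double avg_speed; size_t count; } GroupRow;

/* Average speed per vehicle id; `out` must hold up to n rows.
 * Returns the number of output rows (distinct vehicles). */
static size_t group_avg(const Sample *in, size_t n, GroupRow *out)
{
    size_t groups = 0;
    for (size_t i = 0; i < n; i++) {
        size_t g = 0;
        while (g < groups && out[g].vehicle != in[i].vehicle) g++;
        if (g == groups) {          /* first sample for this vehicle */
            out[groups].vehicle = in[i].vehicle;
            out[groups].avg_speed = 0;
            out[groups].count = 0;
            groups++;
        }
        out[g].avg_speed += in[i].speed; /* accumulate; divide below */
        out[g].count++;
    }
    for (size_t g = 0; g < groups; g++)
        out[g].avg_speed /= (double)out[g].count;
    return groups;
}
```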
[1] N. Jain, S. Mishra, A. Srinivasan, J. Gehrke, J. Widom, H. Balakrishnan, U. Çetintemel, M. Cherniack, R. Tibbetts, and S. B. Zdonik, "Towards a streaming SQL standard," Proceedings of the VLDB Endowment, vol. 1, no. 2, pp. 1379-1390, Aug. 2008. [Online]. Available: http://www.vldb.org/pvldb/1/1454179.pdf
Attachment: IMG_20130501_134729.jpg (159 KB), Olivier Mehani, 02/05/2013 06:50 PM
     331