ExpandCollapse

+ 1 Fibres (fthreads)

share/lib/std/control/fibres.flx

  
  Low level management of Felix fthreads (fibres).
  open class Fibres
  {
    private gen _start[t]: (t->0)*t->cont = "$1->clone()->call(0,$2)";
  
    Function to start a continuation with argument type t.
    gen start[t] (p:t->0) (x:t) = { return _start (p,x); }
  
    // Declared 'gen' (like _start above) rather than 'fun': every call
    // clones the procedure closure and produces a new continuation, so
    // the binding is not a pure function and must not be elided or duplicated.
    private gen _start0: (1->0)->cont = "$1->clone()->call(0)";
  
    Function to start a continuation without an argument.
    gen start (p:1->0) = { return _start0 (p); }
  
    Function to make a fibre out of a continuation.
    gen mk_thread: cont->fthread = "new(*PTF gcp,::flx::rtl::_fthread_ptr_map,false) ::flx::rtl::fthread_t($1)";
  
    // Spawn a fibre on this fibres scheduler.
    // uses a supervisor call so can't be used in a function
    proc spawn_fthread(p:1->0)
    {
        var con = start p;              // get continuation of p
        var fthr = mk_thread con;
        svc$ svc_spawn_detached fthr;
    }
  
    // Start p and wrap it as a fibre exactly as spawn_fthread (p:1->0) does,
    // but issue the svc_schedule_detached request instead of
    // svc_spawn_detached. Uses a supervisor call, so it can't be used in a
    // function.
    // NOTE(review): presumably the new fibre is queued rather than run
    // immediately -- confirm against the scheduler.
    proc schedule_fthread(p:1->0)
    {
        var con = start p;              // get continuation of p
        var fthr = mk_thread con;
        svc$ svc_schedule_detached fthr;
    }
  
  
    // Binds directly to the C++ statement `return $1;`, i.e. the generated
    // code returns the given continuation from the current C++ routine.
    // NOTE(review): presumably this hands control to that continuation
    // without coming back here -- confirm.
    proc chain : cont = "return $1;";
  
    The type of a fibre scheduler.
    type fibre_scheduler = "::flx::run::sync_sched*" requires header '#include "flx_sync.hpp"';
  
    Construct a fibre scheduler.
     NOTE: NOT GARBAGE COLLECTED!
    // Builds a sync_sched with plain C++ `new` (no gc placement arguments),
    // together with a freshly allocated list of fibres for it to use.
    // The stray empty statement that followed this definition was removed.
    ctor fibre_scheduler: 1 = """new ::flx::run::sync_sched(false, PTF gcp, 
      new ::std::list<::flx::rtl::fthread_t*>)""";
    // Destroy a fibre scheduler.
    // The C++ body removes the scheduler's `ft` fibre (when set) and every
    // fibre on its `active` list from the collector's roots, then deletes
    // the active list, `ft`, and finally the scheduler object itself.
    proc delete_fibre_scheduler : fibre_scheduler = """
      if ($1->ft) $1->collector->remove_root($1->ft);
      for(
       ::std::list<::flx::rtl::fthread_t*>::iterator p = $1->active->begin();
       p != $1->active->end();
       p++
      )
      $1->collector->remove_root(*p);
      delete $1->active; delete $1->ft; delete $1;
    """;
  
    Spawn a fibre on a given scheduler with a given continuation.
    proc spawn_fibre: fibre_scheduler * fthread = """
      $1->collector->add_root($2);
      $1->active->push_back($2);
      $1->frun(); 
    """;
  
    // Run a procedure through ::flx::rtl::executil::frun, passing PTF gcp
    // and the procedure closure.
    // NOTE(review): presumably this runs the procedure as a fibre on a
    // nested scheduler and returns once that scheduler stops -- confirm.
    proc frun: (1->0) = "::flx::rtl::executil::frun (PTF gcp, $1);" 
      requires header '#include "flx_executil.hpp"'
    ;
  
   
    The type of the stop state of the fibre scheduler.
    terminated: the scheduler is terminated.
    blocked: the scheduler is out of threads to run.
    delegated: the scheduler has been issued a service
     request by a thread which it cannot satisfy.
     The scheduler is put in delegated state and awaits
     for another service to satisfy the request and put
     it back in operation.
    Note: there is no "operating" state because the
    stop state can only be queried by the schedulers caller
    when the scheduler returns control to it.
    enum fibre_scheduler_state {
      terminated, 
      blocked,   
      delegated  
    };
    fun get_state : fibre_scheduler -> fibre_scheduler_state = "$1->fs";
  
  
    Core user procedure for launching a fibre.
    proc spawn_fthread (fs:fibre_scheduler) (p:1->0) { spawn_fibre (fs,p.start.mk_thread); }
  
    Execute a single step of a fibre.
    gen step: cont -> cont = "$1->resume()";
  
    Schedule death of a fibre.
    proc kill: fthread = "$1->cc = 0;";
  
    Run a continuation until it terminates.
    Do not use this proc if the underlying
    procedure attempts to read messages.
    This is a low level primitive, bypassing fthreads.
    proc run: cont = "::flx::rtl::executil::run($1);" requires package "flx_executil";
  
    private proc _send[t]: &cont * t =
    """
    {
      using namespace ::flx::rtl;
      con_t *tmp = *(con_t**)$1.get_data();
      // run target until it reaches a service request (or death)
      while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
        try { tmp=tmp->resume(); }
        catch (con_t *x) { tmp = x; }
      }
      // check it is alive and making the expected service request
      if (!tmp)
        throw flx_exec_failure_t (__FILE__,"send","Send to terminated procedure");
      if (!tmp->p_svc)
        throw flx_exec_failure_t (__FILE__,"send","Send to unready Procedure");
      if (tmp->p_svc->variant != svc_read)
        throw flx_exec_failure_t (__FILE__,"send","Send to Procedure which is not trying to read");
      // store the message
      **(?1**)tmp->p_svc->data= $2;
      // clear the service request
      tmp->p_svc = 0;
      // run the target until the next service request (or death)
      while(tmp && (!tmp->p_svc || tmp->p_svc->variant == svc_yield)) {
        try { tmp=tmp->resume(); }
        catch (con_t *x) { tmp = x; }
      }
      // save the new continuation
      *(con_t**)$1.get_data() = tmp;
  
    }
    """;
  
    Send a message to a continuation.
    There is no type checking on the message type.
    The procedure is executed until
    the next wait_state, then the message is stored.
    Low level primitive, bypassing fthreads.
    proc send[t] (p:&cont) (x:t)
    {
      _send (p,x);
    }
  
  }
  

+ 2 Synchronous Channels

share/lib/std/control/schannels.flx

  
  Synchronous Channels.
  Used to exchange control and possibly data
  between Felix f-threads (aka fibres).
  
  open class Schannel 
  {
    The type of a bidirectional synchronous channel.
    _gc_pointer type schannel[t] = "::flx::rtl::schannel_t*";
  
    The type of an input synchronous channel.
    _gc_pointer type ischannel[t] = "::flx::rtl::schannel_t*";
  
    The type of an output synchronous channel.
    _gc_pointer type oschannel[t] = "::flx::rtl::schannel_t*";
  
    Create a bidirectional synchronous channel.
    gen mk_schannel[t]: 1->schannel[t] =
      "new(*PTF gcp,::flx::rtl::schannel_ptr_map,false) ::flx::rtl::schannel_t(PTF gcp)"
      requires property "needs_gc"
    ;
  
    Model a NULL pointer as an schannel. 
    Necessary for killing off schannels,
    so as to make them unreachable, so the gc can reap them.
    Note: null_schannels are safe.
    gen mk_null_schannel[t]: 1->schannel[t] = "NULL";
  
    Model a NULL pointer as an ischannel. 
    Necessary for killing off schannels,
    so as to make them unreachable, so the gc can reap them.
    gen mk_null_ischannel[t]: 1->ischannel[t] = "NULL";
  
    Model a NULL pointer as an oschannel. 
    Necessary for killing off schannels,
    so as to make them unreachable, so the gc can reap them.
    gen mk_null_oschannel[t]: 1->oschannel[t] = "NULL";
  
    Check if an schannel is NULL.
    fun isNULL[T] :schannel[T] -> bool = "NULL==$1";
  
    Check if an ischannel is NULL.
    fun isNULL[T] :ischannel[T] -> bool = "NULL==$1";
  
    Check if an oschannel is NULL.
    fun isNULL[T] :oschannel[T] -> bool = "NULL==$1";
  
    Safe cast from bidirectional to output synchronous channel.
    ctor[t] oschannel[t](x:schannel[t]) => C_hack::cast[oschannel[t]] x;
  
    Safe cast from bidirectional to input synchronous channel.
    ctor[t] ischannel[t](x:schannel[t]) => C_hack::cast[ischannel[t]] x;
  
    Make an input and an output channel out of a bidirectional channel.
    gen mk_ioschannel_pair[t](var ch:schannel[t]) =>
      ischannel[t] ch, oschannel[t] ch
    ;
  
    Construct a connected input and output channel pair.
    gen mk_ioschannel_pair[t]() =>
      mk_ioschannel_pair[t]$ mk_schannel[t] ()
    ;
  
    Read an item from a bidirectional channel.
    inline gen read[T] (chan:schannel[T]) = {
      var loc: &T;
      svc$ svc_sread$ C_hack::cast[_schannel] chan, C_hack::reinterpret[&root::address] (&loc);
      return deref loc;
    }
  
    Read an item from an input channel.
    inline gen read[T] (chan:ischannel[T]) => read$ C_hack::cast[schannel[T]] chan;
  
    Write an item to a bidirectional channel.
    proc write[T] (chan:schannel[T], v:T) {
      var ps = C_hack::cast[root::address]$ new v;
      svc$ svc_swrite$ C_hack::cast[_schannel] chan, &ps;
    }
   
    Multi Write an item to a bidirectional channel.
    proc broadcast[T] (chan:schannel[T], v:T) {
      var ps = C_hack::cast[root::address]$ new v;
      svc$ svc_multi_swrite$ C_hack::cast[_schannel] chan, &ps;
    }
   
    Write an item to an output channel.
    proc write[T] (chan:oschannel[T], v:T) { 
      write (C_hack::cast[schannel[T]] chan, v); 
    }
    // Multi write an item to an output channel: casts the output channel
    // to a bidirectional schannel and forwards to the schannel broadcast.
    proc broadcast[T] (chan:oschannel[T], v:T) { 
      broadcast (C_hack::cast[schannel[T]] chan, v); 
    }
  
    // Very high power though not very efficient conversion
    // from ischannel to iterator.
    // Given i: ischannel[T] you can just write
    // for j in i do .. done
    gen iterator[T] (i:ischannel[T]) () : opt[T] = {
    next:>
      // y stays None unless the read below actually completes.
      var y = None[T];
      // The read is performed by a procedure run via frun, not directly.
      // NOTE(review): presumably so that a read with no matching writer
      // ends the nested run instead of blocking this generator -- confirm.
      frun { var x = read i; y = Some x; };
      match y do
      // a value arrived: yield it, then go round for the next one
      | Some _ => yield y; goto next;
      // nothing was read: report end of stream
      | None => return y;
      done
    }
  }
  

+ 3 Synchronous multiplexor

The following device acts like a select, that is, the reader gets all the input data, but the order is indeterminate.

[Not clear how this is useful .. ]

share/lib/std/control/mux.flx

  
  Schannel multiplexor.
  Read multiple input schannels, write to an output schannel.
  open class Multiplexor
  {
    Schannel copy.
    // Forward every item read from i to o, forever.
    noinline proc copy[T] (i:ischannel[T],o:oschannel[T]) () 
    {
    again:>
      var item = read i;
      write (o,item);
      goto again;
    }
  
    Schannel multiplexor based on iterator argument.
    Accepts stream of input schannels.
    Writes to output schannel.
    proc mux[T] (inp:1->opt[ischannel[T]], out:oschannel[T]) ()
    {
      for i in inp do 
        spawn_fthread$ copy(i,out); 
      done 
    }
  
  
    Schannel multiplexor based on streamable data structure.
    Creates stream of input schannels.
    Writes to output schannel.
    fun mux[C,T with Streamable[C,ischannel[T]]] (a:C, out:oschannel[T]) =>
      mux (iterator a, out)
    ;
  }

+ 4 Schannel and Pipe syntax

Special syntax for both pipes and also abbreviation for schannel types.

share/lib/std/control/spipeexpr.fsyn

syntax spipeexpr 
{
  //$ Left assoc, for schannel pipes.
  x[ssetunion_pri] := x[ssetunion_pri] "|->" x[>ssetunion_pri] =># "(infix 'pipe)"; 

  //$ Right assoc, for schannel pipes transformers
  x[ssetunion_pri] := x[>ssetunion_pri] "=>" x[ssetunion_pri] =># "(infix 'trans_type)"; 

  //$ Non associative, streaming data structure into transducer.
  x[ssetunion_pri] := x[>ssetunion_pri] ">->" x[>ssetunion_pri] =># "(infix 'xpipe)"; 

  //$ input schannel type %<T
  x[sprefixed_pri] := "%<" x[spower_pri] =># '`(ast_name ,_sr "ischannel" (,_2))';

  //$ output schannel type %>T
  x[sprefixed_pri] := "%>" x[spower_pri] =># '`(ast_name ,_sr "oschannel" (,_2))';

  //$ input/output schannel type %<>T
  //$ Maps to "schannel", the bidirectional channel type declared in class
  //$ Schannel; the visible library source declares no type named "ioschannel".
  x[sprefixed_pri] := "%<>" x[spower_pri] =># '`(ast_name ,_sr "schannel" (,_2))';

}



+ 5 Synchronous Pipeline Concepts

Felix provides synchronous fibres (fthreads) and channels (schannels) which can be used to construct active circuits. The simplest case of these circuits is a pipeline.

Let us open our class and begin.

share/lib/std/control/spipes.flx

  Synchronous Pipes.
  open class Spipe {
  

+ 6 Sources and Sinks.

+ 6.1 An infinite Source.

A source is a thread which writes output down a channel. We will wrap a generator up to create a source.

share/lib/std/control/spipes.flx

    Send a stream down a channel.
    proc source_from_stream[S] (var it:1 -> S) (out:%>S) =>
      while true call write (out,#it)
    ;
  

+ 6.2 A finite Source.

Here, we use an iterator returning a stream of optional values to create a finite source. If there is Some value we write that value down the channel; otherwise we just terminate.

share/lib/std/control/spipes.flx

    source from an iterator
    proc source_from_iterator[S] (var it:1 -> opt[S]) (out:%>S) =>
      match v in it call write$ out,v
    ;
  

+ 6.3 An infinite Sink.

A sink is a procedure which reads from a channel. We will wrap up an ordinary procedure into a sink.

share/lib/std/control/spipes.flx

    // Wrap an ordinary procedure as a sink: loop forever, reading a value
    // from inp and calling p on it.
    proc sink_from_procedure[T] (var p:T -> 0) (inp:%<T) => 
      while true call p (read inp)
    ;
  

+ 6.4 A basic pipeline

Now we need to connect our source and sink. The pipe operator below takes a pair of arguments: the source procedure comes first, then the sink. The function has an infix ASCII-art operator which can be used conveniently: |->.

share/lib/std/control/spipes.flx

    Wire a source component to a sink.
    Return coupled fibre ready to run.
    fun pipe[T] 
      (var w: %>T -> 0, // source
      var r: %<T -> 0)  // sink
    :
      1 -> 0
    => 
      {
        var chi,cho = mk_ioschannel_pair[T] ();
        spawn_fthread { (w cho); };
        spawn_fthread { (r chi); };
      }
    ;

It is important to note that this function does nothing but return a procedure closure; it doesn't actually start anything running. When run, the closure creates a single synchronous channel with an input and output endpoint shared by the source and sink. The closure is essential to hide these variables to ensure the source and sink suicide together when they become unreachable.

+ 6.5 A simple example

Now we will show a simple example.

test/regress/rt/spipesex-01.flx

  include "std/control/spipes";
  
  begin
    var src = (1,2,3,4,5).iterator.source_from_iterator;
    var snk = println[int].sink_from_procedure;
  
    #(src |-> snk);
  end

test/regress/rt/spipesex-01.expect

1
2
3
4
5

+ 7 Transducers.

A transducer is a procedure which reads from one channel and writes to another. The most basic transducers can be created from a function.

share/lib/std/control/spipes.flx

    // Wrap a function as a transducer: loop forever, reading a value from
    // r, applying f to it, and writing the result to w.
    proc transducer_from_function[O,I] (f:I->O) 
      (var r: %<I,
      var w: %>O)
    =>
      while true call write$ w, r.read.f;
    ;

+ 7.1 Wire transducer to sink.

In order to use a transducer we have to be able to wire it to other components. First, wire one into a sink, producing another sink.

share/lib/std/control/spipes.flx

    Wire a transducer into a sink.
    Return a sink.
    fun pipe[T,U]
      (var a: %<T * %>U -> 0,
      var b: %<U -> 0)
    :
      %<T  -> 0 
    => 
      proc (inp:%<T)
      {
        var chi,cho = mk_ioschannel_pair[U] ();
        spawn_fthread { a (inp, cho); };
        spawn_fthread { b (chi); };
      }
    ;

+ 7.2 Wire transducer to source.

Alternately we can wire a transducer into a source, producing another source.

share/lib/std/control/spipes.flx

    Wire a source component to a transducer.
    Return source.
    fun pipe[T,U]
      (var w: %>T -> 0,
      var t: %<T * %>U -> 0)
    :
      %>U -> 0 
    => 
      proc (out:%>U)
      {
        var chi,cho = mk_ioschannel_pair[T] ();
        spawn_fthread { (w cho); };
        spawn_fthread { (t (chi, out)); };
      }
    ;

+ 7.3 Wire transducer to transducer.

Also we would like to be able to wire two transducers together, producing another transducer.

share/lib/std/control/spipes.flx

    Wire a transducer into a transducer.
    Return another transducer.
    fun pipe[T,U,V]
      (var a: %<T * %>U -> 0,
      var b: %<U * %>V -> 0)
    :
      %<T * %>V -> 0 
    => 
      proc (inp:%<T, out:%>V)
      {
        var chi,cho = mk_ioschannel_pair[U] ();
        spawn_fthread { a (inp, cho); };
        spawn_fthread { b (chi, out); };
      }
    ;

+ 7.4 A simple example

Here is an example:

test/regress/rt/spipesex-02.flx

  include "std/control/spipes";
  begin
    var src = (1,2,3,4,5).iterator.source_from_iterator;
    var snk = println[int].sink_from_procedure;
    var inc = transducer_from_function (fun (x:int)=> x + 1);
    var sqr = transducer_from_function (fun (x:int) => x * x);
    println$ "--";
    #(src |-> inc |-> sqr |-> snk);
    println$ "--";
    src = (1,2,3,4,5).iterator.source_from_iterator;
    #((src |-> inc) |-> sqr |-> snk);
    println$ "--";
    src = (1,2,3,4,5).iterator.source_from_iterator;
    #(src |-> (inc |-> sqr) |-> snk);
    println$ "--";
    src = (1,2,3,4,5).iterator.source_from_iterator;
    #(src |-> inc |-> (sqr |-> snk));
    println$ "--";
  end

Note the vital feature: the pipe operator is associative!

test/regress/rt/spipesex-02.expect

--
4
9
16
25
36
--
4
9
16
25
36
--
4
9
16
25
36
--
4
9
16
25
36
--

+ 8 A more direct overload.

We can make the pipe operator provide more direct overloads. Note carefully the danger of these impure operators: specialisation rules will generally select the right one, unless your generator, function, or procedure happens to want to send schannels themselves down schannels!

share/lib/std/control/spipes.flx

    // source to sink: autobuild both arguments
    fun pipe[T] (w:1->T,r:T->0) => 
       w.source_from_stream |-> r.sink_from_procedure;
  
    fun pipe[T] (w:1->opt[T],r:T->0) => 
       w.source_from_iterator |-> r.sink_from_procedure;
  
    // source to sink: autobuild only one argument
    fun pipe[T] (w:1->T,r:%<T->0) => 
       w.source_from_stream |-> r;
  
    fun pipe[T] (w:1->opt[T],r:%<T->0) => 
       w.source_from_iterator |-> r;
  
    fun pipe[T] (w:%>T->0,r:T->0) => 
       w |-> r.sink_from_procedure;
  
    // source to transducer: autobuild both arguments
    fun pipe[T0,T1] (w:1->T0,f:T0->T1) => 
       w.source_from_stream |-> f.transducer_from_function;
  
    fun pipe[T0,T1] (w:1->opt[T0],f:T0->T1) => 
       w.source_from_iterator |-> f.transducer_from_function;
  
    // source to transducer: autobuild one argument
    fun pipe[T0,T1] (w:%>T0->0,f:T0->T1) => 
       w |-> f.transducer_from_function;
  
    fun pipe[T0,T1] (w:1->T0,f:%<T0*%>T1->0) => 
       w.source_from_stream |-> f;
  
    fun pipe[T0,T1] (w:1->opt[T0],f:%<T0 *%>T1->0) => 
       w.source_from_iterator |-> f;
  
    // transducer to transducer: autobuild both arguments
    fun pipe[T0,T1,T2] (f1:T0->T1,f2:T1->T2) => 
      f1.transducer_from_function |-> f2.transducer_from_function;
  
    // transducer to transducer: autobuild one argument
    fun pipe[T0,T1,T2] (f1:T0->T1,f2:%<T1 * %>T2 ->0) => 
      f1.transducer_from_function |-> f2;
  
    fun pipe[T0,T1,T2] (f1:%<T0 * %>T1->0,f2:T1->T2) => 
      f1 |-> f2.transducer_from_function;
  
    // transducer to sink: autobuild both arguments
    fun pipe[T0,T1] (f:T0->T1, r:T1->0) =>
      f.transducer_from_function |-> r.sink_from_procedure;
  
    // transducer to sink: autobuild one argument
    fun pipe[T0,T1] (f:T0->T1, r:%<T1->0) =>
      f.transducer_from_function |-> r;
  
    fun pipe[T0,T1] (f:%<T0 * %>T1->0, r:T1->0) =>
      f |-> r.sink_from_procedure;
  

+ 8.1 A simple example

Here is an example:

test/regress/rt/spipesex-02a.flx

  include "std/control/spipes";
  begin
    #((1,2,3,4,5).iterator |-> (fun (x:int) => x + 1) |-> (fun (x:int) => x * x) |-> println[int]);
  end

Note the vital feature: the pipe operator is associative!

test/regress/rt/spipesex-02a.expect

4
9
16
25
36

+ 9 Raw Devices.

So far we have introduced a simple method for creating three kinds of device, with a simple form of operation, and a method for connecting them in a pipeline.

You may wonder what happens if a device in a pipeline simply returns. The answer is simple. Nothing happens. The pipeline eventually collapses. Control resumes after the statement constructing the pipeline.

In fact we have already seen this behaviour when we connected a finite source at the head of a pipeline.

Let us look now at a more complex system. To do this we need to construct raw devices. We will make a simple tokeniser. First our source emits characters:

test/regress/rt/spipesex-03.flx

  include "std/control/spipes";
  proc source(w:%>char)
  {
    var s = "The quick brown fox jumped over the lazy dog";
    for ch in s call write (w,ch);
    write (w,char 0);
  }

Notice this source simply returns when there is no data left, after emitting a terminating nul character.

Now we will do the sink which prints the tokens:

test/regress/rt/spipesex-03.flx

  proc sink (r:%<string)
  {
    while true call println$ read r;
  }

This is an infinite loop, but it will stall when it can't read any more tokens. Obviously we want the output to be:

test/regress/rt/spipesex-03.expect

The
quick
brown
fox
jumped
over
the
lazy
dog

Finally we will write the tokeniser:

test/regress/rt/spipesex-03.flx

  // Split the incoming character stream into space-separated words and
  // write each word to o. A char 0 on the input marks the end of the data.
  proc tokenise (i:%<char, o:%>string)
  {
    var s = "";
  skipwhite:>
    // discard spaces between words
    var ch = read i;
    if ch == ' ' goto skipwhite;
  gather:>
    if ch == char 0  goto fin;
    s += string ch;
    ch = read i;
    if ch != ' ' goto gather;
    // ch is a space here: the current word is complete
    write (o,s);
    s = "";
    // ch is known to be ' ' at this point, so the former "ch != char 0"
    // test was always true; jump unconditionally instead.
    goto skipwhite;
  fin:>
    // flush a final word that was ended by the terminator, not by a space
    if s != "" call write (o,s);
  }

What is important to observe here is that there are multiple reads on the input, for each output. Finally the pipeline:

test/regress/rt/spipesex-03.flx

  #(source |-> tokenise |-> sink);

+ 10 Bi-connectors

We will now provide some ways to make slightly more complex circuits. Bi-connectors work with pairs of channels.

+ 10.1 Wire two sources to two sinks.

The most basic pipeline.

share/lib/std/control/spipes.flx

   
    Wire two sources to two sinks.
    Return a bi-pipeline!
    fun pipe[T0,T1]
      (
        var w: 
          (%>T0 -> 0) * 
          (%>T1 -> 0),
        var r:
          (%<T0 -> 0) * 
          (%<T1 -> 0)
      )
    :
      1 -> 0
    =>
      {
         pipe (w.0,r.0) ();
         pipe (w.1, r.1) ();
      }
    ;
  
  

+ 10.2 Wire two sources to two transducers.

Construct a pair of sources from a pair of sources and a pair of transducers.

share/lib/std/control/spipes.flx

    Wire two source components to two transducers.
    Return two sources.
    fun pipe[T0,T1,U0,U1]
      (
        var w: 
          (%>T0 -> 0) *
          (%>T1 -> 0),
        var t: 
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0)
      )
    :
      (%>U0 -> 0) *
      (%>U1 -> 0)
    => 
      pipe (w.0,t.0),
      pipe (w.1,t.1)
    ;
  

+ 10.3 Wire two transducers to two sinks.

Returns a pair of sinks.

share/lib/std/control/spipes.flx

    Wire two transducers into two sinks.
    Return two sinks.
    fun pipe[T0,T1,U0,U1]
      (
        var a: 
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0),
        var b: 
           (%<U0 -> 0) *
           (%<U1 -> 0)
      )
    :
      (%<T0  -> 0)  *
      (%<T1  -> 0) 
    => 
        pipe (a.0, b.0),
        pipe (a.1, b.1)
    ;
  
  

+ 10.4 Wire two transducers to two transducers.

Returns a pair of transducers.

share/lib/std/control/spipes.flx

  
    Wire two transducers into two transducers.
    Return two transducers.
    fun pipe[T0,T1,U0,U1,V0,V1]
      (
        var a: 
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0),
        var b: 
          (%<U0 * %>V0 -> 0) * 
          (%<U1 * %>V1 -> 0) 
      )
    :
      (%<T0 * %>V0 -> 0) *
      (%<T1 * %>V1 -> 0)
    => 
       pipe (a.0, b.0),
       pipe (a.1, b.1)
    ;

+ 10.5 A simple example.

test/regress/rt/spipesex-04.flx

  include "std/control/spipes";
  
  // sources
  var src1 = (1,2,3,4,5).iterator.source_from_iterator;
  var src2 = (1,2,3,4,5).iterator.source_from_iterator;
  
  // transducers
  var inc = transducer_from_function (fun (x:int)=> x + 1);
  var sqr = transducer_from_function (fun (x:int) => x * x);
  var str1 = Str::str[int].transducer_from_function;
  
  // sinks
  // we will save the results and print them because
  // the order of accepting the data between the
  // two sinks is indeterminate
  var result1 = list[string]();
  var result2 = list[string]();
  proc set1(s:string) => result1 = s ! result1;
  proc set2(s:string) => result2 = s ! result2;
  var snk1 = set1.sink_from_procedure;
  var snk2 = set2.sink_from_procedure;
  
  // pipeline
  #((src1,src2)  |-> (inc,sqr) |-> (str1,str1) |-> (snk1,snk2));
  
  // show results
  println$ result1;
  println$ result2;

test/regress/rt/spipesex-04.expect

list('6', '5', '4', '3', '2')
list('25', '16', '9', '4', '1')

+ 11 Multi-connectors

share/lib/std/control/spipes.flx

   
    Utility for tee connector: array of outputs
    // Forever: read one value from i and write it to each output in oa.
    proc run_tee[T,N] (i:%<T, oa:(%>T)^N)
    {
      while true do
        var item = read i;
        for dst in oa do
          write (dst, item);
        done
      done
    }
  
  
    Wire together a source component with an array of sinks.
    Returns a closed pipeline.
    fun pipe[T,N] 
    (
      var w: %>T -> 0,
      var r: (%<T -> 0) ^ N
    )
      : 1 -> 0
    =>
      {
        // channel carrying the source's output to the tee
        var i,o = #mk_ioschannel_pair[T];
        spawn_fthread { w o; };
        // one output endpoint per sink, filled in by the loop below
        var cho : (%>T) ^N;
        for var j in 0uz upto r.len - 1uz do
          var inp,out = #mk_ioschannel_pair[T];
          &cho.j <- out;
          // exec takes the index and the input endpoint as arguments, so the
          // spawned closure uses the values passed in for this iteration.
          noinline proc exec (var k:size, chi:%<T) () { r.k chi; }
          spawn_fthread$ exec (j,inp);
        done
        // the tee copies each value read from i to every endpoint in cho
        spawn_fthread { run_tee (i,cho); };
      }
    ;
  
  
    Wire together a source component with two transducers
    Returns two sources.
    //
    // WARNING: I do not know why this one works.
    // It actually starts the threads when called.
    // This is wrong, but I see no alternative.
    //
    // Also, it's a function starting those threads!
    // Functions cannot run threads at the moment.
    // Unless they manually construct a scheduler.
  
    fun pipe[T,U0,U1] 
    (
      var w: %>T -> 0,
      var r: (%<T * %>U0 -> 0) * (%<T * %>U1 -> 0)
    )
    :
      (%>U0 -> 0) * (%>U1 -> 0)
    =
    {
        var i,o = #mk_ioschannel_pair[T];
        var chi0,cho0 = #mk_ioschannel_pair[T];
        var chi1,cho1 = #mk_ioschannel_pair[T];
        spawn_fthread { run_tee (i,(cho0, cho1)); };
        spawn_fthread { w o; };
        return
          proc (out:%>U0) { r.0 (chi0,out); },
          proc (out:%>U1) { r.1 (chi1,out); }
        ;
    }
  
  
  
    Stream sort using intermediate darray.
    Requires stream of option type.
    proc sort[T with Tord[T]] (r: %<opt[T], w: %>opt[T])
    {
       var x = darray[T]();
       acquire:while true do
         match read r with
         | Some v => x+=v;
         | #None => break acquire;
         endmatch;
       done
       x.sort;
       for v in x do
         write (w, Some v);
       done
       write (w,None[T]);
    }
  
  } // end class
  

test/regress/rt/spipesex-05.flx

  include "std/control/schannels";
  include "std/control/spipes";
  
  // Pipe test.
  proc source (cho:%>int) { 
    for var i in 0 upto 9 do write (cho,i); done 
  }
  
  proc sink (chi:%<int) { 
    while true do var x= read chi; println x; done 
  }
  
  proc xduce(chi: %<int, cho: %>int) {
    write (cho, 99);
    while true do var x = read chi; write (cho, x); write (cho, 2 * x); done
  }
  
  spawn_fthread$  source |-> xduce |-> sink;

test/regress/rt/spipesex-05.expect

99
0
0
1
2
2
4
3
6
4
8
5
10
6
12
7
14
8
16
9
18