#line 351 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
  Synchronous Pipes.
  open class Spipe {
  
  #line 359 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Send an stream down a channel.
    proc source_from_stream[S] (var it:1 -> S) (out:%>S) =>
      while true call write (out,#it)
    ;
  
  #line 369 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    source from an iterator
    proc source_from_iterator[S] (var it:1 -> opt[S]) (out:%>S) =>
      match v in it call write$ out,v
    ;
  
  #line 378 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    proc sink_from_procedure[T] (var p:T -> 0) (inp:%<T) =>
      while true call p (read inp)
    ;
  
  #line 390 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire a source component to a sink.
    Return coupled fibre ready to run.
    fun pipe[T]
      (var w: %>T -> 0, // source
      var r: %<T -> 0)  // sink
    :
      1 -> 0
    =>
      {
        var chi,cho = mk_ioschannel_pair[T] ();
        spawn_fthread { (w cho); };
        spawn_fthread { (r chi); };
      }
    ;
  #line 439 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    proc transducer_from_function[O,I] (f:I->O)
      (var r: %<I,
      var w: %>O)
    =>
      while true call write$ w, r.read.f;
    ;
  #line 452 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire a transducer into a sink.
    Return a sink.
    fun pipe[T,U]
      (var a: %<T * %>U -> 0,
      var b: %<U -> 0)
    :
      %<T  -> 0
    =>
      proc (inp:%<T)
      {
        var chi,cho = mk_ioschannel_pair[U] ();
        spawn_fthread { a (inp, cho); };
        spawn_fthread { b (chi); };
      }
    ;
  #line 473 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire a source component to a transducer.
    Return source.
    fun pipe[T,U]
      (var w: %>T -> 0,
      var t: %<T * %>U -> 0)
    :
      %>U -> 0
    =>
      proc (out:%>U)
      {
        var chi,cho = mk_ioschannel_pair[T] ();
        spawn_fthread { (w cho); };
        spawn_fthread { (t (chi, out)); };
      }
    ;
  #line 493 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire a transducer into a transducer.
    Return another transducer.
    fun pipe[T,U,V]
      (var a: %<T * %>U -> 0,
      var b: %<U * %>V -> 0)
    :
      %<T * %>V -> 0
    =>
      proc (inp:%<T, out:%>V)
      {
        var chi,cho = mk_ioschannel_pair[U] ();
        spawn_fthread { a (inp, cho); };
        spawn_fthread { b (chi, out); };
      }
    ;
  #line 570 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    // source to sink: autobuild both arguments
    fun pipe[T] (w:1->T,r:T->0) =>
       w.source_from_stream |-> r.sink_from_procedure;
  
    fun pipe[T] (w:1->opt[T],r:T->0) =>
       w.source_from_iterator |-> r.sink_from_procedure;
  
    // source to sink: autobuild only one argument
    fun pipe[T] (w:1->T,r:%<T->0) =>
       w.source_from_stream |-> r;
  
    fun pipe[T] (w:1->opt[T],r:%<T->0) =>
       w.source_from_iterator |-> r;
  
    fun pipe[T] (w:%>T->0,r:T->0) =>
       w |-> r.sink_from_procedure;
  
    // source to transducer: autobuild both arguments
    fun pipe[T0,T1] (w:1->T0,f:T0->T1) =>
       w.source_from_stream |-> f.transducer_from_function;
  
    fun pipe[T0,T1] (w:1->opt[T0],f:T0->T1) =>
       w.source_from_iterator |-> f.transducer_from_function;
  
    // source to transducer: autobuild one argument
    fun pipe[T0,T1] (w:%>T0->0,f:T0->T1) =>
       w |-> f.transducer_from_function;
  
    fun pipe[T0,T1] (w:1->T0,f:%<T0*%>T1->0) =>
       w.source_from_stream |-> f;
  
    fun pipe[T0,T1] (w:1->opt[T0],f:%<T0 *%>T1->0) =>
       w.source_from_iterator |-> f;
  
    // transducer to transducer: autobuild both arguments
    fun pipe[T0,T1,T2] (f1:T0->T1,f2:T1->T2) =>
      f1.transducer_from_function |-> f2.transducer_from_function;
  
    // transducer to transducer: autobuild one argument
    fun pipe[T0,T1,T2] (f1:T0->T1,f2:%<T1 * %>T2 ->0) =>
      f1.transducer_from_function |-> f2;
  
    fun pipe[T0,T1,T2] (f1:%<T0 * %>T1->0,f2:T1->T2) =>
      f1 |-> f2.transducer_from_function;
  
    // transducer to sink: autobuild both arguments
    fun pipe[T0,T1] (f:T0->T1, r:T1->0) =>
      f.transducer_from_function |-> r.sink_from_procedure;
  
    // transducer to sink: autobuild one argument
    fun pipe[T0,T1] (f:T0->T1, r:%<T1->0) =>
      f.transducer_from_function |-> r;
  
    fun pipe[T0,T1] (f:%<T0 * %>T1->0, r:T1->0) =>
      f |-> r.sink_from_procedure;
  
  #line 731 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
  
    Wire two sources to two sinks.
    Return a bi-pipeline!
    fun pipe[T0,T1]
      (
        var w:
          (%>T0 -> 0) *
          (%>T1 -> 0),
        var r:
          (%<T0 -> 0) *
          (%<T1 -> 0)
      )
    :
      1 -> 0
    =>
      {
         pipe (w.0,r.0) ();
         pipe (w.1, r.1) ();
      }
    ;
  
  
  #line 758 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire two source components to two transducers.
    Return two sources.
    fun pipe[T0,T1,U0,U1]
      (
        var w:
          (%>T0 -> 0) *
          (%>T1 -> 0),
        var t:
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0)
      )
    :
      (%>U0 -> 0) *
      (%>U1 -> 0)
    =>
      pipe (w.0,t.0),
      pipe (w.1,t.1)
    ;
  
  #line 781 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
    Wire two transducers into two sinks.
    Return two sinks.
    fun pipe[T0,T1,U0,U1]
      (
        var a:
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0),
        var b:
           (%<U0 -> 0) *
           (%<U1 -> 0)
      )
    :
      (%<T0  -> 0)  *
      (%<T1  -> 0)
    =>
        pipe (a.0, b.0),
        pipe (a.1, b.1)
    ;
  
  
  #line 805 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
  
    Wire two transducers into two transducers.
    Return two transducers.
    fun pipe[T0,T1,U0,U1,V0,V1]
      (
        var a:
          (%<T0 * %>U0 -> 0) *
          (%<T1 * %>U1 -> 0),
        var b:
          (%<U0 * %>V0 -> 0) *
          (%<U1 * %>V1 -> 0)
      )
    :
      (%<T0 * %>V0 -> 0) *
      (%<T1 * %>V1 -> 0)
    =>
       pipe (a.0, b.0),
       pipe (a.1, b.1)
    ;
  #line 865 "/home/travis/build/felix-lang/felix/src/packages/fibres.fdoc"
  
    Utility for tee connector: array of outputs
    proc run_tee[T,N] (i:%<T, oa:(%>T)^N)
    {
    again:>
      var d = read i;
      for o in oa do
        write (o, d);
      done
      goto again;
    }
  
  
    Wire together a source component with an array of sinks.
    Returns a closed pipeline.
    fun pipe[T,N]
    (
      var w: %>T -> 0,
      var r: (%<T -> 0) ^ N
    )
      : 1 -> 0
    =>
      {
        var i,o = #mk_ioschannel_pair[T];
        spawn_fthread { w o; };
        var cho : (%>T) ^N;
        for var j in 0uz upto r.len - 1uz do
          var inp,out = #mk_ioschannel_pair[T];
          &cho.j <- out;
          noinline proc exec (var k:size, chi:%<T) () { r.k chi; }
          spawn_fthread$ exec (j,inp);
        done
        spawn_fthread { run_tee (i,cho); };
      }
    ;
  
  
    Wire together a source component with two transducers
    Returns two sources.
    //
    // WARNING: I do not know why this one works.
    // It actually starts the threads when called.
    // This is wrong, but I see no alternative.
    //
    // Also, it's a function starting those threads!
    // Functions cannot run threads at the moment.
    // Unless they manually construct a scheduler.
  
    fun pipe[T,U0,U1]
    (
      var w: %>T -> 0,
      var r: (%<T * %>U0 -> 0) * (%<T * %>U1 -> 0)
    )
    :
      (%>U0 -> 0) * (%>U1 -> 0)
    =
    {
        var i,o = #mk_ioschannel_pair[T];
        var chi0,cho0 = #mk_ioschannel_pair[T];
        var chi1,cho1 = #mk_ioschannel_pair[T];
        spawn_fthread { run_tee (i,(cho0, cho1)); };
        spawn_fthread { w o; };
        return
          proc (out:%>U0) { r.0 (chi0,out); },
          proc (out:%>U1) { r.1 (chi1,out); }
        ;
    }
  
  
  
    Stream sort using intermediate darray.
    Requires stream of option type.
    proc sort[T with Tord[T]] (r: %<opt[T], w: %>opt[T])
    {
       var x = darray[T]();
       acquire:while true do
         match read r with
         | Some v => x+=v;
         | #None => break acquire;
         endmatch;
       done
       x.sort;
       for v in x do
         write (w, Some v);
       done
       write (w,None[T]);
    }
  
  } // end class