#line 187 "/home/travis/build/felix-lang/felix/src/packages/pthreads.fdoc"
  
  Asynchronous Synchronous Pipe.
  Used to link pthreads.
  open class Ppipe {
  
    Send an stream down a channel.
    proc psource[T] (var it:1 -> T) (out:opchannel[T])
    {
      while true do write (out,#it); done
    }
  
    isrc converts a streamable data structure
    such as an array into a source.
    proc pisrc[V,T with Streamable[T,V]] (dat:T) (out:opchannel[opt[V]])
    {
      psource[opt[V]] (dat.iterator) out;
    }
  
  
    Wire a source component to a sink.
    Return coupled fibre ready to run.
    fun pipe[T]
      (w: opchannel[T] -> 0,
      r: ipchannel[T] -> 0)
    :
      1 -> 0
    =>
      {
        var chi,cho = mk_iopchannel_pair[T] ();
        spawn_pthread { (w cho); };
        spawn_pthread { (r chi); };
      }
    ;
  
    Wire a source component to a transducer.
    Return source.
    fun pipe[T,U]
      (w: opchannel[T] -> 0,
      t: ipchannel[T] * opchannel[U] -> 0)
    :
      opchannel[U] -> 0
    =>
      proc (out:opchannel[U])
      {
        var chi,cho = mk_iopchannel_pair[T] ();
        spawn_pthread { (w cho); };
        spawn_pthread { (t (chi, out)); };
      }
    ;
  
    xpipe connects a streamable data structure
    such as an array directly into a transducer.
    fun xpipe[V,T,U with Streamable[T,V]]
      (
        a:T,
        t: ipchannel[opt[V]] * opchannel[U] -> 0
      )
      : opchannel[U] -> 0 =>
      pipe (a.pisrc[V],t)
    ;
  
  
    Wire a transducer into a transducer.
    Return another transducer.
    fun pipe[T,U,V]
      (a: ipchannel[T] * opchannel[U] -> 0,
      b: ipchannel[U] * opchannel[V] -> 0)
    :
      ipchannel[T] * opchannel[V] -> 0
    =>
      proc (inp:ipchannel[T], out:opchannel[V])
      {
        var chi,cho = mk_iopchannel_pair[U] ();
        spawn_pthread { a (inp, cho); };
        spawn_pthread { b (chi, out); };
      }
    ;
  
    Wire a transducer into a sink.
    Return a sink.
    fun pipe[T,U]
      (a: ipchannel[T] * opchannel[U] -> 0,
      b: ipchannel[U] -> 0)
    :
      ipchannel[T]  -> 0
    =>
      proc (inp:ipchannel[T])
      {
        var chi,cho = mk_iopchannel_pair[U] ();
        spawn_pthread { a (inp, cho); };
        spawn_pthread { b (chi); };
      }
    ;
  
  
    Stream sort using intermediate darray.
    Requires stream of option type.
    proc sort[T with Tord[T]] (r: ipchannel[opt[T]], w: opchannel[opt[T]])
    {
       var x = darray[T]();
       acquire:while true do
         match read r with
         | Some v => x+=v;
         | #None => break acquire;
         endmatch;
       done
       x.sort;
       for v in x do
         write (w, Some v);
       done
       write (w,None[T]);
    }
  }