#line 105 "/home/travis/build/felix-lang/felix/src/packages/pthreads.fdoc"
  
  //$ Pchannels are unbuffered synchronisation points
  //$ for pre-emptive threads.
  //$
  //$ Similarly to schannels, paired reader-writer pthreads
  //$ cannot proceed until both parties agree data exchange is complete.
  //$ Unlike schannels, both reader and writer can subsequently
  //$ continue concurrently after the exchange.
  
  //$ Channel primitives for pre-emptive threads (pthreads).
  //$ All three channel types below are bound to the same C++ type,
  //$ flx::pthread::monitor_t*; the input/output variants only restrict
  //$ which operations are offered at the Felix level.
  open class Pchannel
  {
    requires package "flx_pthread";

    //$ Pre-emptive thread channels (monitor).
    type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    //$ Pre-emptive thread input channel.
    type ipchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    //$ Pre-emptive thread output channel.
    type opchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;

    //$ Make bidirectional pchannel.
    //$ Allocates a new monitor, passing it the collector's thread control object.
    fun mk_pchannel[t]: 1->pchannel[t] = "new flx::pthread::monitor_t(PTF gcp->collector->get_thread_control())";

    //$ Safe cast from bidirectional to output pchannel.
    //$ (Both types share the same underlying monitor pointer representation.)
    ctor[t] opchannel[t](x:pchannel[t]) => C_hack::cast[opchannel[t]] x;
    //$ Safe cast from bidirectional to input pchannel.
    //$ (Both types share the same underlying monitor pointer representation.)
    ctor[t] ipchannel[t](x:pchannel[t]) => C_hack::cast[ipchannel[t]] x;

    //$ Make an input and an output pchannel out of a bidirectional channel.
    //$ Returns the pair (input end, output end); both refer to the same monitor.
    fun mk_iopchannel_pair[t](var ch:pchannel[t]) =>
      ipchannel[t] ch, opchannel[t] ch
    ;

    //$ Construct a connected input and output pchannel pair.
    //$ Creates a fresh bidirectional channel and splits it into its two ends.
    fun mk_iopchannel_pair[t]() =>
      mk_iopchannel_pair[t]$ mk_pchannel[t] ()
    ;


    // NOTE: read/write on pchannels uses suspend/resume
    // to tell any pending collector it is safe to proceed
    // whilst it is doing the I/O (which may block),
    // to block returning from the I/O during a collection
    // AND, if the I/O completed before the collection got
    // going, to yield at this point.

    //$ Low level read primitive.
    //$ $1 is the channel, $2 the address of the pointer variable to fill.
    //$ Dequeues a pointer from the monitor, stores it through $2, then
    //$ removes that pointer from the collector's root set (the matching
    //$ add_root is done by _write).
    proc _read[t]: pchannel[t] * &&t = """
      {
      //fprintf(stderr,"READ:DQ\\n");
      *$2 = (?1*)($1->dequeue());
      PTF gcp->collector->remove_root(*$2);
      //fprintf(stderr,"DONE READ:DQ\\n");
      }
    """ requires property "needs_ptf";

    //$ Read from a pchannel.
    //$ Obtains the pointer to the transmitted value via _read and
    //$ returns a copy of the value it points at.
    noinline gen read[t] (chan:pchannel[t]) = {
      var p : &t;
      _read (chan,  &p);
      return *p;
    }
    //$ Read from an input pchannel: casts to the bidirectional type
    //$ and delegates to the pchannel read.
    gen read[t] (chan:ipchannel[t]) => read$ C_hack::cast[pchannel[t]] chan;

    //$ Low level write primitive.
    //$ $1 is the channel, $2 a pointer to the value being sent.
    //$ Registers the pointer as a collector root before enqueueing it;
    //$ presumably this keeps the object alive while only the monitor
    //$ holds it. _read removes the root again after dequeueing.
    proc _write[t]: pchannel[t] * &t = """
      {
      //fprintf(stderr,"WRITE:NQ\\n");
      PTF gcp->collector->add_root($2);
      $1->enqueue((void*)$2);
      //fprintf(stderr,"DONE WRITE:NQ\\n");
      }
    """ requires property "needs_ptf";

    //$ Write to a pchannel.
    //$ Copies v into a freshly allocated object and sends a pointer
    //$ to that copy through the channel.
    noinline proc write[t](chan:pchannel[t], v:t) {
      var ps = new v;
      _write (chan,ps);
    }
    //$ Write to an output pchannel: casts to the bidirectional type
    //$ and delegates to the pchannel write.
    proc write[t] (chan:opchannel[t], v:t) { write$ C_hack::cast[pchannel[t]] chan,v; }
  }