ExpandCollapsePrev Next Index

+ 2.1 Pthreads and Channels

Felix also supports pre-emptive threads with channels. The syntax is similar to that used for fibres and channels:

pthreads_01_data.data

Some test data.
For pthreads_01.fdoc.

  include "std/pthread/pchannels";
   
  // Consumer side of the demo.
  // Reads optional strings from x and prints each Some payload as it
  // arrives; the first None received ends the loop.
  proc client(x:ipchannel[opt[string]]) {
    var running = true;
    while running do
      match read x with
      | #None => running = false;
      | Some msg => print msg;
      endmatch;
    done
  }
  
  // Producer side of the demo.
  // Opens pthreads_01_data.data in the directory named by the first command
  // line argument and writes each line read from it to x wrapped in Some.
  // After the file is closed a single None is written to mark the end.
  proc server(x:opchannel[opt[string]]) {
    var data_path = Filename::join (System::argv 1,"pthreads_01_data.data");
    data_file := fopen_input_text data_path;
    var data_line = readln data_file;
    // the loop ends on the first empty string returned by readln
    while data_line != "" do
      write (x, Some data_line);
      data_line = readln data_file;
    done;
    fclose data_file;
    write (x, None[string]);
  }
  
  // Wire the demo together: make one pchannel pair, give the read end to a
  // client pthread and the write end to a server pthread.
  proc launch() {
    rx, tx := mk_iopchannel_pair[opt[string]]();
    spawn_pthread { client rx; };
    spawn_pthread { server tx; };
  }
  
  // run the demo
  launch();

Some test data.
For pthreads_01.fdoc.

+ 2.1.1 Broadcast example

Here is the broadcast code using pthreads:

  // broadcaster: build a small news broadcast station from pthreads and
  // pchannels.
  //
  // Starts two pthreads:
  //   * a registration thread that reads iregistry and appends every
  //     subscriber channel it receives to clients, stopping when it
  //     reads None;
  //   * a relay thread that reads inews and writes each line to every
  //     channel currently in clients, returning after it has relayed the
  //     line "END OF NEWS\n".
  //
  // Returns a pair: the write end of the subscription channel and the
  // write end of the news channel.
  //
  // NOTE(review): clients is appended to by the registration thread and
  // indexed by the relay thread, and no synchronisation is visible in this
  // block -- confirm whether darray may safely be shared like this.
  gen broadcaster() : 
    opchannel[opt[opchannel[string]]] * // subscription channel
    opchannel[string]                   // news reporting channel
  =
  {
    // -------------------------------------------------------------
    // SUBSCRIPTIONS
    // -------------------------------------------------------------
  
    // subscriber list: one output channel per registered subscriber
    var clients = darray[opchannel[string]] ();
  
    // subscription channel: iregistry is read here, oregistry is returned
    // to the caller
    iregistry, oregistry := mk_iopchannel_pair[opt[opchannel[string]]]();
  
    // accept registrations until end of broadcast
    // a subscription of None signals end of broadcast
    spawn_pthread {
      var end_of_news = false;
      while not end_of_news do
        match read iregistry with
        | #None => end_of_news = true;
        | Some chan => 
          // remember the subscriber so the relay thread can write to it
          push_back (clients, chan);
          println "Got a subscriber";
        endmatch;
      done
      // once this thread finishes, nothing reads iregistry any more
      println "Finished accepting subscribers";
    };
   
    // -------------------------------------------------------------
    // RELAY THE NEWS
    // -------------------------------------------------------------
  
    // news reading channel: inews is read here, onews is returned to the
    // caller
    inews, onews := mk_iopchannel_pair[string]();
  
    // send news to all clients
    spawn_pthread {
      var news_line = read inews;
      while true do 
        // NOTE(review): the guard presumably avoids evaluating
        // len clients - 1uz on an empty list -- confirm
        if len clients > 0uz do
          for var i in 0uz upto len clients - 1uz do
            write$ clients.i, news_line;
          done
        done
        // the terminator line is relayed to the clients above before
        // this thread returns
        if news_line == "END OF NEWS\n" do 
          println "Finished news relay"; 
          return; 
        done
        news_line = read inews;
      done
    };
  
    // return the channel for subscribing to news reports,
    // and the channel for making news reports
    return oregistry, onews;
  }
  
  // Create the broadcast station.
  // subscribe: write Some channel to register a subscriber, or None to end
  //            registration.
  // news:      write news lines here; the line "END OF NEWS\n" ends the relay.
  subscribe, news := broadcaster();
  
  // Template for a bored train commuter who wants to read the news.
  // Each commuter makes a private channel pair, registers the write end on
  // the subscribe channel, then prints every line it reads, tagged with its
  // number i, up to and including the "END OF NEWS\n" line.
  proc commuter (i:int) {
    inbox, outbox := mk_iopchannel_pair[string]();
    write (subscribe, Some outbox); // subscribe to news
    tag := "Commuter " + str i + ": ";
    var item = read inbox;
    while item != "END OF NEWS\n" do 
      print (tag + item);
      item = read inbox;
    done
    // the terminator line is printed as well
    print (tag + item);
  }
  
  // start two commuter threads
  spawn_pthread { commuter 1; };
  spawn_pthread { commuter 2; };
  
  // start a reporter thread: it files the numbers 1 to 9 as news items,
  // then the terminator line, and finally writes None on the subscribe
  // channel to end registration
  spawn_pthread {
    for var item in 1 upto 9 do
      print ("Reporting : " + str item);
      write (news, str item + "\n");
    done
    write (news, "END OF NEWS\n");
    write (subscribe, None[opchannel[string]]);
    println "Done reporting news";
  };
  
  println "Demo started";

Note that there is no assurance that the subscribers get all the news: a commuter only receives the lines relayed after its subscription has been accepted. In fact this code can deadlock, because the news can be over before one of the subscribers gets to subscribe. In that case the subscriber's write to the subscription channel will hang forever, since the thread that reads subscriptions has already terminated.