ExpandCollapse

+ 1 mtserver.flx

  //
  //  Multithreaded Hello World server
  //
  open ZMQ;
  include "std/io/faio";
  var sys_clock = #Faio::mk_alarm_clock;
  
  //  Worker body: connects a REP socket to the in-process "workers"
  //  endpoint and answers every request it receives with "World".
  //  The loop never terminates.
  proc worker_routine (context:zmq_context) {
    //  Reply socket attached to the dispatcher's inproc endpoint
    var receiver = context.mk_socket ZMQ_REP;
    receiver.connect "inproc://workers";

    while true do
      //  Take the next request and log it
      var request = receiver.recv_string;
      println (f"Received request: [%S]" request);
      //  Pretend to work: sleep on the shared alarm clock
      //  (1.0e-3 is presumably in seconds -- confirm against Faio)
      Faio::sleep (sys_clock,1.0e-3);
      //  Answer the client
      receiver.send_string "World";
    done
  }
  
  //  Single context used by both server sockets and by every worker
  var context = zmq_init 1;

  //  Front end: ROUTER socket that clients reach over TCP port 5555
  var clients = context.mk_socket ZMQ_ROUTER;
  clients.bind "tcp://*:5555";

  //  Back end: DEALER socket the workers connect to in-process
  var workers = context.mk_socket ZMQ_DEALER;
  workers.bind "inproc://workers";

  //  Start the worker pool: one pthread per index in 0 upto 4,
  //  each running worker_routine on the shared context
  for var worker_idx in 0 upto 4 do
    spawn_pthread { worker_routine context; };
  done
  
  //  Connect work threads to client threads via a queue.
  //  pollitems.0 watches the client-facing socket and pollitems.1 the
  //  worker-facing socket, both for ZMQ_POLLIN (readable) events.
  var pollitems = varray (
    zmq_poll_item (clients, ZMQ_POLLIN), 
    zmq_poll_item (workers,ZMQ_POLLIN));
  while true do
    //  Wait for activity on either socket; the poll result is ignored.
    //  NOTE(review): -1.0 presumably means "no timeout", as in zmq_poll
    //  -- confirm against the Felix ZMQ binding.
    C_hack::ignore$ poll(pollitems, -1.0);
    //  Client socket readable: forward the request to the workers
    if (pollitems.0.revents \& ZMQ_POLLIN).short != 0s do
      send_strings workers clients.recv_strings;
    done
    //  Worker socket readable: forward the reply back to the clients.
    //  This must test item 1 (workers); testing item 0 here would tie
    //  reply forwarding to client activity instead of worker activity.
    if (pollitems.1.revents \& ZMQ_POLLIN).short != 0s do
      send_strings clients workers.recv_strings;
    done
  done