ExpandCollapsePrev Next Index

+ 2.1 taskwork2.flx

  //
  //  Task worker - design 2
  //  Adds pub-sub flow to receive and respond to kill signal
  //
  open ZMQ;
  include "std/io/faio";
  var sys_clock = #Faio::mk_alarm_clock;
  
  var context = zmq_init 1;
  
      //  Socket to receive messages on
  var receiver = context.mk_socket ZMQ_PULL;
  receiver.connect "tcp://localhost:5557";
  
      //  Socket to send messages to
  var sender = context.mk_socket ZMQ_PUSH;
  sender.connect "tcp://localhost:5558";
  
      //  Socket for control input
  var controller = context.mk_socket ZMQ_SUB;
  controller.connect "tcp://localhost:5559";
  controller.set_opt$ zmq_subscribe "";
  
  //  Process messages from receiver and controller
  var items = varray (
    zmq_poll_item (receiver, ZMQ_POLLIN),
    zmq_poll_item (controller, ZMQ_POLLIN));
  
  //  Process messages from both sockets
  var run = true; while run do
     C_hack::ignore$ poll (items, -1.0);
     if (items.0.revents \& ZMQ_POLLIN).short != 0s do
       tim := receiver.recv_string;
  
       //  Do the work
       Faio::sleep (sys_clock, tim.atof);
  
       //  Send results to sink
       sender.send_string tim;
  
       //  Simple progress indicator for the viewer
       print "."; fflush stdout;
     done
  
     //  Any waiting controller command acts as 'KILL'
     if (items.1.revents \& ZMQ_POLLIN).short != 0s do
        run =false; //  Exit loop
     done
  done
  //  Finished
  receiver.close;
  sender.close;
  controller.close;
  context.term;