Expand Collapse Prev Next Index

+ 2.1 tasksink2.flx

  //
  //  Task sink - design 2
  //  Adds pub-sub flow to send kill signal to workers
  //
  include "std/posix/time";
  include "std/io/faio";
  
  open ZMQ;
  
  //  Alarm clock handed to Faio::sleep for the final delivery delay
  var sys_clock = #Faio::mk_alarm_clock;
  
  var context = zmq_init 1;
  
  //  Socket to receive messages on
  var receiver = context.mk_socket ZMQ_PULL;
  receiver.bind "tcp://*:5558";
  
  //  Socket for worker control
  var controller = context.mk_socket ZMQ_PUB;
  controller.bind "tcp://*:5559";
  
  //  Wait for start of batch; the message content itself is discarded
  C_hack::ignore$ receiver.recv_string;
  
  //  Start our clock now
  start_time := #Time::time;
  
  //  Process 100 confirmations, printing ":" for every tenth task
  //  and "." for all the others as a progress indicator
  for var task_nbr in 0 upto 99 do
    C_hack::ignore$ receiver.recv_string;
    print if (task_nbr / 10) * 10 == task_nbr then ":" else "." endif;
    fflush (stdout);
  done
  //  Scale the elapsed time to milliseconds BEFORE converting to int.
  //  Converting first (".int * 1000") truncated to whole seconds, so the
  //  report was always a multiple of 1000 and sub-second batches showed 0.
  //  Assumes Time::time yields seconds as a floating-point value.
  println$ f"Total elapsed time: %d msec"$ 
    ((#Time::time - start_time) * 1000.0).int;
  
  //  Send kill signal to workers
  controller.send_string "KILL";
  
  //  Finished
  //  NOTE(review): if Faio::sleep takes its delay in seconds, 0.001 is only
  //  1 ms, which may be too short for delivery -- confirm the intended delay.
  Faio::sleep (sys_clock, 0.001); //  Give 0MQ time to deliver
  
  receiver.close;
  controller.close;
  context.term;