ExpandCollapse

+ 1 Pthreads

The pthread_thread.hpp defines pthreads and the system pthread control class. The control is implemented separately.

Felix pthreads are tightly integrated with the garbage collector via the pthread control class. This is necessary because the collector must stop all the pthreads before it can reliably sweep the thread stacks for roots.

Thread control also ensures Felix programs do not terminate until all managed pthreads have completed.

The Felix system uses detached threads. We provide joinable threads here too, but Felix programmers should use detached threads and pchannels for synchronisation.

share/lib/rtl/pthread_thread.hpp

#ifndef __FLX_PTHREAD_THREAD_H__
#define __FLX_PTHREAD_THREAD_H__
#include "flx_pthread_config.hpp"

#if FLX_WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

// auto pthread, because I forget how to deallocate them nicely
// could init in the constructor, but ultimately you don't want the thread
// barging in before you've finished doing other stuff
// Addendum (20051128): doing stdio in threads turns out to be not very safe.
// I don't know if printf et al are supposed to be thread safe (most impls
// seem to try to be) but I sometimes get deadlocks in ppc64 os x 10.4.2
// with 4.0.1 when printfing to stdout. Nasty.

#include "pthread_thread_control_base.hpp"

#include <utility>
#include <map>
#include <vector>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>

namespace flx { namespace pthread {

// ********************************************************
/// Posix Threads. This class simply wraps the creation
/// and joining of threads. It is not safe.
// ********************************************************

// Native thread handle type. Selected with the same FLX_WIN32 switch that
// chooses between <windows.h> and <pthread.h> at the top of this header, so
// the handle type always matches the header that was actually included.
#if FLX_WIN32
typedef HANDLE flx_native_thread_t;
#else
typedef pthread_t flx_native_thread_t;
#endif

/// Native handle of the calling thread.
flx_native_thread_t PTHREAD_EXTERN get_current_native_thread();
/// Integer id of the calling thread; this is the key used in the thread registry.
uintptr_t PTHREAD_EXTERN mythrid();

/// One registry entry: a thread id paired with that thread's data.
typedef std::pair<uintptr_t const, thread_data_t> thread_entry_t;
/// Registry of managed threads, keyed and ordered by thread id.
typedef std::map<uintptr_t, thread_data_t, std::less<uintptr_t> > thread_registry_t;

/// Thread control object: keeps the registry of managed threads and
/// implements the stop-the-world protocol (world_stop / world_start) together
/// with the cooperative check points (yield, suspend, resume) used by threads.
class PTHREAD_EXTERN thread_control_t : public virtual thread_control_base_t
{
    thread_control_t (thread_control_t const &); // uncopyable
    void operator=(thread_control_t const&); // uncopyable
    bool do_world_stop;     // true while a world stop has been requested
    size_t thread_counter;  // number of registered threads
    size_t active_counter;  // registered threads that are not suspended
    ::std::condition_variable_any stop_guard; // notified on every change of the state above
    ::std::mutex stop_mutex;                  // guards the counters, the flag and the registry
    thread_registry_t threads;                // thread id -> per-thread data
    // The unsafe_* helpers do no locking themselves; callers hold stop_mutex.
    void unsafe_stop_check();
    void unsafe_suspend();
    void unsafe_resume();
public:
    bool debug;             // when true, progress is traced to stderr
    bool get_debug()const;
    thread_control_t (bool);
    size_t thread_count();
    size_t active_count();
    void add_thread(void*);   // register the calling thread; argument is its stack base
    void remove_thread();     // unregister the calling thread
    bool world_stop();        // returns true with stop_mutex still locked if we stopped the world
    void join_all() ;
    void world_start();       // ends a world stop and unlocks stop_mutex
    void yield();             // check point: parks the caller while a world stop is in force
    void suspend();
    void resume();
    memory_ranges_t *get_block_list(); // caller owns result and should delete it
};

/// Start-up packet handed to a newly created thread. It is heap allocated by
/// the spawner and deleted by the thread start wrapper once it has copied out
/// what it needs.
struct tstart_t
{
  void (*sr)(void*);             // client start routine
  void *cd;                      // client data passed to sr
  thread_control_base_t *tc;     // thread control object the thread registers with
  // Optional handshake with the spawner (all three may be NULL): the new
  // thread sets *spawner_flag under *spawner_lock and notifies *spawner_cond.
  ::std::mutex *spawner_lock;
  ::std::condition_variable_any *spawner_cond;
  bool *spawner_flag;

  tstart_t(void (*s)(void*),void* c,thread_control_base_t *t, ::std::mutex *sl, ::std::condition_variable_any *sc, bool *sf)
    : sr(s), cd(c), tc(t), spawner_lock(sl), spawner_cond(sc), spawner_flag(sf)
  {}
};

// a class for threads that can't be joined. upon exit all their resources
// are freed. they just evaporate. probably the best type of thread.
/// Detached thread. init() starts the thread through the Felix start wrapper,
/// which registers it with the given thread control object.
class PTHREAD_EXTERN flx_detached_thread_t {
  flx_native_thread_t thr;        ///< the thread
  flx_detached_thread_t(flx_detached_thread_t const&); // uncopyable
  void operator=(flx_detached_thread_t const&); // uncopyable
public:
  flx_detached_thread_t();
  ~flx_detached_thread_t();
  /// Start the thread running start(udat). The mutex, condition variable and
  /// flag form an optional handshake used by the new thread to tell the
  /// spawner it has registered itself. Returns 0 on success, non-zero on failure.
  int init(void (*start)(void*), void* udat, thread_control_base_t*, ::std::mutex *, ::std::condition_variable_any *, bool*);
};

// rf: joinable threads. is it an error to not join joinable threads?
/// Joinable thread. The thread is started through the non-Felix start
/// wrapper, which does not register it with the thread control object.
class PTHREAD_EXTERN flx_thread_t {
  flx_native_thread_t thr;        ///< the thread
  flx_thread_t(flx_thread_t const&); // uncopyable
  void operator=(flx_thread_t const&); // uncopyable
public:
  flx_thread_t();
  ~flx_thread_t();
  /// Start the thread running start(udat). Returns 0 on success, non-zero on failure.
  int init(void (*start)(void*), void* udat, thread_control_base_t*);
  /// Wait for the thread to finish.
  void join();
};

/// RAII wrapper for thread class
/// RAII wrapper for the joinable thread class: the constructor starts the
/// thread (terminating the program if that fails) and the destructor joins it.
class PTHREAD_EXTERN flx_thread_wrapper_t {
  flx_thread_t thread;
  flx_thread_wrapper_t(flx_thread_wrapper_t const&); // uncopyable
  void operator=(flx_thread_wrapper_t const&); // uncopyable
public:
  ~flx_thread_wrapper_t();
  flx_thread_wrapper_t(void (*start)(void*), void* udat, thread_control_base_t *tc);
};

}}
#endif

share/src/pthread/pthread_posix_thread.cpp

#include "pthread_thread.hpp"
#if FLX_POSIX
#include <stdio.h>
#include <string.h>  // strerror
#include <cstdlib>
#include <setjmp.h>
#include <functional> // less
#include <assert.h>

namespace flx { namespace pthread {

// POSIX: the native handle of the calling thread is its pthread_t.
flx_native_thread_t get_current_native_thread() { return pthread_self(); }
// POSIX: the thread id is the pthread_t value converted to an integer.
uintptr_t mythrid() { return (uintptr_t)pthread_self(); }

// Returns the address of a local variable of this function, used by callers
// as an approximation of the current stack position.
// The address is copied through a second variable rather than returned as
// `&x` directly; NOTE(review): presumably to avoid the compiler's handling of
// "returning address of local" -- confirm before simplifying.
static void *get_stack_pointer() { 
  void *x; 
  void *y = (void*)&x; 
  return y;
}

/// Entry point of every Felix-managed pthread (POSIX).
///
/// @param e  heap allocated tstart_t describing the client routine, its data,
///           the thread control object and the optional spawner handshake.
///           Ownership passes to this function, which deletes it.
/// @return   always NULL.
///
/// Sequence: register with the thread control object, signal the spawner (if
/// a handshake was supplied), pass through a yield check point, run the client
/// routine, then unregister. An exception escaping the client routine
/// terminates the whole program.
extern "C" void *flx_pthread_start_wrapper(void *e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  thread_control_base_t *tc = ehd -> tc;
  if(tc == 0)
  {
    fprintf(stderr, "ERROR: flx_pthread_start_wrapper got NULL thread control object\n");
    // Abort unconditionally: assert() is compiled out under NDEBUG and the
    // code below would then dereference the null pointer.
    ::std::abort();
  }
  bool debug = tc->get_debug();
  if(debug)
    fprintf(stderr,"Spawned Thread %p start stack base = %p, tc=%p\n",
       (void*)mythrid(),stack_base, (void*)tc);
  if(debug)
      fprintf(stderr,"Thread registering itself\n");
  tc->add_thread(stack_base);
  if(debug)
    fprintf(stderr,"Registered: Spawned Thread %p stack base = %p\n",
      (void*)mythrid(),stack_base);


  // copy out what we need: ehd is deleted before the client code runs
  void (*sr)(void*)=ehd->sr; // client function
  void *cd = ehd->cd;        // client data
  if(debug)
    fprintf(stderr,"ehd->spawner_lock = %p\n",(void*)ehd->spawner_lock);

  if(ehd->spawner_lock)
  {
    // tell the spawner we are registered; the flag is set under its mutex
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    if (debug)
      fprintf(stderr,"Thread %p acquired mutex\n", (void*)mythrid());
    if (debug)
      fprintf(stderr,"Thread %p notifying spawner it has registered itself\n", (void*)mythrid());
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
    if (debug)
      fprintf(stderr,"Thread %p releasing mutex\n", (void*)mythrid());
  }
  delete ehd;
  if (debug)
    fprintf(stderr,"Thread %p yielding\n", (void*)mythrid());
  tc->yield();
  try {
    if (debug)
      fprintf(stderr,"Thread %p running client code\n", (void*)mythrid());
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  if (debug)
    fprintf(stderr,"Thread %p unregistering\n", (void*)mythrid());
  tc->remove_thread();
  return NULL;
}


/// Entry point of joinable, non-Felix pthreads (POSIX).
///
/// @param e  heap allocated tstart_t; ownership passes to this function.
/// @return   always NULL.
///
/// Unlike flx_pthread_start_wrapper the thread is not registered with the
/// thread control object. An exception escaping the client routine terminates
/// the whole program.
extern "C" void *nonflx_pthread_start_wrapper(void *e)
{
  tstart_t *ehd = (tstart_t*)e;
  // copy out what we need: ehd is deleted before the client code runs
  void (*sr)(void*)=ehd->sr; // client function
  void *cd = ehd->cd;        // client data

  if(ehd->spawner_lock)
  {
    // optional handshake: set the spawner's flag under its mutex and wake it
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
  }
  delete ehd;
  try {
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  return NULL;
}


// ---- detached threads ----------

// Copy constructor and assignment are declared private in the class to make
// it uncopyable; these empty bodies only exist to give them a definition.
flx_detached_thread_t::flx_detached_thread_t(flx_detached_thread_t const&){} // uncopyable
void flx_detached_thread_t::operator=(flx_detached_thread_t const&){} // uncopyable

int
flx_detached_thread_t::init(void (*start)(void*), void* udat, thread_control_base_t *tc,
  ::std::mutex * m, ::std::condition_variable_any *c,bool *flag)
{
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  int res = pthread_create(&thr, &attr, flx_pthread_start_wrapper,
    new tstart_t(start, udat, tc, m,c,flag));
  if(res)
  {
     fprintf(stderr, "WARNING: flx_detached_thread_t: pthread_create failed: %s\n",
       strerror(res));
  }
  pthread_attr_destroy(&attr);
  return res;
}

// Nothing to release here: the thread is created detached in init().
flx_detached_thread_t::~flx_detached_thread_t() { }
// Note: thr is left uninitialised until init() is called.
flx_detached_thread_t::flx_detached_thread_t() { }

// ---- joinable threads ----------
// Private copy operations (class is uncopyable); defined empty only so they
// have a definition.
flx_thread_t::flx_thread_t(flx_thread_t const&){} // uncopyable
void flx_thread_t::operator=(flx_thread_t const&){} // uncopyable

int
flx_thread_t::init(void (*start)(void*), void* udat, thread_control_base_t*tc)
{
  int res = pthread_create(&thr, NULL, nonflx_pthread_start_wrapper,
    new tstart_t(start, udat, tc,NULL,NULL,NULL));
  if(res)
  {
     fprintf(stderr, "WARNING: flx_thread_t: pthread_create failed: %s\n",
       strerror(res));
  }
  return res;
}

/// Wait for the thread to finish. A pthread_join failure is fatal: the error
/// is reported on stderr and the program exits with status 1.
void flx_thread_t::join() {
  const int rc = pthread_join(thr, NULL);
  if(rc == 0) return;

  fprintf(stderr, "flx_thread_t: FATAL: pthread_join failed: %s\n",
    strerror(rc));
#ifdef exit
  // exit has been replaced by a macro: honour the replacement
  exit(1);
#else
  std::exit(1);
#endif
}

// The destructor does not join the thread; callers use join() explicitly.
flx_thread_t::~flx_thread_t() { }
// Note: thr is left uninitialised until init() is called.
flx_thread_t::flx_thread_t() { }

// ---- joinable thread wrapper ----------

// Private copy operations (class is uncopyable); defined empty only so they
// have a definition.
flx_thread_wrapper_t::flx_thread_wrapper_t(flx_thread_wrapper_t const&){} // uncopyable
void flx_thread_wrapper_t::operator=(flx_thread_wrapper_t const&){} // uncopyable

/// Start the wrapped joinable thread running start(udat). Failure to start
/// is fatal: the error is reported on stderr and the program exits with 1.
flx_thread_wrapper_t::flx_thread_wrapper_t(void (*start)(void*), void* udat, thread_control_base_t*tc)
{
  const int rc = thread.init(start,udat,tc);
  if(rc == 0) return;

  fprintf(stderr, "FATAL: flx_thread_wapper_t: flx_thread_t.init failed: %s\n",
    strerror(rc));
#ifdef exit
  // exit has been replaced by a macro: honour the replacement
  exit(1);
#else
  std::exit(1);
#endif
}

// Destruction blocks until the wrapped thread has finished.
flx_thread_wrapper_t::~flx_thread_wrapper_t() { thread.join(); }
}}

#endif

share/src/pthread/pthread_win_thread.cpp

#include "pthread_thread.hpp"
#if FLX_WIN32
#include <stdio.h>
#include <cstdlib>
#include <assert.h>

namespace flx { namespace pthread {

// Win32: native handle of the calling thread as returned by GetCurrentThread().
flx_native_thread_t get_current_native_thread() { return GetCurrentThread(); }
// Win32: the thread id is the value of GetCurrentThreadId().
uintptr_t mythrid() { return (uintptr_t)GetCurrentThreadId(); }

// Returns the address of a local variable of this function, used by callers
// as an approximation of the current stack position.
// The address is copied through a second variable rather than returned as
// `&x` directly; NOTE(review): presumably to avoid the compiler's handling of
// "returning address of local" -- confirm before simplifying.
static void *get_stack_pointer() { 
  void *x; 
  void *y = (void*)&x;
  return y;
}

/// Entry point of every Felix-managed thread (Win32).
///
/// @param e  heap allocated tstart_t describing the client routine, its data,
///           the thread control object and the optional spawner handshake.
///           Ownership passes to this function, which deletes it.
/// @return   always 0.
///
/// Sequence: register with the thread control object, signal the spawner (if
/// a handshake was supplied), pass through a yield check point, run the client
/// routine, then unregister. An exception escaping the client routine
/// terminates the whole program.
DWORD WINAPI flx_pthread_start_wrapper(LPVOID e)
{
  void *stack_base = get_stack_pointer();
  tstart_t *ehd = (tstart_t*)e;
  thread_control_base_t *tc = ehd -> tc;
  if(tc == 0)
  {
    fprintf(stderr, "ERROR: flx_pthread_start_wrapper got NULL thread control object\n");
    // Abort unconditionally: assert() is compiled out under NDEBUG and the
    // code below would then dereference the null pointer.
    ::std::abort();
  }
  bool debug = tc->get_debug();
  if(debug)
    fprintf(stderr,"Spawned Thread %p start stack base = %p, tc=%p\n",
       (void*)mythrid(),stack_base, (void*)tc);
  if(debug)
    fprintf(stderr,"Thread registering itself\n");
  tc->add_thread(stack_base);
  if(debug)
    fprintf(stderr,"Registered: Spawned Thread %p stack base = %p\n",
      (void*)mythrid(),stack_base);
  // copy out what we need: ehd is deleted before the client code runs
  void (*sr)(void*)=ehd->sr;
  void *cd = ehd->cd;
  if(debug)
    fprintf(stderr,"ehd->spawner_lock = %p\n",(void*)ehd->spawner_lock);

  if(ehd->spawner_lock)
  {
    // tell the spawner we are registered; the flag is set under its mutex
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    if (debug)
      fprintf(stderr,"Thread %p acquired mutex\n", (void*)mythrid());
    if (debug)
      fprintf(stderr,"Thread %p notifying spawner it has registered itself\n", (void*)mythrid());
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
    if (debug)
      fprintf(stderr,"Thread %p releasing mutex\n", (void*)mythrid());
  }
  delete ehd;
  if (debug)
    fprintf(stderr,"Thread %p yielding\n", (void*)mythrid());
  tc->yield();
  try {
    if (debug)
      fprintf(stderr,"Thread %p running client code\n", (void*)mythrid());
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  if (debug)
    fprintf(stderr,"Thread %p unregistering\n", (void*)mythrid());
  tc->remove_thread();
  return 0;
}

/// Entry point of joinable, non-Felix threads (Win32).
///
/// @param e  heap allocated tstart_t; ownership passes to this function.
/// @return   always 0.
///
/// Unlike flx_pthread_start_wrapper the thread is not registered with the
/// thread control object. An exception escaping the client routine terminates
/// the whole program.
DWORD WINAPI nonflx_pthread_start_wrapper(LPVOID e)
{
  tstart_t *ehd = (tstart_t*)e;
  // copy out what we need: ehd is deleted before the client code runs
  void (*sr)(void*)=ehd->sr;
  void *cd = ehd->cd;
  if(ehd->spawner_lock)
  {
    // optional handshake: set the spawner's flag under its mutex and wake it
    ::std::unique_lock< ::std::mutex> dummy(*ehd->spawner_lock);
    *ehd->spawner_flag=true;
    ehd->spawner_cond->notify_all();
  }
  delete ehd;
  try {
    (*sr)(cd);
  }
  catch (...) {
    fprintf(stderr,"Uncaught exception in thread\n");
    ::std::exit(1);
  }
  return 0;
}


// ---- detached threads ----------

// Private copy operations (class is uncopyable); defined empty only so they
// have a definition.
flx_detached_thread_t::flx_detached_thread_t(flx_detached_thread_t const&){} // uncopyable
void flx_detached_thread_t::operator=(flx_detached_thread_t const&){} // uncopyable

// returns -1 on failure with error in GetLastError, 0 if all good.
int
flx_detached_thread_t::init(void (*start)(void*), void *lParam, thread_control_base_t *tc,
  ::std::mutex * m, ::std::condition_variable_any *c,bool *flag)
{
  DWORD thread_id = 0;
  thr = (HANDLE)CreateThread(NULL, 0,
    (LPTHREAD_START_ROUTINE)flx_pthread_start_wrapper,
    new tstart_t(start,lParam, tc, m, c, flag), 0,
    &thread_id
  );

  if(!thr)
  {
    DWORD err = GetLastError();
    fprintf(stderr, "flx_detached_thread_t: CreateThread failed: %i\n", err);
    return err;
  }
  return 0;
}

// Release our handle to the thread, if init() ever produced one. Without the
// check the destructor would hand an uninitialised or NULL handle to
// CloseHandle when init() was never called or failed.
flx_detached_thread_t::~flx_detached_thread_t() { if(thr) CloseHandle(thr); }
// Start with no handle so the destructor can tell whether init() succeeded.
flx_detached_thread_t::flx_detached_thread_t() : thr(NULL) { }

// ---- joinable threads ----------
// Private copy operations (class is uncopyable); defined empty only so they
// have a definition.
flx_thread_t::flx_thread_t(flx_thread_t const&){} // uncopyable
void flx_thread_t::operator=(flx_thread_t const&){} // uncopyable


// Start with no handle, so thr never holds an indeterminate value before
// init() is called.
flx_thread_t::flx_thread_t() : thr(NULL) { }
// The destructor does not wait for or close the thread; callers use join().
flx_thread_t::~flx_thread_t() { }

// Wait for the thread to finish and release its handle.
// Idempotent: once the handle has been closed it is reset to NULL, and a
// NULL handle (already joined, or init() failed) makes this a no-op.
// A failed wait is only reported; failure to close the handle is fatal.
void
flx_thread_t::join()
{
  if(!thr) return; // nothing to wait for

  DWORD  wait_res = WaitForSingleObject(thr, INFINITE);

  if(WAIT_FAILED == wait_res)
  {
    // GetLastError returns a DWORD (unsigned long)
    fprintf(stderr,"WARNING: thread wait failed (%lu)\n", (unsigned long)GetLastError());
  }

  // the wait above has already been attempted; now release the handle
  if(!CloseHandle(thr))
  {
    fprintf(stderr,"FATAL: failed to delete thread (%lu)\n", (unsigned long)GetLastError());
    std::exit(1);
  }
  thr = NULL; // makes a second join() harmless
}

// returns -1 on failure with error in GetLastError, 0 if all good.
int
flx_thread_t::init(void (*fn)(void*), void *lParam, thread_control_base_t *tc)
{
  DWORD thread_id = 0;
  thr= (HANDLE)CreateThread(NULL, 0,
    (LPTHREAD_START_ROUTINE)nonflx_pthread_start_wrapper,
    new tstart_t(fn,lParam, tc,NULL,NULL,NULL), 0,
    &thread_id
  );

  if(!thr)
  {
    DWORD err = GetLastError();
    fprintf(stderr, "WARNING: flx_thread_t: CreateThread failed: %i\n", err);
    return err;
  }

  return 0;
}

// ---- joinable thread wrapper ----------
// Start the wrapped joinable thread running f(lParam). Failure to start is
// fatal: it is reported on stderr and the program exits with status 1.
flx_thread_wrapper_t::flx_thread_wrapper_t(void (*f)(void*), void *lParam, thread_control_base_t*tc)
{
  const int rc = thread.init(f,lParam,tc);
  if(rc == 0) return;

  fprintf(stderr,"flx_thread_wrapper_t: FATAL: flx_thread_t.init failed\n");
  std::exit(1);
}
// Destruction blocks until the wrapped thread has finished.
flx_thread_wrapper_t::~flx_thread_wrapper_t() { thread.join(); }

}}

#endif

+ 2 Monitor

share/lib/rtl/pthread_monitor.hpp

#ifndef __FLX_PTHREAD_MONITOR_H__
#define __FLX_PTHREAD_MONITOR_H__
#include "flx_pthread_config.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include "pthread_thread_control_base.hpp"

// interface for a consumer/producer queue. threads requesting a resource
// that isn't there block until one is available. push/pop re-entrant

namespace flx { namespace pthread {

// Record passed through the monitor: the user's pointer plus a flag that the
// consumer sets once it has taken the pointer.
struct monitor_data_t
{
  void *user_data;            // the value being handed over
  ::std::atomic<bool> flag;   // false until dequeue() has read user_data
  monitor_data_t (void* u) : user_data(u), flag(false) {}
};


// Single-slot rendezvous between writers (enqueue) and readers (dequeue).
// Both operations spin, yielding through the thread control object, rather
// than blocking on a lock.
class PTHREAD_EXTERN monitor_t {
  ::std::atomic<monitor_data_t*> volatile data; // the slot; NULL when empty
  thread_control_base_t *tc;                    // used to yield while spinning
public:
  monitor_t(thread_control_base_t *);
  ~monitor_t();
  void enqueue(void*);   // returns once a reader has taken the value
  void* dequeue();       // returns once a value has been obtained
};

}} // namespace pthread, flx
#endif

share/src/pthread/pthread_monitor.cpp

#include "pthread_monitor.hpp"
#include <string.h>       // strerror
#include <assert.h>
#include <thread>
#include <atomic>

using namespace std;

#define NQFENCE ::std::memory_order_seq_cst
#define DQFENCE ::std::memory_order_seq_cst


namespace flx { namespace pthread {

// The slot starts out empty. Initialisers are listed in member declaration
// order (data, then tc), which is the order they run in.
monitor_t::monitor_t(thread_control_base_t *tc_)
  : data(nullptr), tc(tc_)
{}

// Nothing to release: the monitor owns neither the slot contents nor tc.
monitor_t::~monitor_t() {}

// Back-off step used while spinning: pass through the thread control check
// point, then give up the rest of the time slice.
// The ns argument is currently unused (the timed sleep is commented out).
static void sleep(thread_control_base_t *tc, size_t ns) 
{
  assert(tc);
  tc->yield();
  //::std::this_thread::sleep_for(::std::chrono::nanoseconds(ns));
  ::std::this_thread::yield();
}

// Hand elt to a reader. Returns only after some dequeue() has set the flag
// on the record created by THIS call, i.e. after elt itself was consumed.
// The record lives on this thread's stack, which is why the function must
// not return before the flag is set.
void
monitor_t::enqueue(void* elt)
{
  // wrap user data up with a flag so this thread
  // can wait until our user data elt is consumed
  monitor_data_t monitor_data (elt);
  monitor_data_t *p = &monitor_data;

  // swap user data into the monitor 
  // note we might get back a value some other thread put there
  // in which case we keep swapping until we get a NULL
  // which means we no longer have any data to put into the monitor
  while ( (p = ::std::atomic_exchange_explicit(&data, p, NQFENCE))) sleep (tc,1);

  // wait for the *original* data to be consumed
  // note that some other thread may have swapped that data 
  // into its own space and will be trying as above to swap it
  // into the monitor for a NULL.
  while (!monitor_data.flag.load()) sleep(tc,1);
}

// Take one value from the monitor, spinning until one is available, and
// release the writer that supplied it by setting the record's flag.
void*
monitor_t::dequeue()
{
  monitor_data_t *p = 0;

  // Swap NULL into the monitor until we get a non-NULL value back.
  // (p is NULL whenever the exchange is executed, so the slot is emptied.)
  while ( !(p = ::std::atomic_exchange_explicit (&data, p, DQFENCE))) sleep(tc,1);

  // grab the user data
  // this must happen before the flag is set: the record belongs to the writer
  void *elt = p->user_data;
  
  // signal that we have the data
  p->flag.store(true); 
  // the writer that was originally responsible for putting
  // the data we read into the monitor may now proceed
  return elt; // return data
}

}}

+ 3 Shared Counter

share/lib/rtl/pthread_counter.hpp

#ifndef __FLX_PTHREAD_COUNTER_H__
#define __FLX_PTHREAD_COUNTER_H__
#include "flx_pthread_config.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>

namespace flx { namespace pthread {

// ********************************************************
/// Counter with zero signal
// ********************************************************
// Mutex protected counter; threads can block in wait_zero() until the value
// is zero. Not copyable.
class PTHREAD_EXTERN flx_ts_counter_t {
  ::std::mutex m;                    // guards x
  ::std::condition_variable_any c;   // notified when x becomes zero
  long x;                            // the counter value
  void operator=(flx_ts_counter_t const &);
  flx_ts_counter_t(flx_ts_counter_t const &);
public:
  flx_ts_counter_t();    // starts at zero
  ~flx_ts_counter_t();   // waits for the counter to be zero
  long pre_incr(); // value AFTER increment
  long pre_decr(); // value AFTER decrement
  long post_incr(); // value BEFORE increment
  long post_decr(); // value BEFORE decrement

  long get();
  long set(long);   // returns argument
  long swap(long);  // returns old value
  long decr_pos(); // decrement if >0
  void wait_zero(); // wait for zero
  // operator forms of the functions above
  long operator++() { return pre_incr(); }
  long operator--() { return pre_decr(); }
  long operator++(int) { return post_incr(); }
  long operator--(int) { return post_decr(); }
  long operator*() { return get(); }
  long operator=(long a) { return set(a); }
};
}}

#endif

share/src/pthread/pthread_counter.cpp

#include "pthread_counter.hpp"
#include <stdio.h>

namespace flx { namespace pthread {


// The counter starts at zero.
flx_ts_counter_t::flx_ts_counter_t() : x(0) {}

// Destruction blocks until the counter has reached zero.
flx_ts_counter_t::~flx_ts_counter_t() {
  wait_zero();
}

long flx_ts_counter_t::pre_incr() {
  ::std::unique_lock< ::std::mutex> l(m);
  ++x;
  return x;
}

long flx_ts_counter_t::pre_decr() {
  ::std::unique_lock< ::std::mutex> l(m);
  --x;
  if(x==0) c.notify_all();
  return x;
}

// Increment under the lock and return the value BEFORE the increment
// (post-increment semantics, matching operator++(int)).
long flx_ts_counter_t::post_incr() {
  ::std::unique_lock< ::std::mutex> l(m);
  ++x;
  // x now holds the new value, so the old one is x-1 (not x+1)
  return x-1;
}

long flx_ts_counter_t::post_decr() {
  ::std::unique_lock< ::std::mutex> l(m);
  --x;
  if(x==0) c.notify_all();
  return x+1;
}

long flx_ts_counter_t::decr_pos() {
  ::std::unique_lock< ::std::mutex> l(m);
  if(x>0)--x;
  if(x==0) c.notify_all();
  return x;
}

long flx_ts_counter_t::get() {
  ::std::unique_lock< ::std::mutex> l(m);
  return x;
}

long flx_ts_counter_t::set(long a) {
  ::std::unique_lock< ::std::mutex> l(m);
  x = a;
  return x;
}

// Replace the value under the lock and return the previous one.
// Waiters in wait_zero() are woken when the new value is zero.
long flx_ts_counter_t::swap(long a) {
  ::std::lock_guard< ::std::mutex> guard(m);
  const long previous = x;
  x = a;
  if(a == 0) c.notify_all();
  return previous;
}

void flx_ts_counter_t::wait_zero() {
  ::std::unique_lock< ::std::mutex> l(m);
  while(1){
    if(x==0)return;
    c.wait(m);
  }
}

}}

+ 4 Shared Boolean

share/lib/rtl/pthread_waitable_bool.hpp

#ifndef __FLX_PTHREAD_WAIT_BOOL_H__
#define __FLX_PTHREAD_WAIT_BOOL_H__
#include "flx_pthread_config.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>

namespace flx { namespace pthread {

// a waitable boolean.
// A boolean that starts false and can be set true once; threads can block
// until it becomes true.
class PTHREAD_EXTERN waitable_bool {
  ::std::mutex cv_lock;       // to work with the condition var
  ::std::condition_variable_any finished_cond;  // notified by signal_true()
  bool finished;   // might seem redundant, but that's how CVs work.
public:
  waitable_bool();

  void wait_until_true();   // blocks while the flag is false
  void signal_true();       // sets the flag and wakes all waiters
};

}} // namespace pthread, flx
#endif // __FLX_PTHREAD_WAIT_BOOL_H__

share/src/pthread/pthread_waitable_bool.cpp

#include "pthread_waitable_bool.hpp"

namespace flx { namespace pthread {

// The flag starts out false.
waitable_bool::waitable_bool()
  : finished(false)
{
  // nothing
}

// Block the calling thread until signal_true() has been called.
// May be called from any thread; returns at once if the flag is already set.
void
waitable_bool::wait_until_true()
{
  ::std::unique_lock< ::std::mutex> guard(cv_lock);

  // re-test the flag after every wake-up
  for(;;)
  {
    if(finished) return;
    finished_cond.wait(cv_lock);
  }
}

// Set the flag and wake every thread blocked in wait_until_true().
//
// The notification is issued while cv_lock is still held. A typical use of
// this class is to wait for a thread to exit and then destroy its resources,
// which may include this object. If the lock were released before
// notify_all(), a waiter could observe finished == true, return and destroy
// the object while notify_all() is still about to touch finished_cond.
// Holding the lock keeps every waiter inside wait_until_true() until the
// notification is complete.
void
waitable_bool::signal_true()
{
  ::std::unique_lock< ::std::mutex> locker(cv_lock);
  finished = true;
  finished_cond.notify_all();
  // do absolutely NOTHING after the lock is released (i.e. after this
  // function's closing brace): the object may already be gone. boom.
}

} }

+ 5 Thread Control

share/src/pthread/pthread_thread_control.cpp

#include "pthread_thread.hpp"
#include <stdio.h>
#include <cstdlib>
#include <cassert>

// Declares a jmp_buf local in the enclosing scope and fills it with setjmp.
// NOTE(review): presumably this is done so register contents end up in stack
// memory where the collector's stack scan can see them -- confirm.
// Requires jmp_buf/setjmp to be declared (<setjmp.h>) wherever it is used.
#define FLX_SAVE_REGS \
  jmp_buf reg_save_on_stack; \
  setjmp (reg_save_on_stack)


namespace flx { namespace pthread {
// Returns the address of a local variable of this function, used by callers
// as an approximation of the current stack position.
// The address is copied through a second variable rather than returned as
// `&x` directly; NOTE(review): presumably to avoid the compiler's handling of
// "returning address of local" -- confirm before simplifying.
static void *get_stack_pointer() { 
  void *x; 
  void *y = (void*)&x;
  return y; 
}

// Accessor for the debug flag (read without taking stop_mutex).
bool thread_control_t::get_debug()const { return debug; }

// Out-of-line definition of the base class destructor.
thread_control_base_t::~thread_control_base_t(){}

// Private copy operations (class is uncopyable); defined empty only so they
// have a definition.
thread_control_t::thread_control_t (thread_control_t const &) {}
void thread_control_t::operator=(thread_control_t const &) {}

// Create a control object with no registered threads and no world stop in
// progress. d enables debug tracing to stderr.
thread_control_t::thread_control_t (bool d) :
  do_world_stop(false), thread_counter(0), active_counter(0), debug(d)
  {
    if(debug)
      fprintf(stderr,"INITIALISING THREAD CONTROL OBJECT\n");
  }

// Number of registered threads, read under stop_mutex.
size_t thread_control_t::thread_count()
{
  ::std::lock_guard< ::std::mutex> guard(stop_mutex);
  const size_t registered = thread_counter;
  return registered;
}

// Number of registered threads that are not suspended, read under stop_mutex.
size_t thread_control_t::active_count()
{
  ::std::lock_guard< ::std::mutex> guard(stop_mutex);
  const size_t running = active_counter;
  return running;
}

// Register the calling thread. stack_base is recorded as the base of its
// stack; both counters are bumped and anyone waiting on stop_guard is woken.
void thread_control_t::add_thread(void *stack_base)
{
  ::std::lock_guard< ::std::mutex> guard(stop_mutex);
  const uintptr_t self = mythrid();
  threads.insert (std::make_pair(self, thread_data_t (stack_base)));
  thread_counter += 1;
  active_counter += 1;
  if(debug)
  {
    fprintf(stderr, "Adding thread %p base %p, count=%zu\n", (void*)(uintptr_t)self, stack_base, thread_counter);
  }
  stop_guard.notify_all();
}

// Unregister the calling thread. Removing a thread that was never registered
// is a fatal error. Both counters are decremented and anyone waiting on
// stop_guard is woken.
void thread_control_t::remove_thread()
{
  ::std::lock_guard< ::std::mutex> guard(stop_mutex);
  const uintptr_t self = mythrid();
  const bool was_registered = threads.erase(self) != 0;
  if (!was_registered)
  {
    fprintf(stderr, "Remove thread %p which is not registered\n", (void*)(uintptr_t)self);
    std::abort();
  }
  thread_counter -= 1;
  active_counter -= 1;
  if(debug)
  {
    fprintf(stderr, "Removed thread %p, count=%zu\n", (void*)(uintptr_t)self, thread_counter);
  }
  stop_guard.notify_all();
}

// stop the world!
//
// Requests a world stop and waits until the calling thread is the only
// active one, then records the caller's own stack top in the registry.
//
// Returns true if this call stopped the world, false if a stop was already
// in progress (in which case the mutex is released again before returning).
//
// NOTE: ON EXIT WITH RESULT true, THE MUTEX REMAINS LOCKED
// (it is released by world_start()).

bool thread_control_t::world_stop()
  {
    stop_mutex.lock();
    if(debug)
      fprintf(stderr,"Thread %p Stopping world, active threads=%zu\n", (void*)mythrid(), active_counter);
    if (do_world_stop) {
      stop_mutex.unlock();
      return false; // race! Someone else beat us
    }
    do_world_stop = true;
    stop_guard.notify_all();
    // wait until every other thread has suspended itself
    while(active_counter>1) {
      if(debug)
        for(
          thread_registry_t::iterator it = threads.begin();
          it != threads.end();
          ++it
        )
        {
          fprintf(stderr, "Thread = %p is %s\n",(void*)(uintptr_t)(*it).first, (*it).second.active? "ACTIVE": "SUSPENDED");
        }
      if(debug)
        fprintf(stderr,"Thread %p Stopping world: begin wait, threads=%zu\n",  (void*)mythrid(), thread_counter);
      // releases stop_mutex while waiting and re-acquires it on wake-up
      stop_guard.wait(stop_mutex);
      if(debug)
        fprintf(stderr,"Thread %p Stopping world: checking threads=%zu\n", (void*)mythrid(), thread_counter);
    }
    // this code has to be copied here, we cannot use 'yield' because
    // it would deadlock ourself
    {
      uintptr_t id = mythrid();
      FLX_SAVE_REGS;
      void *stack_pointer = get_stack_pointer();
      if(debug)
        fprintf(stderr,"World stop thread=%p, stack=%p!\n",(void*)(uintptr_t)id, stack_pointer);
      thread_registry_t::iterator it = threads.find(id);
      if(it == threads.end()) {
        fprintf(stderr,"MAIN THREAD: Cannot find thread %p in registry\n",(void*)(uintptr_t)id);
        abort();
      }
      // record how far down the caller's stack currently extends
      (*it).second.stack_top = stack_pointer;
      if(debug)
        fprintf(stderr,"Stack size = %zu\n",(size_t)((char*)(*it).second.stack_base -(char*)(*it).second.stack_top));
    }
    if(debug)
      fprintf(stderr,"World STOPPED\n");
    return true; // we stopped the world
  }

// used by mainline to wait for other threads to die
//
// Blocks until no world stop is in progress and at most one thread (the
// caller) remains registered. While waiting it takes part in world stops
// like any other thread.
void thread_control_t::join_all()
  {
    ::std::unique_lock< ::std::mutex> m(stop_mutex);
    if(debug)
      fprintf(stderr,"Thread %p Joining all\n", (void*)mythrid());
    for(;;) {
      // parks this thread for the duration of a world stop, if one is on
      unsafe_stop_check();
      // The stop check waits on stop_guard and so releases the mutex; threads
      // may have unregistered meanwhile. Re-test before waiting, otherwise we
      // could sleep waiting for a notification that has already been sent.
      if(!do_world_stop && thread_counter<=1) break;
      stop_guard.wait(stop_mutex);
    }
    if(debug)
      fprintf(stderr,"World restarted: do_world_stop=%d, Yield thread count now %zu\n",do_world_stop,thread_counter);
  }

// restart the world
//
// Clears the stop request, releases stop_mutex and wakes all waiters.
// The mutex is unlocked here without having been locked in this function:
// it is the lock left held by a successful world_stop().
void thread_control_t::world_start()
  {
    if(debug)
      fprintf(stderr,"Thread %p Restarting world\n", (void*)mythrid());
    do_world_stop = false;
    stop_mutex.unlock();
    stop_guard.notify_all();
  }

// Build a list with one memory range per registered thread, running from the
// thread's recorded stack top up to its stack base.
// The result is heap allocated; the caller owns it and must delete it.
// No lock is taken here.
memory_ranges_t *thread_control_t::get_block_list()
{
  memory_ranges_t *ranges = new std::vector<memory_range_t>;
  for(thread_registry_t::value_type const &entry : threads)
  {
    thread_data_t const &td = entry.second;
    // stack grows downwards, so the top must not lie above the base
    assert(!std::less<void*>()(td.stack_base,td.stack_top));
    ranges->push_back(memory_range_t(td.stack_top, td.stack_base));
  }
  return ranges;
}

// Mark the calling thread as suspended (locking wrapper around
// unsafe_suspend): its stack top is recorded and it no longer counts as
// active until resume() is called.
void thread_control_t::suspend()
{
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[suspend: thread= %p]\n", (void*)mythrid());
  unsafe_suspend();
}

// Mark the calling thread as active again (locking wrapper around
// unsafe_resume). Blocks while a world stop is in progress.
void thread_control_t::resume()
{
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[resume: thread= %p]\n", (void*)mythrid());
  unsafe_resume();
}


// Record the calling thread's current stack top, mark it inactive and
// decrement the active count, then wake anyone waiting on stop_guard.
// "unsafe": does no locking; the caller must hold stop_mutex.
// Aborts if the calling thread is not in the registry.
void thread_control_t::unsafe_suspend()
{
  void *stack_pointer = get_stack_pointer();
  uintptr_t id = mythrid();
  if(debug)
    fprintf(stderr,"[unsafe_suspend:thread=%p], stack=%p!\n",(void*)(uintptr_t)id, stack_pointer);
  thread_registry_t::iterator it = threads.find(id);
  if(it == threads.end()) {
    // fatal: always say why we are aborting, not only in debug mode
    fprintf(stderr,"[unsafe_suspend] Cannot find thread %p in registry\n",(void*)(uintptr_t)id);
    abort();
  }
  (*it).second.stack_top = stack_pointer;
  (*it).second.active = false;
  if(debug) // size is printed with %zu, as everywhere else in this file
    fprintf(stderr,"[unsafe_suspend: thread=%p] stack base %p > stack top %p, Stack size = %zu\n",
      (void*)(uintptr_t)id,
      (void*)(*it).second.stack_base,
      (void*)(*it).second.stack_top,
      (size_t)((char*)(*it).second.stack_base -(char*)(*it).second.stack_top));
  --active_counter;
  if(debug)
    fprintf(stderr,"[unsafe_suspend]: active thread count now %zu\n",active_counter);
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_suspend]: stop_guard.notify_all() done\n");
}

// Wait until no world stop is in progress, then mark the calling thread
// active again and increment the active count.
// "unsafe": does no locking; the caller must hold stop_mutex (it is released
// only while waiting on stop_guard).
// Aborts if the calling thread is not in the registry.
void thread_control_t::unsafe_resume()
{
  if(debug)
    fprintf(stderr,"[unsafe_resume: thread %p]\n", (void*)mythrid());
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.notify_all() done\n");
  // stay parked for as long as the world is stopped
  while(do_world_stop) stop_guard.wait(stop_mutex);
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.wait() done\n");
  ++active_counter;
  uintptr_t id = mythrid();
  thread_registry_t::iterator it = threads.find(id);
  if(it == threads.end()) {
    // fatal: always say why we are aborting, not only in debug mode
    fprintf(stderr,"[unsafe_resume: thread=%p] Cannot find thread in registry\n",(void*)(uintptr_t)id);
    abort();
  }
  (*it).second.active = true;
  if(debug) {
    fprintf(stderr,"[unsafe_resume: thread=%p] resumed, active count= %zu\n",
      (void*)mythrid(),active_counter);
  }
  stop_guard.notify_all();
  if(debug)
    fprintf(stderr,"[unsafe_resume]: stop_guard.notify_all() done\n");
}

// mutex already held
//
// Check point: if a world stop has been requested, suspend the calling
// thread and then resume it. The resume step blocks until the stop is over,
// so the caller stays parked for the whole world stop. Does nothing when no
// stop is requested.
void thread_control_t::unsafe_stop_check()
{
  if (do_world_stop)
  {
    if(debug)
      fprintf(stderr,"[unsafe_stop_check: thread=%p] world_stop detected\n", 
        (void*)mythrid());
    // declared in this frame, i.e. above the stack top recorded by unsafe_suspend
    FLX_SAVE_REGS;
    unsafe_suspend();
    unsafe_resume();
  }
}

// Public check point: takes stop_mutex and runs the stop check, so the
// caller is parked here for the duration of any world stop in progress.
void thread_control_t::yield()
{
  ::std::unique_lock< ::std::mutex> m(stop_mutex);
  if(debug)
    fprintf(stderr,"[yield: thread=%p]\n", (void*)mythrid());
  unsafe_stop_check();
}

}}

+ 5.1 New bound queue

A lock free thread safe bag for holding non-null pointers.

share/lib/rtl/pthread_lf_bag.hpp

#ifndef __FLX_PTHREAD_LF_BAG_H__
#define __FLX_PTHREAD_LF_BAG_H__

#include "flx_pthread_config.hpp"
#include <stdint.h>
#include <atomic>
#include "pthread_thread_control_base.hpp"

namespace flx { namespace pthread {

// Fixed-capacity bag of non-null pointers. Each slot is an atomic pointer and
// NULL marks an empty slot; enqueue/dequeue work by atomic exchange on slots.
struct PTHREAD_EXTERN pthread_lf_bag {
  ::std::atomic <void *> * volatile a;  // the slot array (n entries)
  size_t n;                             // number of slots
  thread_control_base_t *tc;            // used to yield while spinning
  
  // for statistics
  size_t throughput;

  // these indices are for optimisation purposes ONLY
  // the head points at the next element to dequeue or a bit earlier
  ::std::atomic<size_t> head;

  // we can't use unsigned type because the value may go negative
  // if dequeue operations decrement the counter before the enqueue
  // that pushed the data does.
  ::std::atomic<int32_t> used; 

  pthread_lf_bag (thread_control_base_t *tc_, size_t n_);

  // the destructor is not safe!
  // to make it safe one needs to be sure the queue is empty
  // AND that no more values will be enqueued.
  // This is very hard to do. Using a smart ptr for the bag
  // ensures there will be no more enqueue operations started
  // but not that one is not in progress. The queue may appear
  // empty during the progress of such final enqueue operations.
  // there is no safe way to ensure the queue will remain empty.
  ~pthread_lf_bag();

  void enqueue(void *d);   // d must be non-null: NULL means "empty slot"
  void *dequeue ();        // spins until a pointer is available
};

}} // namespaces
#endif

share/src/pthread/pthread_lf_bag.cpp

// simple very efficient lock free bag
#include <atomic>
#include <chrono>
#include <algorithm>
#include <thread>
#include <stdlib.h>
#include "pthread_lf_bag.hpp"
#include <assert.h>

using namespace flx::pthread;

// 10 ms max sleep, that's 10,000,000 nanoseconds
#define MAXSLEEP (size_t)10000000

// Back-off step used while spinning: pass through the thread control check
// point, then give up the rest of the time slice.
// The ns argument is currently unused (the timed sleep is commented out).
static void sleep(thread_control_base_t *tc, size_t ns) 
{
  assert(tc);
  tc->yield();
  //::std::this_thread::sleep_for(::std::chrono::nanoseconds(ns));
  ::std::this_thread::yield();
}

#define NQFENCE ::std::memory_order_seq_cst
#define DQFENCE ::std::memory_order_seq_cst


  // Build a bag with n_ slots that uses tc_ for yielding while it spins.
  //
  // The slot array comes from calloc, so every slot starts zero-filled;
  // enqueue() and dequeue() rely on a zeroed slot reading back as nullptr
  // (assumes an all-zero-bits null pointer and a pointer-sized atomic).
  // The array is sized by the atomic element type rather than by void*, so
  // the allocation is correct even where the two sizes differ.
  //
  // Initialisers are written in member declaration order, which is the order
  // in which the compiler actually runs them.
  pthread_lf_bag::pthread_lf_bag (thread_control_base_t *tc_, size_t n_) :
    a(static_cast< ::std::atomic<void*>*>(
      calloc (n_ , sizeof (::std::atomic<void*>)))),
    n (n_), tc(tc_),
    throughput(0),
    head(0), used(0)
  {}

  // the destructor is not safe!
  // to make it safe one needs to be sure the queue is empty
  // AND that no more values will be enqueued.
  // This is very hard to do. Using a smart ptr for the bag
  // ensures there will be no more enqueue operations started
  // but not that one is not in progress. The queue may appear
  // empty during the progress of such final enqueue operations.
  // there is no safe way to ensure the queue will remain empty.
  //
  // NOTE: the body is empty, so the slot array obtained from calloc in the
  // constructor is not released here.
  pthread_lf_bag::~pthread_lf_bag() { }

  // Store the pointer d in some free slot of the bag.
  // Spins (through sleep(), which yields) while the used counter says the
  // bag is full. d is expected to be non-null: a null pointer marks a free slot.
  void pthread_lf_bag::enqueue(void *d) 
  { 
wait:
    // stime is handed to sleep(), which currently ignores it
    size_t stime = 1;
    // block here while every slot is accounted for
    while (used.load(::std::memory_order_seq_cst) == n) sleep(tc,stime);
    // starting slot for the probe; head and used are only hints
    size_t i = (head + used) % n;
    // Swap d into slot i. The exchange hands back whatever was there:
    // nullptr means the slot was free and we are done; otherwise d now
    // carries the pointer that was displaced, which still needs a slot.
    while 
    (
      (d = ::std::atomic_exchange_explicit(a + i, d, 
        NQFENCE))
    ) 
    { 
      if (used.load(::std::memory_order_seq_cst) == n) goto wait; // lost the race
      // try the next slot, wrapping at the end of the array
      i = (i + 1) % n; 
      // probe has come round to head: back off before continuing
      if (i == head) sleep(tc,stime);
    }
    // account for the stored pointer only after it is in place
    ++used;
  }

  // Remove and return some pointer stored in the bag.
  // Spins (through sleep(), which yields) while the used counter is zero.
  void *pthread_lf_bag::dequeue () 
  { 
wait:
    // stime is handed to sleep(), which currently ignores it
    size_t stime = 1;
    // block here while the bag looks empty
    while (used.load(::std::memory_order_seq_cst) == 0) sleep(tc,stime );
    
    // start probing at the head hint
    size_t i = head.load(::std::memory_order_seq_cst);
    void *d = nullptr;
    // Swap nullptr into slot i. A non-null result is the dequeued value;
    // a null result means the slot was already free, d stays nullptr and
    // the probe moves on.
    while 
    (
      !(d = ::std::atomic_exchange_explicit(a + i, d, 
        DQFENCE))
    ) 
    { 
      if (used.load(::std::memory_order_seq_cst) == 0) goto wait; // lost the race
      // try the next slot, wrapping at the end of the array
      i = (i + 1) % n; 
      // probe has come round to head: back off before continuing
      if (i == head) sleep(tc,stime);
    }
    // remember where the value was found as the next starting hint
    head.store (i,::std::memory_order_seq_cst);
    --used;
    // statistics only; this increment is not atomic
    ++throughput;
    return d;
  }

share/lib/std/pthread/pthread_lf_bag.flx

  // Felix binding for the C++ pthread_lf_bag, held through a std::shared_ptr.
  class LockFreeBag
  {
    type lf_bag = "::std::shared_ptr<::flx::pthread::pthread_lf_bag>"
      requires 
        header '#include "pthread_lf_bag.hpp"',
        package "pthread",
        Cxx11_headers::memory
    ;
    // note: unmanaged container at the moment!!
    // construct a bag with the given number of slots, wired to the
    // collector's thread control object
    ctor lf_bag : size = """
       ::std::shared_ptr<::flx::pthread::pthread_lf_bag> 
       (new ::flx::pthread::pthread_lf_bag(PTF gcp->collector->get_thread_control(),$1))
    """;
    // store an address in the bag (forwards to pthread_lf_bag::enqueue)
    proc enqueue : lf_bag * address = "$1->enqueue ($2);";
    // remove and return an address (forwards to pthread_lf_bag::dequeue)
    gen dequeue : lf_bag -> address = "$1->dequeue ()";
    // capacity of the bag: the n member, not the number of stored items
    gen len : lf_bag -> size = "$1->n"; 
    // current value of the used counter; the C++ counter is a signed 32 bit
    // atomic which its declaration says may be transiently negative
    gen used : lf_bag -> size = "$1->used.load()"; 
  }

+ 6 Bound Queue

share/lib/rtl/pthread_bound_queue.hpp

#ifndef __FLX_PTHREAD_BOUND_QUEUE_H__
#define __FLX_PTHREAD_BOUND_QUEUE_H__
#include "flx_pthread_config.hpp"
#include "flx_gc.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>

// interface for a consumer/producer queue. threads requesting a resource
// that isn't there block until one is available. push/pop re-entrant

namespace flx { namespace pthread {

// ********************************************************
/// Thread safe bounded queue.
///
/// The queue can be locked by setting bound=0.
/// In this state it can only be unlocked by setting a non-zero bound.
///
/// If the bound is set to 1 (the default),
/// then the queue is always either empty or full.
/// An empty queue blocks readers until a writer sends some data.
/// A full queue blocks writers, until a reader reads the data.
/// Note that when the queue is empty a writer can write data
/// and continues without waiting for the data to be read.
// ********************************************************

// Mutex-protected FIFO of void* with an adjustable upper bound on its length.
// All member functions take member_lock; size_changed is signalled whenever
// the contents or the bound change.
class PTHREAD_EXTERN bound_queue_t {
  // notified (notify_all) after every enqueue, dequeue and resize
  ::std::condition_variable_any size_changed;
  // maximum number of queued elements; enqueue waits while size >= bound
  size_t bound;
public:
  // really a pointer to the underlying std::deque<void*>
  void *lame_opaque; // has to be public for the scanner to find it
  // guards the deque and bound
  ::std::mutex member_lock;
  // construct an empty queue with the given bound
  bound_queue_t(size_t);
  ~bound_queue_t();
  // append an element, waiting while the queue is at its bound
  void enqueue(void*);
  // remove and return the front element, waiting while the queue is empty
  void* dequeue();
  // like dequeue, but returns NULL at once if the queue is empty
  void* maybe_dequeue();
  // change the bound and wake all waiters
  void resize(size_t);
  // wait until the queue holds no elements
  void wait_until_empty();
  // number of elements currently queued (not the bound)
  size_t len();
};

// GC scanner for bound queues; defined in pthread_bound_queue.cpp, where it
// registers every queued pointer with the collector.
PTHREAD_EXTERN ::flx::gc::generic::scanner_t bound_queue_scanner;

}} // namespace pthread, flx
#endif

share/src/pthread/pthread_bound_queue.cpp

#include "pthread_bound_queue.hpp"
#include <queue>        // stl to the bloated rescue
#include <stdio.h>      // debugging in scanner

using namespace std;

namespace flx { namespace pthread {
typedef deque<void*> void_queue;

#define ELTQ ((void_queue*)lame_opaque)

// Create an empty queue whose length is limited to n elements.
// The element container is heap allocated and kept behind lame_opaque.
bound_queue_t::bound_queue_t(size_t n)
  : bound(n),
    lame_opaque(new void_queue)
{}

// Releases the element container. Nothing here waits for or wakes up
// threads that are still using the queue, so callers must arrange that
// themselves: deleting a queue safely needs much care.
bound_queue_t::~bound_queue_t()
{
  void_queue *elements = static_cast<void_queue*>(lame_opaque);
  delete elements;
}

// Number of elements currently held in the queue.
// This is the live length, NOT the bound.
size_t bound_queue_t::len() {
  ::std::lock_guard< ::std::mutex> guard(member_lock);
  const size_t count = ELTQ->size();
  return count;
}

// Block the caller until the queue holds no elements.
// The emptiness test is repeated after every wakeup.
void bound_queue_t::wait_until_empty() {
  ::std::unique_lock< ::std::mutex> guard(member_lock);
  for (;;) {
    if (ELTQ->empty()) return;
    size_changed.wait(member_lock);
  }
}

// Append elt to the back of the queue, waiting while the queue already
// holds bound (or more) elements. All waiters are woken afterwards.
void
bound_queue_t::enqueue(void* elt)
{
  ::std::unique_lock< ::std::mutex> guard(member_lock);
  // the test is repeated after each wakeup, which also covers spurious ones
  while (!(ELTQ->size() < bound)) {
    size_changed.wait(member_lock);
  }
  ELTQ->push_back(elt);
  size_changed.notify_all();
}

// Remove and return the front element, waiting while the queue is empty.
// All waiters are woken once the element has been taken.
void*
bound_queue_t::dequeue()
{
  ::std::unique_lock< ::std::mutex> guard(member_lock);
  // the test is repeated after each wakeup, which also covers spurious ones
  for (;;) {
    if (!ELTQ->empty()) break;
    size_changed.wait(member_lock);
  }
  void *const front_elt = ELTQ->front();
  ELTQ->pop_front();
  size_changed.notify_all();
  return front_elt;
}

// Non-blocking dequeue: returns the front element if there is one,
// otherwise NULL. Waiters are woken only when an element was removed.
void*
bound_queue_t::maybe_dequeue()
{
  ::std::unique_lock< ::std::mutex> guard(member_lock);
  if (ELTQ->empty())
    return NULL;

  void *const front_elt = ELTQ->front();
  ELTQ->pop_front();
  size_changed.notify_all();
  return front_elt;
}


// Replace the bound with n and wake every waiter so that each one
// re-evaluates its condition against the new limit.
void
bound_queue_t::resize(size_t n)
{
  ::std::lock_guard< ::std::mutex> guard(member_lock);
  bound = n;
  size_changed.notify_all();
}

using namespace flx;;
using namespace gc;
using namespace generic;

// GC scanner for a bound queue.
// pp is a pointer to a pointer to a bound_queue_t. Every pointer stored in
// that queue is passed to collector->register_pointer together with
// reclimit. shape and dyncount are not used. Always returns 0.
// The queue's member_lock is not taken here.
void *bound_queue_scanner(
  collector_t *collector, 
  gc_shape_t *shape, void *pp, 
  size_t dyncount, 
  int reclimit
)
{
  void *queue_addr = *static_cast<void**>(pp);
  bound_queue_t *queue = static_cast<bound_queue_t*>(queue_addr);
  const void_queue *elements = static_cast<void_queue*>(queue->lame_opaque);
  printf("Scanning bound queue %p->%p\n", pp, queue_addr);

  for (void *value : *elements) {
    printf("bound_queue scanning p=%p\n",value); 
    collector->register_pointer(value,reclimit);
  }
  return 0;
}


}}


$PWD/src/config/flx_bound_queue.fpc

Name: Pthread Bound Queue
Requires: flx_pthread flx_gc
includes: '"pthread_bound_queue.hpp"'

+ 7 Thread Pool

This thread pool is used internally for C++ jobs by the asynchronous I/O system. The threads are Felix threads and the Felix bound queue is used for the fifo.

share/lib/rtl/pthread_work_fifo.hpp

#ifndef __FLX_PTHREAD_WORKER_FIFO_H__
#define __FLX_PTHREAD_WORKER_FIFO_H__
#include "flx_pthread_config.hpp"
#include "pthread_thread.hpp"
#include "pthread_bound_queue.hpp"
#include <mutex>
#include <thread>

namespace flx { namespace pthread {

/// Class of jobs to be queued in fifo for execution.
/// Abstract interface: a worker thread calls doit() and then finished()
/// on each task it dequeues.
class PTHREAD_EXTERN worker_task
{
public:
  // virtual so that tasks can be destroyed through a worker_task pointer
  virtual ~worker_task() {}   // c++ should do this automatically

  /// function called by worker thread to carry out user job
  virtual void doit() = 0;

  /// function called by worker thread after doit() is completed
  /// used to notify job completion
  virtual void finished() = 0; // finished hook (do I need a start hook?)
};

/// Job scheduler, executes jobs in turn from queue
/// Does not delete dequeued jobs
/// Worker threads are stopped by queueing a NULL task, one per thread.
class PTHREAD_EXTERN worker_fifo
{
  ::std::mutex nlock;           /// guards nthreads in get_nthreads/set_nthreads
  int nthreads;                 /// scheduled number of threads
  bound_queue_t fifo;           /// pending tasks, stored as void*

  static void thread_start(void*); // thread entry point, passed this
  bool thread_loop_body();      // returns keep going flag
  void stop_worker_thread();    // decrements nthreads and queues a NULL task
  void start_worker_thread();   // increments nthreads and spawns a detached thread

public:
  worker_fifo(int n, int m);   /// n: Q bound, m: # of threads
  ~worker_fifo();
  /// queue a task; waits while the fifo is at its bound
  void add_worker_task(worker_task* task);
  /// current scheduled number of threads
  int get_nthreads();
  /// start or stop worker threads until the scheduled number equals the argument
  void set_nthreads(int);
};

}} // namespace pthread, flx
#endif  // __FLX_PTHREAD_WORKER_FIFO_H__

share/src/pthread/pthread_work_fifo.cpp

#include <stdio.h>    // printf
#include "pthread_work_fifo.hpp"
namespace flx { namespace pthread {

// Scheduled number of worker threads, read under nlock.
int worker_fifo::get_nthreads() {
  ::std::lock_guard< ::std::mutex> guard(nlock);
  const int scheduled = nthreads;
  return scheduled;
}

// Bring the scheduled number of worker threads to n: spawn threads while
// there are too few, queue stop requests while there are too many.
// nlock is held for the whole adjustment.
void worker_fifo::set_nthreads(int n)
{
  ::std::lock_guard< ::std::mutex> guard(nlock);
  while (nthreads < n) {
    start_worker_thread();
  }
  while (n < nthreads) {
    stop_worker_thread();
  }
}

void worker_fifo::start_worker_thread()
{
  ++nthreads;
  //fprintf(stderr,"Spawn detached worker thread, count=%d\n",nthreads);
  flx_detached_thread_t().init(thread_start, this,NULL, NULL, NULL, NULL);
}

// n is the bound of the task queue, m the number of worker threads to
// start. The thread count begins at zero and set_nthreads spawns the rest.
worker_fifo::worker_fifo(int n, int m)
  : nthreads(0),
    fifo(n)
{
  set_nthreads(m);
}

void
worker_fifo::stop_worker_thread()
{
  //fprintf(stderr,"Kill detached worker thread, count=%d\n",nthreads);
  --nthreads;
  add_worker_task(NULL);    // thread safe takedown.
}

// Queue one stop request per scheduled worker, then wait until the task
// queue has been drained. nlock is not taken here.
worker_fifo::~worker_fifo()
{
  while (nthreads > 0) {
    stop_worker_thread();
  }
  fifo.wait_until_empty();
}

// io thread entry point, passed this
void
worker_fifo::thread_start(void* udat)
{
  worker_fifo*  fio = (worker_fifo*)udat;
  while(fio->thread_loop_body()) ;
}

// dequeues one task and executes it, calling finished hook. interprets
// null task as a request to exit.
bool
worker_fifo::thread_loop_body()
{
  worker_task*  req = (worker_task*)fifo.dequeue();
  //fprintf(stderr,"dequeued worker_task (%p)\n", req);

  if(!req) return false;        // finished, got quit signal

  req->doit();
  req->finished();          // finish hook. I find this handy
  // Note: task not deleted. use finish hook if you want that

  return true;            // keep going
}

// Hand a task to the worker threads by appending it to the fifo.
// The fifo does its own locking, so no lock is taken here.
void
worker_fifo::add_worker_task(worker_task* task)
{
  void *item = task;
  fifo.enqueue(item);
}

}}

+ 8 Thread Safe Collector.

The thread safe collector class flx_ts_collector_t is derived from the flx_collector_t class. It basically dispatches to its base with locks as required.

share/lib/rtl/flx_ts_collector.hpp

#ifndef __FLX_TS_COLLECTOR_H__
#define __FLX_TS_COLLECTOR_H__
#include "flx_collector.hpp"
#include "pthread_thread.hpp"
#include <thread>
#include <mutex>

namespace flx {
namespace gc {
namespace collector {

/// Naive thread safe Mark and Sweep Collector.
/// Overrides the v_* hooks of flx_collector_t; each override forwards to the
/// matching impl_* function, holding mut for all of them except v_collect.
struct PTHREAD_EXTERN flx_ts_collector_t :
  public flx::gc::collector::flx_collector_t
{
  /// both arguments are passed straight through to the base class constructor
  flx_ts_collector_t(allocator_t *, flx::pthread::thread_control_t *);
  ~flx_ts_collector_t();

private:
  /// allocator
  void *v_allocate(gc_shape_t *ptr_map, size_t);

  /// collector (returns number of objects collected)
  size_t v_collect();

  // add and remove roots
  void v_add_root(void *memory);
  void v_remove_root(void *memory);

  // statistics
  size_t v_get_allocation_count()const;
  size_t v_get_root_count()const;
  size_t v_get_allocation_amt()const;

private:
  // mutable so that the const statistics functions can lock it
  mutable ::std::mutex mut;
};


}}} // end namespaces

#endif

share/src/pthread/flx_ts_collector.cpp

#include "flx_rtl_config.hpp"
#include "flx_ts_collector.hpp"

namespace flx {
namespace gc {
namespace collector {

// Hands the allocator and the thread control object to the base collector;
// the mutex is default constructed.
flx_ts_collector_t::flx_ts_collector_t(
  allocator_t *a,
  flx::pthread::thread_control_t *tc
)
  : flx_collector_t(a, tc)
{
}

// Nothing to release beyond what the members and the base class handle.
flx_ts_collector_t::~flx_ts_collector_t()
{
}

// Allocation, serialised by mut; forwards to impl_allocate.
void *flx_ts_collector_t::v_allocate(gc_shape_t *ptr_map, size_t x) {
  ::std::lock_guard< ::std::mutex> guard(mut);
  void *const memory = impl_allocate(ptr_map, x);
  return memory;
}

// Run a collection and return what impl_collect reports.
// NO MUTEX: unlike the other hooks, this one does not lock mut.
size_t flx_ts_collector_t::v_collect() {
  const size_t collected = impl_collect();
  return collected;
}

// Register memory as a root, serialised by mut; forwards to impl_add_root.
void flx_ts_collector_t::v_add_root(void *memory)
{
  ::std::lock_guard< ::std::mutex> guard(mut);
  impl_add_root(memory);
}

// Unregister memory as a root, serialised by mut; forwards to impl_remove_root.
void flx_ts_collector_t::v_remove_root(void *memory)
{
  ::std::lock_guard< ::std::mutex> guard(mut);
  impl_remove_root(memory);
}

// Allocation count as reported by impl_get_allocation_count, read under mut.
size_t flx_ts_collector_t::v_get_allocation_count()const
{
  ::std::lock_guard< ::std::mutex> guard(mut);
  const size_t count = impl_get_allocation_count();
  return count;
}

// Root count as reported by impl_get_root_count, read under mut.
size_t flx_ts_collector_t::v_get_root_count()const
{
  ::std::lock_guard< ::std::mutex> guard(mut);
  const size_t count = impl_get_root_count();
  return count;
}

// Allocation amount as reported by impl_get_allocation_amt, read under mut.
size_t flx_ts_collector_t::v_get_allocation_amt()const
{
  ::std::lock_guard< ::std::mutex> guard(mut);
  const size_t amount = impl_get_allocation_amt();
  return amount;
}


}}} // end namespaces


+ 9 Fast Resource Lock

This is a fast application level lock to be used for serialisation of transient accesses to data structures. It is a mutex; however, unlike a system mutex, it is safe to use with the Felix GC.

System mutexes are NOT GC safe because in Felix every allocation may potentially trigger a garbage collection which requires a world stop. Since world stops are cooperative, the collector must wait until all threads have voluntarily yielded, usually by themselves performing an allocation or an explicit call to perform a collection, but suicide should work too.

However if a thread blocks trying to lock a mutex held by another thread which is now stopped for the GC, we have a deadlock. So a user level lock must have a timeout and a spin loop which includes regular checking for a GC world stop request.

It would be acceptable if the check were done atomically with blocking on a lock request followed by another check, because locking itself does not change reachability state. With those semantics, it's fine for the thread to block, provided the GC counts it as having yielded, and it cannot unblock during the GC. That basically means unlocking must also do the check, to ensure blocked threads stay blocked.

share/lib/rtl/pthread_fast_lock.hpp

#ifndef __pthread_fast_lock__
#define __pthread_fast_lock__
#include "flx_pthread_config.hpp"
#include "pthread_thread_control_base.hpp"
#include <atomic>

namespace flx { namespace rtl {

// Spin lock built on an atomic flag. While waiting, lock() calls yield() on
// the thread control object between attempts (see pthread_fast_lock.cpp).
// Neither default constructible nor copyable.
class PTHREAD_EXTERN fast_lock
{
  // the lock state: cleared by the constructor and by unlock()
  ::std::atomic_flag flag;
  // thread control object whose yield() is called while spinning
  ::flx::pthread::thread_control_base_t *tc;
public:
  // construct an unlocked lock bound to the given thread control object
  fast_lock(::flx::pthread::thread_control_base_t *);
  fast_lock() = delete;
  fast_lock(fast_lock const&)  = delete;
  void operator = (fast_lock const&) = delete;
  // acquire the lock, spinning until it is available
  void lock();
  // release the lock
  void unlock();
};
}}
#endif

$PWD/CRAP/src/pthread/pthread_fast_lock.cpp

#include "pthread_fast_lock.hpp"
#include <chrono>
#include <thread>
#include <mutex>

namespace flx { namespace rtl {
// Remember the thread control object and start in the unlocked state.
fast_lock::fast_lock(::flx::pthread::thread_control_base_t *tc_)
  : tc(tc_)
{
  flag.clear();
}
// Release the lock by clearing the flag.
void fast_lock::unlock()
{
  flag.clear();
}
// Acquire the lock, spinning until it becomes available.
//
// test_and_set() sets the flag and returns its PREVIOUS value, so a return
// of false means the flag was clear and this thread now owns the lock,
// while true means another thread holds it and we must retry. The earlier
// negated test had this backwards: it returned immediately when the lock
// was already held, so two threads could be inside the critical section
// at once.
//
// Between attempts the thread calls tc->yield() and then sleeps for
// 200 nanoseconds.
void fast_lock::lock() {
  while (flag.test_and_set())
  {
    tc->yield();
    ::std::this_thread::sleep_for(::std::chrono::nanoseconds (200));
  }
}

}}

$PWD/CRAP/lib/std/pthread/pthread_fast_lock.flx

  // Felix binding for the C++ ::flx::rtl::fast_lock, held as a raw pointer.
  class FastLock
  {
     type fast_lock = "::flx::rtl::fast_lock*" 
       requires header '#include "pthread_fast_lock.hpp"';
     // allocate a lock with new, wired to the collector's thread control object
     ctor fast_lock : unit = "new ::flx::rtl::fast_lock(PTF gcp->collector->get_thread_control())";
     // the lock is a raw pointer, so it has to be deleted explicitly
     proc delete : fast_lock = "delete $1;";
     proc lock : fast_lock = "$1->lock();";
     proc unlock : fast_lock = "$1->unlock();";
  
  }

+ 10 Build System

$PWD/buildsystem/flx_pthread.py

import fbuild
from fbuild.functools import call
from fbuild.path import Path
from fbuild.record import Record
from fbuild.builders.file import copy

import buildsystem
from buildsystem.config import config_call

# ------------------------------------------------------------------------------

def build_runtime(phase):
    """Build the flx_pthread runtime library in static and shared form.

    Compiles every ``*.cpp`` under ``share/src/pthread`` in the build root
    with the ``BUILD_PTHREAD`` macro defined, linking against the result of
    ``buildsystem.flx_gc.build_runtime``.

    Returns a ``Record`` with two fields, ``static`` and ``shared``, holding
    the results of ``build_cxx_static_lib`` and ``build_cxx_shared_lib``.
    """
    print('[fbuild] [rtl] build pthread')
    path = Path(phase.ctx.buildroot/'share'/'src/pthread')

    srcs = Path.glob(path / '*.cpp')
    includes = [
      phase.ctx.buildroot / 'host/lib/rtl', 
      phase.ctx.buildroot / 'share/lib/rtl']
    macros = ['BUILD_PTHREAD']
    flags = []
    # the gc runtime is built first (through call) and linked in
    libs = [
        call('buildsystem.flx_gc.build_runtime', phase),
    ]
    external_libs = []

    # configuration probe for pthread.h
    pthread_h = config_call('fbuild.config.c.posix.pthread_h',
        phase.platform,
        phase.cxx.shared)

    dst = 'host/lib/rtl/flx_pthread'
    # only when the probe reports pthread_create: add the flags and
    # libraries that the probe says are needed
    if pthread_h.pthread_create:
        flags.extend(pthread_h.flags)
        libs.extend(pthread_h.libs)
        external_libs.extend(pthread_h.external_libs)

    # the same flags are passed as both compile and link flags
    return Record(
        static=buildsystem.build_cxx_static_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            cflags=flags,
            libs=[lib.static for lib in libs],
            external_libs=external_libs,
            lflags=flags),
        shared=buildsystem.build_cxx_shared_lib(phase, dst, srcs,
            includes=includes,
            macros=macros,
            cflags=flags,
            libs=[lib.shared for lib in libs],
            external_libs=external_libs,
            lflags=flags))

+ 11 Configuration Database

$PWD/src/config/unix/flx_pthread.fpc

Name: Flx_pthread
Description: Felix Pre-emptive threading support

provides_dlib: -lflx_pthread_dynamic
provides_slib: -lflx_pthread_static
includes: '"pthread_thread.hpp"'
Requires: flx_gc flx_exceptions pthread
library: flx_pthread
macros: BUILD_PTHREAD
srcdir: src/pthread
src: .*\.cpp

$PWD/src/config/win32/flx_pthread.fpc

Name: Flx_pthread
Description: Felix Pre-emptive threading support

provides_dlib: /DEFAULTLIB:flx_pthread_dynamic
provides_slib: /DEFAULTLIB:flx_pthread_static
includes: '"pthread_thread.hpp"'
Requires: flx_gc flx_exceptions pthread
library: flx_pthread
macros: BUILD_PTHREAD
srcdir: src/pthread
src: .*\.cpp

$PWD/src/config/pthread.fpc

Description: pthread support defaults to no requirements

$PWD/src/config/linux/pthread.fpc

Description: Linux pthread support
requires_dlibs: -lpthread
requires_slibs: -lpthread

share/lib/rtl/flx_pthread_config.hpp

#ifndef __FLX_PTHREAD_CONFIG_H__
#define __FLX_PTHREAD_CONFIG_H__
#include "flx_rtl_config.hpp"
#ifdef BUILD_PTHREAD
#define PTHREAD_EXTERN FLX_EXPORT
#else
#define PTHREAD_EXTERN FLX_IMPORT
#endif
#endif