c++ work queues with blocking

Well, that’s really quite simple: you’re rejecting the tasks that are posted!

// Snippet quoted from the question: posts `task` to the io_service only while
// the `available_` counter is positive.
// NOTE(review): the parameter type is spelled `task` while the template
// parameter is `Task` -- presumably a typo in the quoted code; left untouched.
template< typename Task >
void run_task(task task){
    // Holds mutex_ for the rest of the function. Acquiring it waits only for
    // the mutex itself, not for available_ to become positive.
    boost::unique_lock<boost::mutex> lock( mutex_ );
    if(0 < available_) {
        --available_;
        io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task )));
    }
    // When available_ is not positive, nothing is posted: the function returns
    // and the task is never run.
}

Note that the lock only “waits” until the mutex is no longer owned by another thread. That may already be the case immediately, and it may well happen at a moment when available_ is already 0. Now consider the line

if(0 < available_) {

This line is simply a condition. It does not become “magical” just because you are holding mutex_ locked. (The program doesn’t even know that a relation exists between mutex_ and available_.) So, if available_ <= 0, you just skip posting the job; nothing waits for a worker to become available.


Solution #1

You should let the io_service do the queuing for you. This is likely what you wanted to achieve in the first place. Instead of you keeping track of “available” threads, io_service does the work for you. You control how many threads it may use by calling io_service::run() on exactly that many threads. Simple.

Since io_service is already thread-safe, you can do without the lock.

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

// tpool class
// It's always closed. :glasses:
// Include guard. Identifiers containing a double underscore are reserved for
// the implementation, so the guard macro deliberately avoids them.
#ifndef POOL_H
#define POOL_H
/// Fixed-size thread pool that uses a boost::asio::io_service as its task queue.
///
/// Tasks handed to run_task() are queued by the io_service and executed by
/// whichever thread is currently running it; the number of such threads is
/// chosen at construction time.
class tpool {
    public:
        /// Creates the pool with `tpool_size` worker threads.
        /// `explicit` so that an integer is never silently converted to a pool.
        explicit tpool( std::size_t tpool_size );

        /// Stops the io_service and joins the worker threads.
        ~tpool();

        /// Queues `task` for execution on one of the pool's threads.
        /// No extra locking is done here; the call is forwarded straight to
        /// io_service::post().
        ///
        /// @param task  callable object, copied into the io_service queue.
        template<typename Task>
        void run_task(Task task){
            io_service_.post(task);
        }
    private:
        // note the order of destruction of members: they are destroyed in
        // reverse declaration order (threads_, then work_, then io_service_),
        // so the io_service outlives both the work object bound to it and the
        // thread group running it.
        boost::asio::io_service io_service_;
        boost::asio::io_service::work work_;

        boost::thread_group threads_;
};

extern tpool dbpool;
#endif

#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"

// Builds the pool: ties work_ to io_service_, then launches tpool_size
// threads, each of which calls run() on the shared io_service.
tpool::tpool(std::size_t tpool_size)
    : work_(io_service_)
{
    std::size_t remaining = tpool_size;
    while (remaining != 0) {
        threads_.create_thread(boost::bind(&boost::asio::io_service::run, &io_service_));
        --remaining;
    }
}

// Tears the pool down: asks the io_service to stop, then waits for every
// worker thread to finish.
tpool::~tpool()
{
    io_service_.stop();

    try {
        threads_.join_all();
    } catch (...) {
        // Best effort only: a destructor must not let an exception escape,
        // so any failure while joining is ignored.
    }
}

// Demo task: prints this function's own signature, as reported by the
// compiler-provided __PRETTY_FUNCTION__, followed by a newline.
void foo()
{
    const char* const signature = __PRETTY_FUNCTION__;
    std::cout << signature << "\n";
}
// Demo task: prints this function's own signature, as reported by the
// compiler-provided __PRETTY_FUNCTION__, followed by a newline.
void bar()
{
    const char* const signature = __PRETTY_FUNCTION__;
    std::cout << signature << "\n";
}

int main() {
    tpool dbpool(50);

    dbpool.run_task(foo);
    dbpool.run_task(bar);

    boost::this_thread::sleep_for(boost::chrono::seconds(1));
}

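For a graceful shutdown (one that lets already-queued tasks finish instead of cutting them off with stop()), you will want to be able to “clear” (destroy) the io_service::work object; as long as it exists, io_service::run() will never return on its own and the pool will never exit by itself.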


Solution #2

Don’t use io_service; instead, roll your own queue implementation with a condition variable to notify a worker thread when new work is posted. Again, the number of workers is determined by the number of threads in the group.

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

/// Minimal thread pool built on a mutex-protected job queue and a condition
/// variable.
///
/// Worker threads block in dequeue() until a job is available or shutdown has
/// been requested. The destructor requests shutdown and joins the workers;
/// jobs still in the queue at that point are executed before the workers exit,
/// because dequeue() only reports "no more work" once the queue is empty.
class thread_pool
{
  private:
      mutex mx;               // guards _queue
      condition_variable cv;  // notified when a job is queued and on shutdown

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;

      /// Worker loop: keeps running jobs until dequeue() returns an empty
      /// optional.
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      /// Starts one worker per hardware thread, and always at least one.
      thread_pool() : shutdown(false) {
          // hardware_concurrency() returns 0 when the information is not
          // available. Without this fallback the pool would have no workers
          // and enqueued jobs would never run. Querying once also avoids
          // re-evaluating it on every loop iteration.
          unsigned worker_count = boost::thread::hardware_concurrency();
          if (worker_count == 0)
              worker_count = 1;

          for (unsigned i = 0; i < worker_count; ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      /// Appends `job` to the queue and wakes one waiting worker.
      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      /// Blocks until a job is queued or shutdown is requested.
      ///
      /// @return the oldest queued job, or `none` when the queue is empty
      ///         (which, after the wait, happens only once shutdown is set).
      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          // Lazy predicate: wake up when shutdown is set or the queue is
          // non-empty; it is re-checked after every wakeup.
          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      /// Requests shutdown, wakes every worker and waits for them to finish.
      ~thread_pool()
      {
          shutdown = true;
          {
              // Notifying while holding the mutex ensures a worker that has
              // just evaluated the predicate cannot miss the wakeup.
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";
}

// Demo driver for the hand-rolled pool: queues ten jobs (ids 0 through 9);
// the pool's destructor then waits for the workers before main returns.
int main()
{
    thread_pool pool; // uses 1 thread per core

    int i = 0;
    while (i < 10) {
        pool.enqueue(bind(the_work, i));
        ++i;
    }
}

Leave a Comment