Clicky

A simple thread safe queue for use in multi-threaded C++ applications

An efficient thread safe queue is one of the first things that should be implemented in any multithreaded application. The requirements for my thread safe queues are usually something like this:

  1. The thread safe queue shall provide the ability to contain any data type (generic container).
  2. The thread safe queue shall provide blocking dequeue calls with timeouts.
  3. The thread safe queue shall provide “fast” enqueue and dequeue calls, (where the definition of  “fast” is application specific)
  4. The number of elements on the queue shall not be artificially limited.
  5. The thread safe queue shall allow for any number of concurrent producers and consumers.

There may also be other less important requirements on a per application basis, but those 5 seem to be generally applicable to all applications I work on. An example of a thread safe queue that meets these requirements is used in my Terminal Emulator (ComBomb) source code at ThreadSafeQueue.h:

Below I will talk about each one in more detail.

Generic Container

This one is pretty obvious, if you want a queue of ints, then it should be reasonably easy to create one, or if you want a queue of structs, then it should also be reasonably easy to create one. For this I typically just use a C++ template similar to the way STL containers are implemented.

Blocking dequeue

Why a blocking dequeue? Simple: A blocking dequeue call allows your application to just wait until something is enqueued. The thought behind this is that you have a thread processing incoming data from the queue. If there is nothing in the queue, then the thread has nothing to do, so it is reasonable to just wait. However a timeout is almost always required if for no other reason than to check if the thread needs to exit periodically.

Typically my threads look something like this:

void MyClass::threadX()
{
    MyData data;
    while (_threadXRunning == true)
    {
        if (_incomingQueue.waitDequeue(data, QUEUE_WAIT_TIMEOUT) == true)
        {
            processData(data);
        }
    }
}

As you can see the thread just waits for data from the queue, if something shows up then it gets processed, otherwise the thread running state is checked, and the thread exits if necessary otherwise it just goes back to blocking on the queue again.

Fast enqueue/dequeue

Fast enqueue, and dequeue operations is of course always nice, but the speed usually depends on the datatype being queued. Which is why I almost always use pointer or even more frequently smart pointer types. If your queue elements are 2k bytes, then every queue and dequeue operation requires a 2k byte copy; which could really bog a system down. On the other hand if you just queue a pointer to your 2k byte data then the queueing operations will be pretty quick.

There is still the little matter of the mutex that is typically used to protect the queue itself. Obtaining the mutex may block your thread for any amount of time, however if the queue elements are small (pointers, or other primitive types) then in practice obtaining the mutex is typically quick due to the fact that no queue operation will hold it for very long.

No queue length limit

The STL does not impose limits to the length of its containers, and in fact no one should. Of course there is risk that memory might become an issue, for example if you have a pattern like this:

{
    while (_applicationRunning == true)
    {
        if (_someThing == true)
        {
            queue.enqueue(new HugeDataStruct());
        }
    }
}

Then obviously at some point it might be the case that there is no memory left, and your program dies. In situations like this I tend to avoid the memory allocation, and instead use a second queue which is pre-populated with a fixed number of HugeDataStructs (typically called a pool) then I just dequeue an element from the HugeDataStruct pool and put it on my queue. Then when processing of the HugeDataStruct is complete it is returned to the pool. This makes the above pattern look more like this:

{
    while (_applicationRunning == true)
    {
        if (_someThing == true)
        {
            if (_pool.dequeue(hugeDataStruct, SOME_TIMEOUT) == true)
            {
                _queue.enqueue(hugeDataStruct);
            }
            else
            {
                errorReport("Return some HugeDataStructs, you dummy!");
            }
        }
    }
}

And the thread that processes the queue looks something like this:

{
    MyData data;
    while (_threadXRunning == true)
    {
        if (_queue.waitDequeue(data, QUEUE_WAIT_TIMEOUT) == true)
        {
            processData(data);
            _pool.enqueue(data);
        }
    }
}

Of course with this technique you have to be careful with the following issues:

  1. When you dequeue an element from the pool, the constructor is not called, so it may contain stale/invalid data, be sure to init everything necessary.
  2. When you are done processing the data it must be returned to the pool. This can be done manually as shown above, or with some type of garbage collection thread associated with the pool, or with your own handy dandy intrusive_ptr implementation. However you do it, if the pool goes empty, your application is out of memory.

The moral of the story is that with this pool+queue technique there does not need to be an artificial limit to the length of any queue, because the number of elements in the pool will put an upper bound your worst case memory usage.

Unlimited concurrent producers and consumers

This requirement is what normally leads to the use of a mutex to protect the queue. It just makes the code easy to implement, maintain, and then it follows that the API is also easy to use. You could use a lock free queue, but then there are usually some kind of restriction on queue use which makes the code less maintainable or violates one of my other requirements. For example consider the case where you write an application that uses a lock free queue that only supports single producer/consumer and it works beautifully, then a year later some new guy comes on board and adds new functionality to the application. In doing so he adds code that does single producer/multiple consumer, maybe it works, maybe it gets to a customer, maybe it then breaks? Or maybe it breaks in testing, and then you realize that you need to implement a new queue to support this new use case… Whatever happens, extra time is added to the schedule, and customers are unhappy.

I find it is best to just use tools that just work however they are used, it is future proof, maintainable, and the additional overhead (i.e. mutex) is probably fine for most applications.

Posted in Programming

Leave a Reply

Your email address will not be published. Required fields are marked *

*