Chapter 11 : Threads In C++ And Concurrency

Key Words: Concurrency in C++, Threads in C++, Task, process, thread, std::mutex, std::condition_variable, std::unique_lock, producer-consumer pattern, thread safe queue, thread guards

Topics at a glance:

  • Concurrency achieved via threads – parallel execution units
  • Let’s understand the subtleties : Task, Process and Threads
  • Learn about process abstraction
  • Let’s spawn a process – what are the essential steps that every programmer must understand
  • Threads and their inherent issues – race conditions and same time access
  • Mutex locks and Condition Variables
  • Let’s guard our thread using thread guards
  • How to mitigate issues with concurrent execution using C++ facilities
    • Mutually Exclusive Locks a.k.a mutex locks
    • lock ownership and management in C++
  • Let’s implement a Queue
  • Let’s implement a thread-safe queue – Classic producer-consumer pattern

Concurrency in C++

This chapter introduces the threads in C++, their uses, inherent issues, and how to tackle them.

Threads in C++

So what are threads? Threads are the basic units of execution on any platform. Platform is an abstract term here. Platform can be a full rich operating system, in which case, threads are running on the application layer, a.k.a application threads, on top of kernel doing tasks assigned by the user. Platform can be a kernel of an OS, in which threads are called kernel threads and they do privileged task for the kernel. Platform can be also bare-metal or even a virtual machine. Regardless of the platform, threads are units of execution that performs some pre-defined task.

Process and threads

Here, let us assume that platform is an operating system such as MS Windows or Linux based operating system. OS provides the abstraction called process. Any process requires at least one unit of execution or thread. This thread is called the main thread. The function that runs in the main thread is identified by the user defined main function in the program. Usually the OS creates and spawns a process. The steps are too big and out of scope here. But I will try to cover the basics.

Before covering the steps, let us understand what you mean by a process regardless of an OS or bare-metal or hypervisor (virtual) platform. We all know the actual platform on top of which anything runs/executes is the hardware, comprising of the CPU and some memory system along with some peripherals such as IO devices, network interfaces etc. This hardware platform is not limitless. Typically, there is only one processor (with single or multiple cores), one RAM, and limited number of IO devices and other peripherals. These are the basic things that constitute any hardware platform. So if a task has to run on this HW platform directly other tasks cannot be executed as this running task solely takes the full CPU time and engages the HW fully. A computer will become eventually useless as it has to / can run only a single task all the time. To overcome this, the idea is to assign specific time slots for any given task to run and to leave the platform for other tasks once this slot is exhausted and resume back once a slot is free or a pre-defined slot occurs. This task-to-task switching is difficult if the tasks themselves are let to manage and will not be reliable. What if there is a rogue task who does not want to give the resource back. Also, who will manage the task to task context switching, like saving the CPU register values and critical information required by the task to resume its operation? To mitigate all these issues, we came up with the abstraction called process. Process abstracts the underlying platform (HW or some other SW platform) and gives the task an impression that there is only one task that is using the CPU at any given point of time. Processes will get pre-empted by other processes performing other tasks and will get resumed back in a while. Switching the tasks/processes itself is done by another task and we call this special program the task scheduler or process scheduler.

Any task has two aspects:

  1. The functionality that the programmer(s) define in the source code.
  2. An environment for execution.

Process gives the second i.e. an execution environment to run and helps in making sure that the first aspect, which is the functionality, runs as intended. Execution environment is manifested as an abstract entity that provides the task, resources such as a virtual CPU, memory, stack, files, sockets, IO devices etc. This abstract environment is called the process. Process initiates execution from the main function, after setting up some initial things required for the task to run. Process’s will have one unit of execution as I have mentioned above, that runs the main program.  Any execution unit within a process must require one thing i.e. a dedicated stack. As I said in Chapter 4 : The stacks in ‘C’, any C program or a C++ program for that matter, requires stacks for their execution. So, if you got my point clearly, then you can easily guess that for ‘N’ number of threads (i.e. execution units) we need ‘N’ independent stacks. That said, a process itself will have a pool of stack and for every unit of execution i.e. threads it assigns a dedicated slot(s) from this stack pool. The second most important thing required for execution unit is the virtual platform resources. It requires its own copy of CPU registers without which task-to-task context switching is not possible. Before switching over to other task or process, the running process has to save it’s copy of CPU registers and other critical parameters. These information will be used by the underlying platform to resume back the pre-empted task/process after a while. That said these things are not possible without the process abstraction. Let us summarize the key points related to task, process and threads:

Task, process and threads

  1. Task is just a job defined by a programmer. Say like a recipe for a cuisine, but not the dish itself.
  2. Process provides an abstract execution environment for loading this program and executing it such as setting up independent stacks for independent execution units. Aiding task-to-task context switching by abstracting the CPU registers and other critical resources.This is analogous to getting the ingredients to cook the dish, making the cooking utensils ready etc.
  3. Thread is actual unit of execution. The one that runs on top of CPU. That means now the cook (i.e CPU) has started to make the dish. I.e CPU starts executing the program. The final work accomplished by the running thread is the dish itself! Voila!!!
  4. One important thing to note here is that, although process can provide the bare-minimum execution environments in form of stacks and CPU registers and other critical parameters, threads share all other things of a process, such as data segment, open file handles, sockets device handles etc. Programmatically any thread can access these common resources shared within the same process. This is the most important point that we should focus as this leads to lot of issues with thread programming. Process only guarantees that data/resources within one process is safe from another process. But this protection is not there among threads, as they do share process resources among themselves.

These are the minimum essential points that any programmer should understand. Let it be any platform, any OS, hardware, any programming language.

NOTE: All the things I have mentioned above is not specific to any OS/Platform. I have tried to comprehend the bigger picture as generic as possible.

Now, let us go through the basic steps involved in spawning a process till running a thread on CPU.

  1. Typically, OS will create a new process from a parent process. To easily relate, say you are launching an executable file (*.exe) from Windows command prompt or Linux bash, or say you are launching from a GUI program like Windows Explorer or Linux’s File Managers such as Nautilus. So here, the parent process itself is the launcher (command prompt or windows explorer)
  2. This is known as forking a process. An exact replica of the parent process is created. Now we need to replace the executable in this clone with the program’s executable (say ping.exe the famous packet ping program). OS will do this task next.
  3. Once this is done, the process must start execution. Process must set up the minimum execution environment for the thread(s) to run, as I have detailed above, and jump to the entry point of program. It looks for the keyword _main or something similar and jump to that. This is platform specific.
  4. This combined forking and executing can be called as spawning a process.
  5. From here, the main thread will start execution, underlying platform (typically OS or virtual machine) will do pre-emption, context switching and resumption operations. In course of execution the spawned main thread will result in creating multiple threads within the same process or may even result in spawning multiple processes depending upon the complexity of the program.

There are other aspects that needs to be discussed such as thread to thread synchronization, process to process communication through IPC’s etc. Our topic of interest in this chapter is the thread itself. So, let us now focus much on threads and their attributes.

Mostly threads within a process will be doing one part of a big task and is quite likely to be inter-dependent and not as independent as I have mentioned above. Most of the threads will have to share many resources such as data/memory/files/sockets/devices etc in course of their execution. Since they are all shared among thread within the same process, its quite likely that there can be issues related to same time access. The word ‘same’ is especially important here. Same time access can lead to coherency problems, ownership issues etc. I am not going to point out all such issues here as it is widely out of scope.

Now, let us see how C++ provides the programmer facilities/features to define units of execution called threads. But, of course, process itself is implemented by the OS/Virtual machine underneath. The std::thread helps the programmer to define a parallel executable independent unit within the same process. C++ also provides lot of facilities to mitigate the issues inherent with parallel execution of threads sharing common process resources. These includes the following:

  • atomics
  • mutex locks
  • condition variables and predicates
  • lock management entities such a unique_locks, scoped_locks, shared_locks, deferred locks etc.

So let us understand these things one by one in detail. I will explain these things selectively with relevant example.

Thread safe Queues

I am taking the example of a queue, implemented as a First In First Out or FIFO. The example is iterative.

In the first iteration, I will implement a basic queue with insert and get operations. One thread will add data to it and after that, the same thread will get the data out of it thus demonstrating the operation of the queue.

Now, let us see the basic queue class. This class is defined to support integer data.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
#include <iostream>

using namespace std;

class Queue
{
public:
    enum class error_codes{bad_size = -1};
private:
    int *memory;
    int size_;
    int head;
    int tail;
    bool is_full;
    bool is_empty;
public:
    explicit Queue(const int size = 0) : 
        size_{size}, is_full{false}, is_empty{true}
    {
        if(size_ <= 0)
        {
            cout << "Bad size specified" << endl;
            throw error_codes::bad_size;
        }
        
        memory = new int[size_];
        
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is created " << endl;
    }
    
    ~Queue()
    {
        delete []memory;
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is destroyed" << endl;
        size_ = 0;
    }
    
    // delete copy/move operations 
    Queue(const Queue& other_stack) =delete;
    Queue& operator=(const Queue& other_stack) =delete;
    Queue(Queue&& other_stack) =delete;
    Queue& operator=(Queue&& other_stack) =delete;
    
    bool insert(const int data)
    {
        bool ret = true;
        
        if( is_full == false )
        {
            memory[tail] = data;
            tail = (tail + 1) % size_;
            if(tail == head)
            {
                is_full = true;
            }
            is_empty = false;
        }
        else
        {
            // queue is full
            cout << "\tQueue is full : " << data  << endl;
            ret = false;
        }
        
        return ret;
    }
    
    bool get(int * data)
    {
        bool ret = true;
        
        if(is_empty == false)
        {
            *data = memory[head];
            head = (head + 1) % size_;
            is_full = false;
            if(head == tail)
            {
                is_empty = true;
            }
        }
        else
        {
            // queue empty
            cout << "\tQueue is empty" << endl;
            ret = false;
        }
        
        return ret;
    }
    
    int size() const
    {
        return size_;
    }
    
};

Now, let us see some basic queue operations:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
int main()
{
    const int q1_size = 10;
    Queue q1(q1_size);
    
    for(int count = 0; q1.insert(count); ++count)
    {
        // do nothing
    }
    
    int val = 0;
    
    for(int count = 0; q1.get(&val); ++count)
    {
        
        cout << "Val popped : " << dec << val << endl;
    }
    
    return 0;
}

Result:

Queue with size : 10 is created
        Queue is full : 10
Val popped : 0
Val popped : 1
Val popped : 2
Val popped : 3
Val popped : 4
Val popped : 5
Val popped : 6
Val popped : 7
Val popped : 8
Val popped : 9
        Queue is empty
queue with size : 10 is destroyed

Producer-Consumer pattern

Now, let’s start the second iteration. In the second iteration, I will make a thread (producer) that generates data and adds them to queue and another thread (consumer) that takes data out of queue and do some processing.

Mutex lock and condition variable

This will introduce an issue. How the consumer know data is ready or not? How the producer knows whether consumer is not taking some data from the queue, while it tries to add new data. This is an example of classic producer-consumer pattern. To tackle these, I will add the following features into our queue class.

  • Mutex lock
  • Condition variable

In simple terms, mutex lock guarantees only one thread can lock the mutex at any given point of time. Any other threads trying to get the lock will be put to sleep. When the thread that locked the mutex releases the lock, other thread(s) sleeping for the mutex will be scheduled again to run in an order. We will protect the critical regions in the code from simultaneous access from multiple threads through mutex locks.

Condition variable, is essentially an atomic variable, used for checking a condition is satisfied or not. Here, the condition is to check whether a new data is available in the queue to process or not. Consumer thread, will check whether the condition is passed or else goes to sleep, while producer thread will make the condition pass and notify any waiting threads. i.e. consumer checks whether new data is ready in queue while producer makes new data available in queue. Condition variables work with mutex, as follows:

  • Consumer thread will get the lock on a mutex first
  • Consumer thread will check the condition has passed or else goes to sleep while releasing the lock
  • Meanwhile, producer thread will lock the mutex and set the condition to pass and notifies any other waiting thread while unlocking the mutex.
  • Waiting/sleeping thread will receive this notification and wakes up locking the mutex and checks the condition is pass or not. If not passed goes back to sleep unlocking the mutex. But if passed perform any work/computation and unlocks the mutex.
  • The same cycle repeats as long as it is required.

All these steps are possible through C++’s std:mutex and std::condition_variable. Along with these two more entities are also there to aid us.

They are std::unlique_lock and predicate.

std::unique_lock and predicates

  • std::unique_locks: I told that std::condition_variables works with mutex locks. But in course of execution a condition variable lock the mutex, unlocks the mutex before going to sleep and locks the mutex when coming back from sleep. Condition variable cannot take complete ownership of the mutex. It just uses the mutex in course of its action and releases it when not needed, such as in a sleep state. So, like how std::unique_ptr’s manages the ownership of raw pointers, unique_locks helps in taking care of the ownership of mutex. It actually abstracts various methods which could be invoked on a mutex such as locking the mutex while the unique_lock instance is created in its constructor itself and unlocking or releasing the mutex while unique_lock instance is going out of scope and is being destructed. Condition variable uses such kinds of managers for managing mutex locks and its not recommended to do this without any such lock managers.
  • Predicates: In English, the logic of a predicate implies something which is affirmed or denied concerning an argument of a proposition. Likewise, for a condition variable to work correctly, there should be a condition which could be evaluated to either true or false. This condition is called predicate. So, in this case, consumer thread wait on a std::condition_variable, with a predicate that, it will wake up ONLY to check whether the condition is true or else it will go back to sleep.

Now, let us put all these and see how we can improvise the queue.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include <iostream>
#include <thread>
#include <condition_variable>

using namespace std;

class Queue
{
public:
    enum class error_codes{bad_size = -1};
private:
    int *memory;
    int size_;
    int head;
    int tail;
    bool is_full;
    bool is_empty;
    std::mutex mutex_;
    std::condition_variable cond_var;
    bool data_ready;
public:
    explicit Queue(const int size = 0) : 
        size_{size}, data_ready{false}, is_full{false}, is_empty{true}
    {
        if(size_ <= 0)
        {
            cout << "Bad size specified" << endl;
            throw error_codes::bad_size;
        }
        
        memory = new int[size_];
        
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is created " << endl;
    }
    
    ~Queue()
    {
        delete []memory;
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is destroyed" << endl;
        size_ = 0;
    }
    
    // delete copy/move operations 
    Queue(const Queue& other_queue) =delete;
    Queue& operator=(const Queue& other_queue) =delete;
    Queue(Queue&& other_queue) =delete;
    Queue& operator=(Queue&& other_queue) =delete;
    
    
    bool insert(const int data)
    {
        bool ret = true;
        std::unique_lock<mutex> lck_(mutex_);
        
        if( is_full == false )
        {
            memory[tail] = data;
            tail = (tail + 1) % size_;
            if(tail == head)
            {
                is_full = true;
            }
            is_empty = false;
        }
        else
        {
            // queue is full
            cout << "\tQueue is full : " << data  << endl;
            ret = false;
        }
        
        if(ret == true)
        {
            data_ready = true;
            cond_var.notify_one(); // unlocks the lock within the function 
        }
        
        //lck_.unlock(); // actually unnecessary to do this for unique_lock 
        
        return ret;
    }
    
    bool get(int * data)
    {
        bool ret = true;
        
        unique_lock<mutex> lck(mutex_);
        
        // predicate 'check for data_ready' is VERY IMPORTANT here 
        cond_var.wait(lck, [this]{return this->data_ready;} );
        
        if(is_empty == false)
        {
            *data = memory[head];
            head = (head + 1) % size_;
            is_full = false;
            if(head == tail)
            {
                is_empty = true;
            }
        }
        else
        {
            // queue empty
            cout << "\tQueue is empty" << endl;
            ret = false;
        }
        
        return ret;
    }
    
    int size() const
    {
        return size_;
    }
    
};

void producer(Queue& queue)
{
    bool ret = false;
    
    for(int count = 0; count < 100; ++count)
    {
        while(false == queue.insert(count));            
    }
    
    return;
}

void consumer(Queue& queue)
{
    int val = 0;
    while(true)
    {
        queue.get(&val);
        
        cout << "Val popped : " << val << endl;
        
        if(val == 99) break;
    }
    
    return;
}

Please see the producer thread function, consumer thread function, how they synchronize with one another through the predicate based on data_ready through mutex locks and condition_variables. Understand how std::unique_locks are used to manage mutex object.

Now let us create two threads; a producer thread and a consumer thread and see the queue in action:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
int main()
{    
    Queue q1(10);
    
    // create consumer thread first 
    std::thread consumer_thread(consumer, std::ref(q1));
    
    // create producer thread
    std::thread producer_thread(producer, std::ref(q1));
    // wait for the threads to join back main
    producer_thread.join();
    consumer_thread.join();
    
    cout << "Main ends" << endl;
    
    return 0;
}

Result:

Queue with size : 10 is created
        Queue is full : 10
Val popped : 0
        Queue is full : 11
Val popped : 1
        Queue is full : 12
Val popped : 2
        Queue is full : 13
Val popped : 3
        Queue is full : 14
Val popped : 4
        Queue is full : 15
Val popped : 5
        Queue is full : 16
Val popped : 6
        Queue is full : 17
Val popped : 7
        Queue is full : 18
Val popped : 8
        Queue is full : 19
Val popped : 9
        Queue is full : 20
Val popped : 10
        Queue is full : 21
Val popped : 11
        Queue is full : 22
Val popped : 12
        Queue is full : 23
Val popped : 13
        Queue is full : 24
Val popped : 14
        Queue is full : 25
Val popped : 15
        Queue is full : 26
Val popped : 16
        Queue is full : 27
Val popped : 17
        Queue is full : 28
Val popped : 18
        Queue is full : 29
Val popped : 19
        Queue is full : 30
Val popped : 20
        Queue is full : 31
Val popped : 21
        Queue is full : 32
Val popped : 22
        Queue is full : 33
Val popped : 23
        Queue is full : 34
Val popped : 24
        Queue is full : 35
Val popped : 25
        Queue is full : 36
Val popped : 26
        Queue is full : 37
Val popped : 27
        Queue is full : 38
Val popped : 28
        Queue is full : 39
Val popped : 29
        Queue is full : 40
Val popped : 30
        Queue is full : 41
Val popped : 31
        Queue is full : 42
Val popped : 32
        Queue is full : 43
Val popped : 33
        Queue is full : 44
Val popped : 34
        Queue is full : 45
Val popped : 35
        Queue is full : 46
Val popped : 36
        Queue is full : 47
Val popped : 37
        Queue is full : 48
Val popped : 38
        Queue is full : 49
Val popped : 39
        Queue is full : 50
Val popped : 40
        Queue is full : 51
Val popped : 41
        Queue is full : 52
Val popped : 42
        Queue is full : 53
Val popped : 43
        Queue is full : 54
Val popped : 44
        Queue is full : 55
Val popped : 45
        Queue is full : 56
Val popped : 46
        Queue is full : 57
Val popped : 47
        Queue is full : 58
Val popped : 48
        Queue is full : 59
Val popped : 49
        Queue is full : 60
Val popped : 50
        Queue is full : 61
Val popped : 51
        Queue is full : 62
Val popped : 52
        Queue is full : 63
Val popped : 53
        Queue is full : 64
Val popped : 54
        Queue is full : 65
Val popped : 55
        Queue is full : 66
Val popped : 56
        Queue is full : 67
Val popped : 57
        Queue is full : 68
Val popped : 58
        Queue is full : 69
Val popped : 59
        Queue is full : 70
Val popped : 60
        Queue is full : 71
Val popped : 61
        Queue is full : 72
Val popped : 62
        Queue is full : 73
Val popped : 63
        Queue is full : 74
Val popped : 64
        Queue is full : 75
Val popped : 65
        Queue is full : 76
Val popped : 66
        Queue is full : 77
Val popped : 67
        Queue is full : 78
Val popped : 68
        Queue is full : 79
Val popped : 69
        Queue is full : 80
Val popped : 70
        Queue is full : 81
Val popped : 71
        Queue is full : 82
Val popped : 72
        Queue is full : 83
Val popped : 73
        Queue is full : 84
Val popped : 74
        Queue is full : 85
Val popped : 75
        Queue is full : 86
Val popped : 76
        Queue is full : 87
Val popped : 77
        Queue is full : 88
Val popped : 78
        Queue is full : 89
Val popped : 79
        Queue is full : 90
Val popped : 80
        Queue is full : 91
Val popped : 81
        Queue is full : 92
Val popped : 82
        Queue is full : 93
Val popped : 83
        Queue is full : 94
Val popped : 84
        Queue is full : 95
Val popped : 85
        Queue is full : 96
Val popped : 86
        Queue is full : 97
Val popped : 87
        Queue is full : 98
Val popped : 88
        Queue is full : 99
Val popped : 89
Val popped : 90
Val popped : 91
Val popped : 92
Val popped : 93
Val popped : 94
Val popped : 95
Val popped : 96
Val popped : 97
Val popped : 98
Val popped : 99
Main ends
Queue with size : 10 is destroyed

But here there is a small problem, the main() has to make sure it will wait for the threads, it has spawned to return back. A process cannot exit while the threads are running. C++ make sure the threads are terminated when the process exits. To avoid this, main() waits on the threads to return back by invoking the join method. This should be done carefully every time. Let us see a way to ensure this policy.

Thread guards

Creating thread guards are the way to do this automatically. In the below code I have defined a thread_guard class, that creates the threads in its constructor and joins the thread it created in its destructor, thus making use of the famous C++ RAII principle.

Let us see thread_guard in action. Also, this time I have changed the Queue class to work with generic type ‘T’, making Queue class a template class Queue<T>.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#include <iostream>
#include <thread>
#include <condition_variable>

using namespace std;

template<typename T>
class Queue
{
public:
    enum class error_codes{bad_size = -1};
private:
    T *memory;
    int size_;
    int head;
    int tail;
    bool is_full;
    bool is_empty;
    std::mutex mutex_;
    std::condition_variable cond_var;
    bool data_ready;
public:
    explicit Queue(const int size = 0) : 
        size_{size}, data_ready{false}, is_full{false}, is_empty{true}
    {
        if(size_ <= 0)
        {
            cout << "Bad size specified" << endl;
            throw error_codes::bad_size;
        }
        
        memory = new T[size_];
        
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is created " << endl;
    }
    
    ~Queue()
    {
        delete []memory;
        head = 0;
        tail = 0;
        cout << "Queue with size : " << size_ << " is destroyed" << endl;
        size_ = 0;
    }
    
    // delete copy/move operations 
    Queue(const Queue& other_queue) =delete;
    Queue& operator=(const Queue& other_queue) =delete;
    Queue(Queue&& other_queue) =delete;
    Queue& operator=(Queue&& other_queue) =delete;
    
    
    bool insert(const T data)
    {
        bool ret = true;
        std::unique_lock<mutex> lck_(mutex_);
        
        if( is_full == false )
        {
            memory[tail] = data;
            tail = (tail + 1) % size_;
            if(tail == head)
            {
                is_full = true;
            }
            is_empty = false;
        }
        else
        {
            // queue is full
            cout << "\tQueue is full : " << data  << endl;
            ret = false;
        }
        
        if(ret == true)
        {
            data_ready = true;
            cond_var.notify_one(); // unlocks the lock within the function 
        }
        
        //lck_.unlock(); // actually unnecessary to do this for unique_lock 
        
        return ret;
    }
    
    bool get(T * data)
    {
        bool ret = true;
        
        unique_lock<mutex> lck(mutex_);
        
        // predicate 'check for data_ready' is VERY IMPORTANT here 
        cond_var.wait(lck, [this]{return this->data_ready;} );
        
        if(is_empty == false)
        {
            *data = memory[head];
            head = (head + 1) % size_;
            is_full = false;
            if(head == tail)
            {
                is_empty = true;
            }
        }
        else
        {
            // queue empty
            cout << "\tQueue is empty" << endl;
            ret = false;
        }
        
        return ret;
    }
    
    int size() const
    {
        return size_;
    }
    
};

template<typename T>
void producer(Queue<T>& queue)
{
    bool ret = false;
    
    for(int count = 0; count < 100; ++count)
    {
        while(false == queue.insert(count));            
    }
    
    return;
}

template<typename T>
void consumer(Queue<T>& queue)
{
    int val = 0;
    while(true)
    {
        queue.get(&val);
        
        cout << "Val popped : " << val << endl;
        
        if(val == 99) break;
    }
    
    return;
}

template<typename T>
class thread_guard
{
private:
    void (*thread_func)(Queue<T> &);
    Queue<T>& q_ref_;
    std::thread this_thread;
public:
    thread_guard(void (*thread_function)(Queue<T> &), Queue<T>& q_ref):
        thread_func{thread_function}, q_ref_{q_ref}, 
        this_thread(thread_func, std::ref(q_ref_))
    {
        cout << "Thread guard crated" << endl;
    }
    
    ~thread_guard()
    {
        this_thread.join();
        cout << "Thread has joined back and thread_guard is destroyed" << endl;
    }
    
    // delete any copy/move operations
    thread_guard(const thread_guard&) =delete;
    thread_guard(thread_guard&&) =delete;
    thread_guard& operator=(const thread_guard&) =delete;
    thread_guard& operator=(thread_guard&&) =delete;
};

Let us see the main() now:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
int main()
{    
    Queue<int> q1(10);
    
    // create consumer thread first 
    thread_guard<int> th_g_consumer(consumer<int>, q1);
    
    // create producer thread
    thread_guard<int> th_g_producer(producer<int>, q1);
    
    cout << "Main ends" << endl;
    
    return 0;
}

See now, main does not have to bother about joining the threads and it ends right after creating the thread_guards for producer and consumer. You can see “Main Ends” is printed in result-line #4 itself, but the threads executed till their completion and joined back the thread_guards in their destructors.

Programmer needs to use thread_guards to manage a thread’s life-cycle efficiently adhering to the principles of RAII.

Result:

Queue with size : 10 is created
Thread guard crated
Thread guard crated
        Queue is full : 10 Main ends
Val popped : 0
        Queue is full : 11
Val popped : 1
        Queue is full : 12
Val popped : 2
        Queue is full : 13
Val popped : 3
        Queue is full : 14
Val popped : 4
        Queue is full : 15
Val popped : 5
        Queue is full : 16
Val popped : 6
        Queue is full : 17
Val popped : 7
        Queue is full : 18
Val popped : 8
        Queue is full : 19
Val popped : 9
        Queue is full : 20
Val popped : 10
        Queue is full : 21
Val popped : 11
        Queue is full : 22
Val popped : 12
        Queue is full : 23
Val popped : 13
        Queue is full : 24
Val popped : 14
        Queue is full : 25
Val popped : 15
        Queue is full : 26
Val popped : 16
        Queue is full : 27
Val popped : 17
        Queue is full : 28
Val popped : 18
        Queue is full : 29
Val popped : 19
        Queue is full : 30
Val popped : 20
        Queue is full : 31
Val popped : 21
        Queue is full : 32
Val popped : 22
        Queue is full : 33
Val popped : 23
        Queue is full : 34
Val popped : 24
        Queue is full : 35
Val popped : 25
        Queue is full : 36
Val popped : 26
        Queue is full : 37
Val popped : 27
        Queue is full : 38
Val popped : 28
        Queue is full : 39
Val popped : 29
        Queue is full : 40
Val popped : 30
        Queue is full : 41
Val popped : 31
        Queue is full : 42
Val popped : 32
        Queue is full : 43
Val popped : 33
        Queue is full : 44
Val popped : 34
        Queue is full : 45
Val popped : 35
        Queue is full : 46
Val popped : 36
        Queue is full : 47
Val popped : 37
        Queue is full : 48
Val popped : 38
        Queue is full : 49
Val popped : 39
        Queue is full : 50
Val popped : 40
        Queue is full : 51
Val popped : 41
        Queue is full : 52
Val popped : 42
        Queue is full : 53
Val popped :    Queue is full : 4354
Val popped : 44
        Queue is full : 55
Val popped : 45
        Queue is full : 56
Val popped : 46
        Queue is full : 57
Val popped : 47
        Queue is full : 58
Val popped : 48
        Queue is full : 59
Val popped : 49
        Queue is full : 60
Val popped : 50
        Queue is full : 61
Val popped : 51
        Queue is full : 62
Val popped : 52
        Queue is full : 63
Val popped : 53
        Queue is full : 64
Val popped : 54 Queue is full : 65
        Queue is full : 65
Val popped : 55
        Queue is full : 66
Val popped : 56
        Queue is full : 67
Val popped : 57
        Queue is full : 68
Val popped :    Queue is full : 69
58
        Queue is full : 69
Val popped : 59
        Queue is full : 70
Val popped : 60
        Queue is full : 71
Val popped : 61
        Queue is full : 72
Val popped : 62
        Queue is full : 73
Val popped : 63
        Queue is full : 74
Val popped : 64
        Queue is full : 75
Val popped : 65
        Queue is full : 76
Val popped : 66
        Queue is full : 77
Val popped : 67
        Queue is full : 78
Val popped : 68 Queue is full :
79
Val popped : 69
        Queue is full : 80
Val popped : 70
        Queue is full : 81
Val popped : 71
        Queue is full : 82
Val popped : 72
        Queue is full : 83
Val popped : 73
        Queue is full : 84
Val popped : 74
        Queue is full : 85
Val popped : 75
        Queue is full : 86
Val popped : 76
        Queue is full : 87
Val popped : 77
        Queue is full : 88
Val popped : 78
        Queue is full : 89
Val popped : 79
        Queue is full : 90
Val popped : 80
        Queue is full : 91
Val popped : 81
        Queue is full : 92
Val popped : 82
        Queue is full : 93
Val popped : 83
        Queue is full : 94
Val popped : 84
        Queue is full : 95
Val popped : 85
        Queue is full : 96
Val popped : 86
        Queue is full : 97
Val popped : 87
        Queue is full : 98
Val popped : 88
        Queue is full : 99
Val popped : 89
Thread has joined back and thread_guard is destroyed
Val popped : 90
Val popped : 91
Val popped : 92
Val popped : 93
Val popped : 94
Val popped : 95
Val popped : 96
Val popped : 97
Val popped : 98
Val popped : 99
Thread has joined back and thread_guard is destroyed
Queue with size : 10 is destroyed

Observe how the thread_guard made sure the threads are joined back after completing their functions.

Enjoyed the chapter? Let me know in the comments below. Thanks! 🙂

Leave a Reply