Contents
Changelog:
- 24 Feb 2022: note additional issues the supplied tests will not find
- 27 Feb 2022: update
pool-testto supportshortandlongoptions and document this - 12 Mar 2022: add note about
module load gccon department machines - 14 Mar 2022: update
pool-test.ccin skeleton actually include “shorter” test - 17 Mar 2022: add note about
module load gdbon department machines
Your Task
-
Implement a thread pool, which runs tasks across a fixed number of threads. Each of these threads should take tasks from a queue and run them, waiting for new tasks whenever the queue is empty. Your thread pool will start tasks in the order they are submitted and also support waiting for a particular task submitted to thread pool to complete. Each task in the thread pool will be identified by a name.
Your implementation must start threads once when the thread pool is created and only deallocate them when the
Stop()method is called. You may not start new threads each time a task is submitted.Your thread pool should follow the interface defined in the header file
pool.hin our skeleton code [last updated 2022-03-14]. This interface has two classes, one called Task representing tasks to run an done called ThreadPool representing the thread pool itself. You may add additional methods and member variables to these classes for your implementation.The Task class must have the following methods:
-
Task(),virtual ~Task()— constructor and virtual destructor -
virtual void Run() = 0— pure virtual (i.e. abstract) method overriden by subclasses. This will be run by threads in the thread pool when a task is run.
The ThreadPool class must have the following methods:
-
ThreadPool(int num_threads)— constructor, which takes the number of threads to create to run queued tasks. (Most likely, this is where you will create threads to run tasks.) -
void SubmitTask(const std::string &name, Task *task)— function to submit (enqueue) a task. The task can be identified bynamein future calls toWaitForTask(). Either when the task completes or when the task is waited for, the ThreadPool must be deallocate it as withdelete task.You may assume that for a particular ThreadPool object,
SubmitTaskwill be called only with thenameof a task that has not already been submitted and for whichWaitForTaskhas not already been called.This method should return immediately after adding the task to a queue, allocating additional space for the queue if necessary. (If this allocation fails, we do not care what happens.) It should never wait for running tasks to finish and free up space to store additional pending tasks.
-
void WaitForTask(const std::string &name)— wait for a particular task, not returning until the task completes. Note that this method may be called any time after the task is submitted, including after it has already finished running.You may assume that
WaitForTaskis called exactly once per submitted task. -
void Stop()— stop the thread pool, terminating all its threads normally. If any thread is in the middle of running a Task, this should wait for that thread to finish running the task rather than interrupting it. Before this returns, all the threads spawned by thread pool should have finished executing and it should be safe to deallocate the thread pool.
Every method of the
ThreadPoolclass except for theStop()method may be called from any thread, including from one of the tasks in submitted to the thread pool. TheStop()method may be called from any thread except one that was created by the thread pool.Provided that tasks are waited for as they complete, your thread pool should not use more and more memory the more tasks that run. (For example, you should not store a list of the names of all finished tasks from which tasks are never removed until thread pool is stopped (even when those finished tasks were waited for before the thread pool stops).)
Your implementation must support multiple
ThreadPoolobjects being used at the same time, so you should not use global variables.In no case, may any of the methods above (or below, if you took CoA 2) or the threads spawned by the constructor to run queued tasks consume a lot of compute time while waiting for some condition (e.g. a task finishing or a new task being available) to become true. A thread that needs to wait should arrange to be put into a sleeping/waiting state until the condition is likely true. (That is, use something like a condition variable or semaphore to wait until another thread indicates that something has happened, rather than simply checking a condition in a loop.)
-
-
If you have previously taken CoA 2, then as an additional requirement, you must implement the following methods:
-
bool CancelTask(const std::string &name)— stop a task with a particular name from running if it has not already started. This should returntrueif successful, andfalseif the task in question has already started running or completed.You may assume that this is called instead of calling
WaitForTaskfor a task of the same name. -
void Pause()— after any currently running tasks complete, temporarily stop the thread pool worker threads from running any tasks. This must not return until after all the worker threads are not running tasks. -
void Resume()— assuming a previous call to Pause was made, cause the thread pool threads to resume processing tasks.
-
-
Your submission should include a
Makefilewhich produces a statically linked librarylibpool.a, like our skeleton code does. -
You can use the supplied
pool-test-tsanandpool-testprograms to help test your thread pool implementation. Thepool-test-tsanprogram is built with compiler options that attempt to detect data races (using ThreadSanitizer);pool-testwith compiler options that attempt to detect memory errors (using AddressSanitizer.You can run:
./pool-test-tsan shorteror./pool-test shorterto run two tests, each of which uses a ThreadPool with one thread, and submits two tasks to it (requires this version of pool-test.cc, which was included with the skeleton as of 14 March)./pool-test-tsan shortor./pool-test shortto run a subset of these tests that use ThreadPools with at most two threads and avoids submitting very large numbers of tasks../pool-test-tsan longor./pool-test longto run a more thorough set of tests, including tests that start many tasks and use more than two threads
You can also edit
pool-test.cc(especially themain()functoin near the bottom) and recompile to change the set of tests that’s enabled to aid debugging.Note
pool-testis not a complete test, primary because some things are complex to test automatically in this assignment. For example:- the test may not expose all the race conditions that might exist in your code (especially since the exact timing with which your code is run will vary between machines),
- it does not determine if your code consumes a lot of compute time while waiting for a condition to become true,
- it does not determine whether you comply with the requirement to not spawn new threads for each task,
- it does not determine whether you comply with the requirement to not use more and more memory the more tasks that are run, and
- it does not determine whether your implementation allows two ThreadPools to be used at the same time
If you have previously taken CoA 2, then we have supplied a pool-test-coa2-extra.cc which contains some tests for the extra functions you must implement. To use these tests, add it in addition to the supplied
pool-test.ccto the Makefile, following the pattern used forpool-test.ccto add additional rules to the Makefile. Like with suppliedpool-test, these tests will not completely test the methods you need to implement. -
Produce a
.tar.gzfile of your submission like the onemake submitwill produce and submit to the submission site.
Hints
General advice
-
The producer/consumer pattern we discussed in lecture is very useful for this assignment.
-
You can use the C++ standard library’s deque or list as a queue, using
push_back()to insert elements onto the queue; andfront()andpop_front()to remove elements from the queue.Alternately, you could write your own queue with a linked list, or dynamic array.
-
You can use something like the C++ standard library’s map (typically implemented with a balanced tree) to store a mapping between
std::strings and information about particular tasks. -
To safely access almost any of the containers (e.g.
map,deque,vector) in the C++ standard library, you must ensure you prevent a thread from adding or removing elements from the container while any other thread is reading the container. -
You will need to use some synchronization mechanism to manage the queue of waiting tasks and manage reporting when tasks finish. Probably this will either be mutexes and condition variables (what I used in my implementation) or semaphores.
-
Most likely you will want (at least) one condition variable or semaphore for each task to handle waiting for tasks to complete.
Example of Usage
-
To use the ThreadPool class you create, a user would create a subclass of Task that implements the
Run()method that performs an operation they want to add to the queue of operations to do:class ComputeSumTask : public Task { public: ComputeSumTask(int *sum_destination, int *array_to_sum, int array_size) { this->sum_destination = sum_destination; this->array_to_sum = array_to_sum; this->array_size = array_size; } void Run() { int sum = 0; for (int i = 0; i < this->array_size; ++i) { sum += this->array_to_sum[i]; } *this->sum_destination = sum; } int *sum_destination, int *array_to_sum; int array_size; };Notice that the
Tasksubclass can (and typically would) contain member variables. Then, submit a bunch of instances of this class for each thing they wanted to do in parallelint arrayA[ARRAY_SIZE], arrayB[ARRAY_SIZE]; int sum_of_A, sum_of_B; ThreadPool pool(num_threads); pool.SubmitTask("sum arrayA", new ComputeSumTask(&sum_of_A, arrayA, ARRAY_SIZE)); pool.SubmitTask("sum arrayB", new ComputeSumTask(&sum_of_B, arrayB, ARRAY_SIZE)); ...and finally wait for the tasks to complete before stopping the thread pool:
pool.WaitForTask("sum arrayA"); pool.WaitForTask("sum arrayB"); pool.Stop();
Some Pthreads Resources
-
The lecture slides on
pthreads,pthread_mutexes,pthread_conds, etc. -
Chapters 27-31 of Operating Systems: Three Easy Pieces. Notably chapter 30 has code examples for condition varibles and chapter 31 has code examples for semaphores.
-
The official documentation at http://pubs.opengroup.org/onlinepubs/9699919799/.
Using std::map
std::map is one of the C++ standard libraries key-value containers:
-
Given a
std::map<KeyType, ValueType> m, you can check whether astd::mapcontains a particular keykusing:if (m.count(k) > 0) { /* code to run if it does */ } -
Given a
std::map<KeyType, ValueType> m, you can insert the valuevfor the keykwithm.insert({k, v});or
m.insert(std::make_pair(k, v)); -
Given a
std::map<KeyType, ValueType> m, you can retrieve the value for the keykwithm.at(k)It is also possible to use
m[k], but this allocates a value forkif one is not already present, which is often not what you want. -
Given a
std::map<KeyType, ValueType> m, you can remove the keyk(and its corresponding value) withm.erase(k)
On using the department machines
- If you are using the department machines, you may need to run
module load gccbefore compiling to get access to a recent version of gcc/g++
Using GDB to help diagnose hangs
-
If you experience a hang when running tests, I would suggest starting by trying to reproduce the hang using a debugger like GDB. On the department machines, you may need to use
module load gdbto get access to the most recent version of GDB. Once running under GDB, you can run a test until it hangs, then use control-C to stop the program and go into the debugger. Once in GDB, a command likethread apply all backtracewill show what each thread was doing. This should, at least, give you more information about what might be causing the hang.
If you need more information about what the threads are doing, you can sue the debugger to examine the local variables of each thread. In the
thread apply all backtraceoutput, each thread will have a number. You can use this to switch between threads using a command likethread 14and then use
upanddownto change where on that thread’s call stack the debugger is working. Once you have the debugger selecting a particular thread and a particular function call, you can examine the local variables, etc. usingprintorinfo localsor similar.