One of my pet projects over the past few years has been my deep learning framework, built entirely in C++. Once I had an initial iteration that worked sequentially and could successfully train a neural network to accomplish specific tasks, I turned toward leveraging concurrency to speed up training. My first thought was to farm out computationally expensive tasks to different CPU cores, primarily focusing on functions that perform computations on matrices.

In my project, I have a set of polymorphic classes that represent different types of neural network layers (like a dense layer, input layer, flatten layer, etc.). Each initialized class owns its matrix of weights and, upon receiving an input matrix, performs some computations between the two and sends out an output matrix. These matrices are often tens of thousands of values in size, which provided a good opportunity for multi-threading computations. For example, with my matrix multiplication function, I did the following:

1. Look up how many cores the machine has.
2. Break the matrix into equal parts matching the number of available cores. I don't actually break apart the matrix; I generate index ranges that represent the division.
3. Create a new task for each range that calls my matrix multiplication function with parameters describing its bounding indices.
4. Once all tasks are complete, each one is cleaned up via RAII.

This approach worked, and I saw a significant improvement in my training times. However, creating and destroying tasks every time a neural layer performed forward propagation (which usually happens thousands of times during a training session) was inefficient, and it bothered me. The solution was not to constantly create and destroy threads, but to create a number of threads matching the number of cores on the machine at the start of the program and reuse those same threads throughout the program's lifetime in a thread pool. A minimal sketch of the original per-call approach follows; after that, here's how I built the pool in C++:
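Here is a rough sketch of that first approach, assuming a hypothetical Matrix type and a multiplyRange helper that stand in for my actual matrix code:

C++
#include <algorithm>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-ins for the framework's real matrix type and
// per-range multiply routine.
struct Matrix { std::size_t rows() const; /* ... */ };
void multiplyRange(const Matrix& a, const Matrix& b, Matrix& out,
                   std::size_t beginRow, std::size_t endRow);

// One thread per core, each handling its own range of output rows.
void parallelMatmul(const Matrix& a, const Matrix& b, Matrix& out) {
    const std::size_t cores =
        std::max<std::size_t>(1, std::thread::hardware_concurrency());
    const std::size_t rowsPerThread = out.rows() / cores;

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < cores; i++) {
        std::size_t begin = i * rowsPerThread;
        std::size_t end = (i == cores - 1) ? out.rows() : begin + rowsPerThread;
        // Each thread computes out's rows in [begin, end) from a and b.
        threads.emplace_back(multiplyRange, std::cref(a), std::cref(b),
                             std::ref(out), begin, end);
    }
    // Joining (and then destroying) the threads on every call is exactly
    // the overhead that motivated the thread pool described below.
    for (auto& t : threads) {
        t.join();
    }
}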

I decided that each neural layer should own its own Thread_Pool class. The Thread_Pool class would be responsible for managing its threads throughout the program's lifetime, including the management of all tasks sent its way. The public interface of this class looks like this:

C++
class Thread_Pool {
public:
    Thread_Pool();
    ~Thread_Pool();
    void setupPool(size_t threads);
    void clearPool();
    void wait();

    template<typename F, typename... Args>
    void enqueue(F&& f, Args&&... args);
};

I decided I only wanted to utilize this thread pool during training when the mini-batch size is equal to or larger than the number of cores on the machine. From my testing, the overhead of using threads outweighs the performance gains of concurrency when the matrix sizes are too small. To accommodate this, the default constructor does not do much. Instead, when my program needs to use the thread pool, each layer calls the setupPool() function to initialize the thread pool with the appropriate number of threads. The clearPool() method does cleanup when training is complete and the thread pool is no longer needed (the destructor also calls this method). This approach leaves room for mistakes, since some of the initial state management happens outside the constructor, but I felt it was a worthy tradeoff in this case. The enqueue() function takes a task the neural layer needs to complete and assigns it to a thread. The wait() function is called any time we want our neural layer class to hold until the Thread_Pool has no further work.
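As a rough sketch of how a layer might make that decision (batchSize here is a placeholder name, and _threadPool is the layer's Thread_Pool member), the setup and teardown could look something like this:

C++
// Illustrative sketch only: enable the pool when the mini-batch is at
// least as large as the core count; otherwise the threading overhead wins.
const auto processor_count = std::thread::hardware_concurrency();
if (batchSize >= processor_count) {
    _threadPool.setupPool(processor_count);
}

// ... training runs; layers enqueue() work and wait() on the pool ...

// Once training is finished and only inference remains, release the threads.
_threadPool.clearPool();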

The private members and functions look like this:

C++
// Requires <vector>, <queue>, <functional>, <thread>, <mutex>,
// <condition_variable>, and <atomic>.
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;

void worker();

std::mutex queueMutex;
std::condition_variable condition;
std::condition_variable finishedCondition;
std::atomic<bool> stop{false};
std::atomic<int> activeTasks{0};

I will go over these in more detail as we walk through the implementation, starting with the setupPool() function:

C++
void Thread_Pool::setupPool(size_t threads) {
    stop = false;
    for (size_t i = 0; i < threads; i++) {
        workers.emplace_back(&Thread_Pool::worker, this);
    }
}

This function is called by the neural layer class when it determines that the thread pool is appropriate. The threads argument always matches the number of cores on the machine, which I look up like this:

C++
const auto processor_count = std::thread::hardware_concurrency();

We construct the appropriate number of threads and place them into our workers vector, where they will live for the lifetime of the pool. Each thread runs the worker() function, which looks like this:

C++
void Thread_Pool::worker() {
    while (true) {

        std::function<void()> task;

        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this]() {
                return stop || !tasks.empty();
            });

            if (stop) {
                return;
            }

            task = std::move(tasks.front());
            tasks.pop();
        }

        task();

        activeTasks--;
        finishedCondition.notify_one();
    }
}

We want to ensure each thread is kept alive and ready for work when the time comes, so the worker() function runs a while(true) loop. However, the thread will often have no work to do, which is why we call the wait() function on our private condition variable. The wait() call atomically releases the queueMutex and puts the thread to sleep until the lambda we pass it returns true. The best part is that the sleeping thread doesn't continuously poll the lambda, which avoids wasting CPU time. Instead, it waits for us to call condition.notify_one(), which we do elsewhere in the class; when the thread wakes, it re-acquires the mutex before re-checking the predicate. The predicate checks two things. First, whether we've asked the Thread_Pool to stop: if stop is true, the worker returns and the thread exits. Second, whether our tasks queue has something in it: if it isn't empty, we assign the front item to the task variable and pop it off the queue. At that point the scoped block ends and the mutex is released, allowing other threads to safely pull from the tasks queue without a race condition. The thread then executes the task, decrements activeTasks, and notifies finishedCondition that one task is done.

Often, I break down my matrix into its indexed ranges and then call enqueue() on the Thread_Pool class in a loop to concurrently process subsets of the matrix on different cores. Here, it’s important for the neural layer to wait for all cores to finish processing their part of the matrix. The neural layer calls wait(), which causes the thread pool class to block until all tasks are completed. The wait() function looks like this:

C++
void Thread_Pool::wait() {

    std::unique_lock<std::mutex> lock(queueMutex);

    finishedCondition.wait(lock, [this]() {
        return stop || (tasks.empty() && activeTasks == 0);
    });
}

The finished condition checks to see if the tasks queue is empty and activeTasks == 0 (meaning all tasks are not only off the queue but completed). If these conditions aren’t met, the main thread the neural layer runs on is put to sleep. Anytime we complete a task in worker() and call finishedCondition.notify_one(), we check again to see if we can end the wait() and hand control back to the neural layer.

Adding tasks to the thread pool is one of the harder challenges and requires meta-programming and a template parameter pack since we need it to handle any task given by the neural layer (as long as no return value is expected). This is the job of the enqueue() function, which generally gets called like this:

C++
const auto processor_count = std::thread::hardware_concurrency();
for (int i = 0; i < processor_count - 1; i++) {
    _threadPool.enqueue(&Tensor::MatmulInner<a_f>,
                        this,
                        std::ref(m1),
                        std::ref(m2),
                        bias,
                        i * dimensions_per_thread,
                        dimensions_per_thread,
                        af);
}
_threadPool.wait();

A lot of the parameters passed won’t make sense without further context about the program, but that’s part of the point. The enqueue() function doesn’t need to understand the parameters passed to it; it just needs to ensure they can be executed by a thread. The enqueue() function works like this:

C++
template<typename F, typename... Args>
void enqueue(F&& f, Args&&... args) {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        // std::invoke lives in <functional>; the trait check is from <type_traits>.
        tasks.emplace([f = std::forward<F>(f), ...args = std::forward<Args>(args)]() mutable {
            if constexpr (std::is_member_function_pointer<std::decay_t<F>>::value) {
                std::invoke(f, std::forward<Args>(args)...);
            } else {
                f(std::forward<Args>(args)...);
            }
        });
        activeTasks++;
    }
    condition.notify_one();
}

Because I want to accept any function with an arbitrary number of arguments, we use templates with a template parameter pack to capture the arguments. I take these parameters as forwarding references (&&) so they can bind to both rvalues and lvalues, which requires us to use perfect forwarding later on. I next lock the queueMutex to ensure no thread attempts to modify the tasks queue before I get a new task onto it. I then add a new std::function task to the tasks queue that represents the function I want my task to execute. I do this by emplacing a lambda that captures f and args. I call std::forward on both of these because I want to ensure that if these values are initially passed as lvalues I make copies, and if they are passed as rvalues I properly move them. This task will modify matrices, so I mark the lambda as mutable. I next use if constexpr to check whether f is a member function pointer (f is a pointer to a member function of a class in all my uses). If it is, we need std::invoke to get the correct calling semantics, where the first argument is a pointer to an instance of the class. Otherwise, the lambda can call f directly. These are compile-time checks, so they have no impact on our runtime. Once we have added the task lambda to our tasks queue, we increment our activeTasks tracker, release the queueMutex, and notify our condition variable that a task is ready. We saw earlier in worker() that a notified thread will check whether there is something in the tasks queue, grab it, and execute it.

Finally, we need to implement the cleanup of the class, which lives in clearPool():

C++
void Thread_Pool::clearPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
        condition.notify_all();
    }
    for (std::thread &worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    workers.clear();

    std::queue<std::function<void()>> empty;
    std::swap(tasks, empty);

    activeTasks = 0;

    finishedCondition.notify_all();
}

We set all our invariants to values that signal no further work is needed from the threads in our thread pool. We join our threads, clear the workers vector in which they live, ensure the tasks queue is empty, and notify finishedCondition in case a thread is still blocked in wait(). My neural net uses this function when it switches from training to inference and no longer needs the concurrency a thread pool provides. I also call it from the destructor for RAII, as shown below.
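Since the destructor only needs to delegate to clearPool(), it can be a one-liner, something like this sketch:

C++
// RAII: destroying the pool joins and releases any remaining threads.
Thread_Pool::~Thread_Pool() {
    clearPool();
}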

Thread pools are great. They make use of multiple CPU cores with little overhead while hiding implementation details we prefer to keep out of other parts of our program. Part of what makes this one work so well is that little to no communication is needed between threads. Each thread works completely independently of the rest, which is often not the case. When threads do need to pass information between them, more complex options, such as setting thread affinities, might be required.