Job System and ParallelFor

Some time ago, while profiling our game, I noticed a lot of thread locking and contention caused by a single mutex-protected MPMC (multi-producer, multi-consumer) job queue processing a large number of tiny jobs. Merging work into larger jobs wasn’t an option, as it would result in bad scheduling: the more fine-grained the work items are, the better they schedule.

There are two standard solutions: either make the global MPMC queue lock-free or use job stealing.

A global lock-free MPMC queue is quite complex to implement and still suffers from a lot of contention when processing a large number of small jobs. Maciej Siniło has a great post about lock-free MPMC implementations if you are looking for one.

Job stealing replaces the single global MPMC queue with multiple lock-free local MPMC queues (one per job thread). Jobs are distributed across those queues up front (static scheduling). Every job thread processes its own local queue and, when no jobs are left there, tries to steal a job from the end of a random queue (check out this post for an in-depth description). Job stealing has its own issues: it messes up the order of job processing, or in other words, it gives up low latency in exchange for higher throughput. Moreover, if static scheduling fails (e.g. jobs have widely different lengths), job stealing can degrade to a global MPMC queue with a lot of contention.
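To make the stealing order concrete, here is a minimal sketch of the idea. It is deliberately not lock-free: each local queue is guarded by a plain mutex to keep the example short. `JobId`, `LocalJobQueue` and `AcquireJob` are hypothetical names, and the caller passes in the first victim index where a real system would pick one at random.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

using JobId = int; // hypothetical stand-in for a real job handle

// One queue per job thread. A mutex replaces the lock-free machinery here.
class LocalJobQueue
{
public:
    void Push(JobId job)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_jobs.push_back(job);
    }

    // The owning thread takes jobs from the front of its own queue.
    bool PopLocal(JobId& job)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_jobs.empty())
            return false;
        job = m_jobs.front();
        m_jobs.pop_front();
        return true;
    }

    // Other threads steal from the end of the queue.
    bool Steal(JobId& job)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_jobs.empty())
            return false;
        job = m_jobs.back();
        m_jobs.pop_back();
        return true;
    }

private:
    std::mutex m_mutex;
    std::deque<JobId> m_jobs;
};

// Tries the thread's own queue first, then walks the other queues starting
// at firstVictim. Returns false when every queue is empty.
bool AcquireJob(std::vector<LocalJobQueue>& queues, std::size_t self,
                std::size_t firstVictim, JobId& job)
{
    assert(self < queues.size());
    if (queues[self].PopLocal(job))
        return true;

    for (std::size_t n = 0; n < queues.size(); ++n)
    {
        const std::size_t victim = (firstVictim + n) % queues.size();
        if (victim != self && queues[victim].Steal(job))
            return true;
    }
    return false;
}
```

Note how a thief receives the job that was pushed last, while the owner receives the one pushed first. This is the reordering of job processing mentioned above.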

Before going nuclear with a lock-free MPMC queue or implementing job stealing, it may be interesting to consider some alternatives. I have learned to avoid complex generic solutions and to favor specialized but simpler ones. Maybe the specialized solutions won’t be better in the end, but at least it will be easier for a future code maintainer to change or rewrite them.

Going back to my profiling investigation, the interesting part was that almost all of those jobs were effectively doing a simple parallel for: spawning a lot of jobs of the same type in order to process an entire array of work items. For example: test the visibility of 50k bounding boxes, simulate 100 particle emitters, etc. This gave me the idea to build the job system specifically around this case: a single function, an array of elements to process in parallel, and a shared job configuration (dependencies, priorities, affinities, etc.).

The implementation is simple. First we need a ParallelForJob structure (just remember to add some padding to this structure in order to avoid false sharing).

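struct ParallelForJob
{
    uint pushNum;       // number of job threads woken up for this job
    uint popNum;        // number of job threads that have picked this job up
    uint completedNum;  // number of job threads that have finished this job

    uint elemBatchSize; // number of array elements claimed per atomic increment
    uint nextArrayElem; // index of the first element that has not been claimed yet
    uint arraySize;     // total number of array elements to process

    func* function;     // function called for every array element
};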
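The padding mentioned above could look roughly like the sketch below. It assumes standard C++11 atomics instead of the `AtomicAdd` helper, a 64-byte cache line (common on x86-64, but platform dependent), and a hypothetical callback signature. `PaddedParallelForJob`, `kCacheLineSize` and `ParallelForFunc` are names made up for this example.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Assumed cache line size. 64 bytes is typical, but not guaranteed.
constexpr std::size_t kCacheLineSize = 64;

// Hypothetical signature of the per-element callback.
using ParallelForFunc = void (*)(std::uint32_t elemIndex);

struct alignas(kCacheLineSize) PaddedParallelForJob
{
    // Written once at submission time, afterwards only read by job threads,
    // so these fields can share a cache line.
    std::uint32_t pushNum = 0;
    std::uint32_t elemBatchSize = 0;
    std::uint32_t arraySize = 0;
    ParallelForFunc function = nullptr;

    // Every counter that job threads modify gets its own cache line, so
    // that bumping one counter does not invalidate the line of another.
    alignas(kCacheLineSize) std::atomic<std::uint32_t> popNum{0};
    alignas(kCacheLineSize) std::atomic<std::uint32_t> nextArrayElem{0};
    alignas(kCacheLineSize) std::atomic<std::uint32_t> completedNum{0};
};

static_assert(alignof(PaddedParallelForJob) == kCacheLineSize,
              "job must start on a cache line boundary");
static_assert(sizeof(PaddedParallelForJob) % kCacheLineSize == 0,
              "job must occupy whole cache lines");
```

Where available, `std::hardware_destructive_interference_size` from C++17 may be a better source for the constant than a hard-coded 64.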

In order to add a new work item, we just push a single job to the global, mutex-protected MPMC queue. Contention isn’t an issue here, because the number of jobs going through this global queue is low.

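// Number of batches needed to cover the whole array (rounded up).
uint reqBatchNum = ( arraySize + elemBatchSize - 1 ) / elemBatchSize;
// One job thread per JOB_THREAD_NUM batches (rounded up)...
uint reqPushNum = ( reqBatchNum + JOB_THREAD_NUM - 1 ) / JOB_THREAD_NUM;
// ...but never more threads than the job system has.
uint pushNum = Min( reqPushNum, JOB_THREAD_NUM );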

ParallelForJob job;
job.pushNum = pushNum;
job.popNum = 0;
job.completedNum = 0;
job.elemBatchSize = elemBatchSize;
job.nextArrayElem = 0;
job.arraySize = arraySize;
job.function = function;

jobQueueMutex.lock();
jobQueue.push( job );
jobQueueMutex.unlock();

jobThreadSemaphore.Release( pushNum );

After the job thread semaphore is released, the woken job threads pick the next ParallelForJob from the global queue.

jobThreadSemaphore.Wait();

jobQueueMutex.lock();
jobQueue.peek( job );
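// AtomicAdd returns the previous value. The last thread expected to pick
// this job up also removes it from the queue.
if ( job.popNum.AtomicAdd( 1 ) + 1 == job.pushNum )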
{
    jobQueue.pop();
}
jobQueueMutex.unlock();

Next, the job thread starts processing array elements of the picked job. The array elements form a fixed-size queue without any producers, so a simple atomic increment is enough to safely pick the next batch of array elements from multiple job threads in parallel.

while ( true )
{
    // Claim the next batch. AtomicAdd returns the index before the addition.
    uint fromElem = job.nextArrayElem.AtomicAdd( job.elemBatchSize );
    uint toElem = Min( fromElem + job.elemBatchSize, job.arraySize );
    for ( uint i = fromElem; i < toElem; ++i )
    {
        job.function( i );
    }

    // Stop once the end of the array has been reached.
    if ( toElem >= job.arraySize )
    {
        break;
    }
}
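For readers who want to try the loop above in isolation, here is a minimal standalone sketch using `std::atomic` and `std::thread`. It is not the job system itself: there is no queue or semaphore, the threads are spawned and joined on the spot, and `ParallelForSketch` is a hypothetical name. It also assumes that `arraySize + elemBatchSize * threadNum` fits in 32 bits, so the shared index cannot wrap around.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Calls func(i) exactly once for every i in [0, arraySize), spreading the
// work across threadNum threads. Each thread claims a batch of
// elemBatchSize consecutive indices with a single atomic add.
template <typename Func>
void ParallelForSketch(std::uint32_t arraySize, std::uint32_t elemBatchSize,
                       std::uint32_t threadNum, Func func)
{
    assert(elemBatchSize > 0 && threadNum > 0);

    std::atomic<std::uint32_t> nextArrayElem{0};

    auto worker = [&]()
    {
        while (true)
        {
            // fetch_add returns the previous value: the start of our batch.
            const std::uint32_t fromElem = nextArrayElem.fetch_add(elemBatchSize);
            if (fromElem >= arraySize)
                break; // another thread already claimed the last batch

            const std::uint32_t toElem =
                std::min(fromElem + elemBatchSize, arraySize);
            for (std::uint32_t i = fromElem; i < toElem; ++i)
                func(i);

            if (toElem >= arraySize)
                break; // we have just processed the last batch
        }
    };

    std::vector<std::thread> threads;
    for (std::uint32_t t = 0; t < threadNum; ++t)
        threads.emplace_back(worker);
    for (std::thread& thread : threads)
        thread.join();
}
```

A call such as `ParallelForSketch(50000, 64, 8, TestVisibility)` would then split 50k elements into batches of 64 across 8 threads, with `TestVisibility` standing in for any function that takes an element index.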

Finally, the last job thread to finish runs optional cleanup or dependency code.

if ( job.completedNum.AtomicAdd( 1 ) + 1 == job.pushNum )
{
    OnJobFinished( job );
}
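The "last one out" check can be tried in isolation with the sketch below. It assumes `std::atomic` in place of `AtomicAdd`, and `RunCompletionSketch` is a hypothetical test harness rather than part of the job system. Because `fetch_add` returns the previous value, exactly one thread observes `pushNum - 1` and runs the completion code.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Spawns pushNum threads that each "finish" a job and returns how many of
// them ended up running the completion code.
std::uint32_t RunCompletionSketch(std::uint32_t pushNum)
{
    assert(pushNum > 0);

    std::atomic<std::uint32_t> completedNum{0};
    std::atomic<std::uint32_t> finishedCalls{0};

    auto worker = [&]()
    {
        // ... the batch processing loop would run here ...

        // Only the thread that brings the counter up to pushNum gets in.
        if (completedNum.fetch_add(1) + 1 == pushNum)
            finishedCalls.fetch_add(1); // stands in for OnJobFinished( job )
    };

    std::vector<std::thread> threads;
    for (std::uint32_t t = 0; t < pushNum; ++t)
        threads.emplace_back(worker);
    for (std::thread& thread : threads)
        thread.join();

    return finishedCalls.load();
}
```

The default sequentially consistent ordering of `fetch_add` also means that the last thread sees everything the other threads wrote before their own increment, which is what cleanup or dependency code usually relies on.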

Recently, I found out that Arseny Kapoulkine implemented something similar, but with an extra wait at the end of the ParallelForJob processing loop for the other threads to finish. Still, IMO it’s not a widely known approach and it’s worth sharing.

The interesting part about ParallelForJob is that it allows pausing and resuming a job without fibers (just store the current array index) and makes it easy to cancel a job in flight (just overwrite the current array index). Furthermore, this abstraction can also be applied to the jobs themselves: replace the array of elements with an array of jobs, so that you commit and process an array of jobs instead of an array of elements.
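As a rough illustration of pausing and cancelling, the sketch below keeps all progress in a single atomic index. `ResumableRange`, `ClaimBatch` and `Cancel` are hypothetical names. It also assumes that batches already claimed by a thread are allowed to run to completion after a cancel.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

struct ResumableRange
{
    std::atomic<std::uint32_t> nextArrayElem{0};
    std::uint32_t arraySize = 0;
    std::uint32_t elemBatchSize = 1;
};

// Claims the next batch [fromElem, toElem). Returns false when nothing is
// left. A paused job is simply one where nobody calls this for a while:
// the progress lives in nextArrayElem, so resuming needs no extra state.
bool ClaimBatch(ResumableRange& range, std::uint32_t& fromElem,
                std::uint32_t& toElem)
{
    assert(range.elemBatchSize > 0);
    fromElem = range.nextArrayElem.fetch_add(range.elemBatchSize);
    if (fromElem >= range.arraySize)
        return false;
    toElem = std::min(fromElem + range.elemBatchSize, range.arraySize);
    return true;
}

// Cancels the job by moving the index to the end of the array, so that no
// further batches can be claimed.
void Cancel(ResumableRange& range)
{
    range.nextArrayElem.store(range.arraySize);
}
```

One caveat of this sketch: every failed `ClaimBatch` still advances the index, so a long-lived job that is polled many times after finishing would eventually need protection against wrap-around.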


One Response to Job System and ParallelFor

  1. philmorton says:

    If I understand correctly, these are pretty much the same as the Job-Lists popularised by Doom 3 http://fabiensanglard.net/doom3_bfg/threading.php
    Indeed a great way to take pressure off the queue.
