Grand Central Dispatch is one of the truly great APIs to come out of Apple in the past few years. In the latest installment of the Let's Build series, I'm going to explore a reimplementation of the most basic features ofdispatch_queue, a topic suggested by Rob Rix.
Overview
A dispatch queue is a queue of jobs backed by a global thread pool. Typically, jobs submitted to a queue are executed asynchronously on a background thread. All threads share a single pool of background threads, which makes the system more efficient.
That is the essence of the API which I'll replicate. There are a number of fancy features provided by GCD which I'll ignore for the sake of simplicity. For example, the number of threads in the global pool scales up and down with the amount of work to be done and the CPU utilization of the system. If you have a bunch of jobs hitting the CPU hard and you submit another job, GCD will avoid creating another worker thread for it, because it's already running at 100% and another thread will just make things less efficient. I'll skip over this and just use a hardcoded limit on the number of threads. I'll skip other fancy features like target queues and barriers on concurrent queues as well.
The goal is to concentrate on the essence of dispatch queues: they can be serial or concurrent, they can dispatch jobs synchronously or asynchronously, and they're backed by a shared global thread pool.
Code
As usual, the code for today's article is available on GitHub here:
https://github.com/mikeash/MADispatchQueue
If you'd like to follow along as you read, or just want to explore on your own, you can find it all there.
Interface
GCD is a C API. Although GCD objects have turned into Objective-C objects on more recent OS releases, the API remains pure C (plus Apple's blocks extension). This is great for a low-level API, and GCD presents a remarkably clean interface, but for my own purposes I prefer to write my reimplementation in Objective-C.
The Objective-C class is calledMADispatchQueue and it only has four calls:
Here's the interface declaration:
@interfaceMADispatchQueue :NSObject+(MADispatchQueue*)globalQueue;-(id)initSerial:(BOOL)serial;-(void)dispatchAsync:(dispatch_block_t)block;-(void)dispatchSync:(dispatch_block_t)block;@end
The goal, then, is to implement these methods to actually do what they say they do.
Thread Pool Interface
The thread pool that backs the queues has a simpler interface. It will do the grunt work of actually running the submitted jobs. The queues will then be responsible for submitting their enqueued jobs at the right time to maintain the queue's guarantees.
The thread pool has a single job: submit some work to be run. Accordingly, its interface has just a single method:
@interfaceMAThreadPool :NSObject-(void)addBlock:(dispatch_block_t)block;@end
Since this is the core, let's implement it first.
Thread Pool Implementation
Let's look at instance variables first. The thread pool is going to be accessed from multiple threads, both internally and externally, and so needs to be thread safe. While GCD goes out of its way to use fast atomic operations whenever possible, for my conceptual rebuilding I'm going to stick with good old-fashioned locks. I need the ability to wait and signal on this lock, not just enforce mutual exclusion, so I'm using anNSCondition rather than a plainNSLock. If you're not familiar with it,NSCondition is basically a lock and a single condition variable wrapped into one:
NSCondition*_lock;
In order to know when to spin up new worker threads, I need to know how many threads are in the pool, how many are actually busy doing work, and the maximum number of threads I can have:
NSUInteger_threadCount;NSUInteger_activeThreadCount;NSUInteger_threadCountLimit;
Finally, there's a list of blocks to execute. This is anNSMutableArray, treated as a queue by appending new blocks to the end and removing whem from the front:
NSMutableArray*_blocks;
Initialization is simple. Initialize the lock, initialize the blocks array, and set the thread count limit to an arbitrarily-chosen 128:
-(id)init{if((self=[superinit])){_lock=[[NSConditionalloc]init];_blocks=[[NSMutableArrayalloc]init];_threadCountLimit=128;}returnself;}
The worker threads run a simple infinite loop. As long as the blocks array is empty, it will wait. Once a block is available, it will dequeue it from the array and execute it. When doing so, it will increment the active thread count, then decrement it again when done. Let's get started:
-(void)workerThreadLoop:(id)ignore{
The first thing it does is acquire the lock. Note that it does thisbefore the loop begins. The reason will become clear at the end of the loop:
[_locklock];
Now loop forever:
while(1){
If the queue is empty, wait on the lock:
while([_blockscount]==0){[_lockwait];}
Note that this is done with a loop, not just anif statement. The reason for this isspurious wakeup. In short,wait can potentially return even though nothing signaled, so for correct behavior the condition being checked needs to be reevaluated whenwait returns.
Once a block is available, dequeue it:
dispatch_block_tblock=[_blocksfirstObject];[_blocksremoveObjectAtIndex:0];
Indicate that this thread is now doing something by incrementing the active thread count:
_activeThreadCount++;
Now it's time to execute the block, but we have to release the lock first, otherwise we won't get any concurrency and we'll have all sorts of entertaining deadlocks:
[_lockunlock];
With the lock safely relinquished, execute the block:
block();
With the block done, it's time to decrement the active thread count. This must be done with the lock held to avoid race conditions, and that's the end of the loop:
[_locklock];_activeThreadCount--;}}
Now you can see why the lock had to be acquired before entering the loop above. The last act in the loop is to decrement the active thread count, which requires the lock to be held. The first thing at the top of the loop is to check the blocks queue. By performing the first lock outside of the loop, subsequent iterations can use a single lock operation for both operations, rather than locking, unlocking, and immediately locking again.
Now foraddBlock:
-(void)addBlock:(dispatch_block_t)block{
Everything here needs to be done with the lock acquired:
[_locklock];
The first task is to add the new block to the blocks queue:
[_blocksaddObject:block];
If there's an idle worker thread ready to take this block, then there isn't much to do. If there aren't enough idle worker threads to handle all the outstanding blocks, and the number of worker threads isn't yet at the limit, then it's time to create a new one:
NSUIntegeridleThreads=_threadCount-_activeThreadCount;if([_blockscount]>idleThreads&&_threadCount<_threadCountLimit){[NSThreaddetachNewThreadSelector:@selector(workerThreadLoop:)toTarget:selfwithObject:nil];_threadCount++;}
Now everything is ready for a worker thread to get started on the block. In case they're all sleeping, wake one up:
[_locksignal];
Then relinquish the lock and we're done:
[_lockunlock];}
That gives us a thread pool that can spawn worker threads up to a pre-set limit to service blocks as they come in. Now to implement queues with this as the foundation.
Queue Implementation
Like the thread pool, the queue will use a lock to protect its contents. Unlike the thread pool, it doesn't need to do any waiting or signaling, just basic mutual exclusion, so it uses a plainNSLock:
NSLock*_lock;
Like the thread pool, it maintains a queue of pending blocks in anNSMutableArray:
NSMutableArray*_pendingBlocks;
The queue knows whether it's serial or concurrent:
BOOL_serial;
When serial, it also tracks whether it currently has a block running in the thread pool:
BOOL_serialRunning;
Concurrent queues behave the same whether anything is running or not, so they don't track this.
The global queue is stored in a global variable, as is the underlying shared thread pool. They're both created in+initialize:
staticMADispatchQueue*gGlobalQueue;staticMAThreadPool*gThreadPool;+(void)initialize{if(self==[MADispatchQueueclass]){gGlobalQueue=[[MADispatchQueuealloc]initSerial:NO];gThreadPool=[[MAThreadPoolalloc]init];}}
The+globalQueue method can then just return the variable, since+initialize is guaranteed to have created it:
+(MADispatchQueue*)globalQueue{returngGlobalQueue;}
This is just the sort of thing that calls fordispatch_once, but it felt like cheating to use a GCD API when I'm reimplementing a GCD API, even if it's not the same one.
Initializing a queue consists of allocating the lock and the pending blocks queue, and setting the_serial variable:
-(id)initSerial:(BOOL)serial{if((self=[superinit])){_lock=[[NSLockalloc]init];_pendingBlocks=[[NSMutableArrayalloc]init];_serial=serial;}returnself;}
Before we get to the remaining public API, there's an underlying method to build which will dispatch a single block on the thread pool, and then potentially call itself to run another block:
-(void)dispatchOneBlock{
Its entire purpose in life is to run stuff on the thread pool, so it dispatches there:
[gThreadPooladdBlock:^{
Then it grabs the first block in the queue. Naturally, this must be done with the lock held to avoid catastrophic explosions:
[_locklock];dispatch_block_tblock=[_pendingBlocksfirstObject];[_pendingBlocksremoveObjectAtIndex:0];[_lockunlock];
With the block in hand and the lock relinquished, the block can safely be executed on the background thread:
block();
If the queue is concurrent, then that's all it needs to do. If it's serial, there's more:
if(_serial){
On a serial queue, additional blocks will build up, but can't be invoked until preceding blocks complete. When a block completes here,dispatchOneBlock will see if any other blocks are pending on the queue. If there are, it calls itself to dispatch the next block. If not, it sets the queue's running state back toNO:
[_locklock];if([_pendingBlockscount]>0){[selfdispatchOneBlock];}else{_serialRunning=NO;}[_lockunlock];}}];}
With this method, implementingdispatchAsync: is relatively easy. Add the block to the queue of pending blocks, then set state and invokedispatchOneBlock as appropriate:
-(void)dispatchAsync:(dispatch_block_t)block{[_locklock];[_pendingBlocksaddObject:block];
If a serial queue isidle then set its state to running and calldispatchOneBlock to get things moving:
if(_serial&&!_serialRunning){_serialRunning=YES;[selfdispatchOneBlock];
If the queue is concurrent, then calldispatchOneBlock unconditionally. This makes sure the new block is executed as soon as possible even if another block is already running, since multiple blocks are allowed to run concurrently:
}elseif(!_serial){[selfdispatchOneBlock];}
If a serial queue is already running then nothing more needs to be done. The existing run ofdispatchOneBlock will eventually get to the block that was just added to the queue. Now release the lock:
[_lockunlock];}
On todispatchSync:. GCD is smart about this and runs the block directly on the calling thread, while stopping other blocks from running on the queue (if it's serial). We won't try to be so smart. Instead, we'll just usedispatchAsync:, and wrap it to wait for execution to complete.
It does this using a localNSCondition, plus adone variable to indicate when the block has completed:
-(void)dispatchSync:(dispatch_block_t)block{NSCondition*condition=[[NSConditionalloc]init];__blockBOOLdone=NO;
Then it dispatches a block asynchronously. This block calls the passed-in one, then setsdone and signals thecondition:
[selfdispatchAsync:^{block();[conditionlock];done=YES;[conditionsignal];[conditionunlock];}];
Out in the original calling thread, it waits on thecondition fordone to be set, then returns.
[conditionlock];while(!done){[conditionwait];}[conditionunlock];}
At this point, the block's execution has completed. Success! That's the last bit of API needed forMADispatchQueue.
Conclusion
A global thread pool can be implemented with a queue of worker blocks and a bit of intelligent spawning of threads. Using a shared global thread pool, a basic dispatch queue API can be built that offers basic serial/concurrent and synchronous/asynchronous dispatch. This rebuild lacks many of the nice features of GCD and is certainly much less efficient, but even so it gives a nice glimpse into what the inner workings of such a thing might be like, and shows that it's not really magic after all. (Except fordispatch_once. That's all magic.)
That's it for today. Come back next time for more fun, games, and fun. Friday Q&A is driven by reader ideas so if you have something you'd like to see discussed here next time or in the future, pleaselet me know!
dispatch_once. For convenience, the URL:if([_blocks count] < idleThreads…. The source on GitHub shows greater-than, which I believe is correct.Add your thoughts, post a comment:
Spam and off-topic posts will be deleted without notice. Culprits may be publicly humiliated at my sole discretion.