@@ -118,7 +118,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
118118try :
119119result = (True ,func (* args ,** kwds ))
120120except Exception as e :
121- if wrap_exception :
121+ if wrap_exception and func is not _helper_reraises_exception :
122122e = ExceptionWithTraceback (e ,e .__traceback__ )
123123result = (False ,e )
124124try :
@@ -133,6 +133,10 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
133133completed += 1
134134util .debug ('worker exiting after %d tasks' % completed )
135135
136+ def _helper_reraises_exception (ex ):
137+ 'Pickle-able helper function for use by _guarded_task_generation.'
138+ raise ex
139+
136140#
137141# Class representing a process pool
138142#
@@ -277,6 +281,17 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
277281return self ._map_async (func ,iterable ,starmapstar ,chunksize ,
278282callback ,error_callback )
279283
284+ def _guarded_task_generation (self ,result_job ,func ,iterable ):
285+ '''Provides a generator of tasks for imap and imap_unordered with
286+ appropriate handling for iterables which throw exceptions during
287+ iteration.'''
288+ try :
289+ i = - 1
290+ for i ,x in enumerate (iterable ):
291+ yield (result_job ,i ,func , (x ,), {})
292+ except Exception as e :
293+ yield (result_job ,i + 1 ,_helper_reraises_exception , (e ,), {})
294+
280295def imap (self ,func ,iterable ,chunksize = 1 ):
281296'''
282297 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
@@ -285,15 +300,23 @@ def imap(self, func, iterable, chunksize=1):
285300raise ValueError ("Pool not running" )
286301if chunksize == 1 :
287302result = IMapIterator (self ._cache )
288- self ._taskqueue .put ((((result ._job ,i ,func , (x ,), {})
289- for i ,x in enumerate (iterable )),result ._set_length ))
303+ self ._taskqueue .put (
304+ (
305+ self ._guarded_task_generation (result ._job ,func ,iterable ),
306+ result ._set_length
307+ ))
290308return result
291309else :
292310assert chunksize > 1
293311task_batches = Pool ._get_tasks (func ,iterable ,chunksize )
294312result = IMapIterator (self ._cache )
295- self ._taskqueue .put ((((result ._job ,i ,mapstar , (x ,), {})
296- for i ,x in enumerate (task_batches )),result ._set_length ))
313+ self ._taskqueue .put (
314+ (
315+ self ._guarded_task_generation (result ._job ,
316+ mapstar ,
317+ task_batches ),
318+ result ._set_length
319+ ))
297320return (item for chunk in result for item in chunk )
298321
299322def imap_unordered (self ,func ,iterable ,chunksize = 1 ):
@@ -304,15 +327,23 @@ def imap_unordered(self, func, iterable, chunksize=1):
304327raise ValueError ("Pool not running" )
305328if chunksize == 1 :
306329result = IMapUnorderedIterator (self ._cache )
307- self ._taskqueue .put ((((result ._job ,i ,func , (x ,), {})
308- for i ,x in enumerate (iterable )),result ._set_length ))
330+ self ._taskqueue .put (
331+ (
332+ self ._guarded_task_generation (result ._job ,func ,iterable ),
333+ result ._set_length
334+ ))
309335return result
310336else :
311337assert chunksize > 1
312338task_batches = Pool ._get_tasks (func ,iterable ,chunksize )
313339result = IMapUnorderedIterator (self ._cache )
314- self ._taskqueue .put ((((result ._job ,i ,mapstar , (x ,), {})
315- for i ,x in enumerate (task_batches )),result ._set_length ))
340+ self ._taskqueue .put (
341+ (
342+ self ._guarded_task_generation (result ._job ,
343+ mapstar ,
344+ task_batches ),
345+ result ._set_length
346+ ))
316347return (item for chunk in result for item in chunk )
317348
318349def apply_async (self ,func ,args = (),kwds = {},callback = None ,
@@ -323,7 +354,7 @@ def apply_async(self, func, args=(), kwds={}, callback=None,
323354if self ._state != RUN :
324355raise ValueError ("Pool not running" )
325356result = ApplyResult (self ._cache ,callback ,error_callback )
326- self ._taskqueue .put (([(result ._job ,None ,func ,args ,kwds )],None ))
357+ self ._taskqueue .put (([(result ._job ,0 ,func ,args ,kwds )],None ))
327358return result
328359
329360def map_async (self ,func ,iterable ,chunksize = None ,callback = None ,
@@ -354,8 +385,14 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
354385task_batches = Pool ._get_tasks (func ,iterable ,chunksize )
355386result = MapResult (self ._cache ,chunksize ,len (iterable ),callback ,
356387error_callback = error_callback )
357- self ._taskqueue .put ((((result ._job ,i ,mapper , (x ,), {})
358- for i ,x in enumerate (task_batches )),None ))
388+ self ._taskqueue .put (
389+ (
390+ self ._guarded_task_generation (result ._job ,
391+ mapper ,
392+ task_batches ),
393+ None
394+ )
395+ )
359396return result
360397
361398@staticmethod
@@ -377,33 +414,27 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
377414
378415for taskseq ,set_length in iter (taskqueue .get ,None ):
379416task = None
380- i = - 1
381417try :
382- for i ,task in enumerate (taskseq ):
418+ # iterating taskseq cannot fail
419+ for task in taskseq :
383420if thread ._state :
384421util .debug ('task handler found thread._state != RUN' )
385422break
386423try :
387424put (task )
388425except Exception as e :
389- job ,ind = task [:2 ]
426+ job ,idx = task [:2 ]
390427try :
391- cache [job ]._set (ind , (False ,e ))
428+ cache [job ]._set (idx , (False ,e ))
392429except KeyError :
393430pass
394431else :
395432if set_length :
396433util .debug ('doing set_length()' )
397- set_length (i + 1 )
434+ idx = task [1 ]if task else - 1
435+ set_length (idx + 1 )
398436continue
399437break
400- except Exception as ex :
401- job ,ind = task [:2 ]if task else (0 ,0 )
402- if job in cache :
403- cache [job ]._set (ind + 1 , (False ,ex ))
404- if set_length :
405- util .debug ('doing set_length()' )
406- set_length (i + 1 )
407438finally :
408439task = taskseq = job = None
409440else :