Coverage for src/sparkouille/fctmr/pyparallel_fctmr.py: 100%

5 statements  

coverage.py v7.2.7, created at 2023-07-01 14:24 +0200

# -*- coding: utf-8 -*-
"""
@file
@brief :epkg:`joblib` uses a module not documented
in the official :epkg:`Python` documentation:
`Python's undocumented ThreadPool <http://lucasb.eyer.be/snips/python-thread-pool.html>`_.
"""
from multiprocessing.pool import ThreadPool


def pyparallel_mapper(fct, gen, threads=None):
    """
    Applies function *fct* to a generator.
    Relies on *ThreadPool*.

    @param      fct         function
    @param      gen         generator
    @param      threads     number of threads
    @return                 generator

    If the number of threads is None,
    it is replaced by ``os.cpu_count() or 1``
    (see *multiprocessing.pool*).

    .. exref::
        :title: mapper
        :tag: progfonc

        .. runpython::
            :showcode:

            from sparkouille.fctmr.pyparallel_fctmr import pyparallel_mapper
            res = pyparallel_mapper(lambda x: x + 1, [4, 5])
            print(list(res))

    Unfortunately, the parallelization does not follow
    the map/reduce concept: the function first builds
    an intermediate list holding all the results and
    only then returns an iterator over that list.
    """
    pool = ThreadPool(processes=threads)
    return iter(pool.map(fct, gen))
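
The docstring above notes that the function materializes an intermediate list before returning an iterator. A minimal sketch of one possible alternative follows, assuming that *ThreadPool.imap* is an acceptable substitute for *ThreadPool.map*. The name `lazy_parallel_mapper` is hypothetical and is not part of sparkouille. The pool may still read ahead in the input, so this is not a fully streaming map, but the caller receives results one at a time instead of waiting for the whole list.

```python
from multiprocessing.pool import ThreadPool


def lazy_parallel_mapper(fct, gen, threads=None):
    """
    Hypothetical variant of pyparallel_mapper (not in the original module).
    Yields results in input order as they become available.
    """
    # The context manager terminates the pool once the generator
    # is exhausted or closed, so worker threads are not left behind.
    with ThreadPool(processes=threads) as pool:
        # imap returns a lazy iterator instead of a complete list.
        yield from pool.imap(fct, gen)


if __name__ == "__main__":
    res = lazy_parallel_mapper(lambda x: x + 1, [4, 5])
    print(list(res))
```

Because this variant is a generator function, the pool is only created when iteration starts, and it stays open while results are being consumed.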