#!/usr/bin/env python3
from functools import reduce
from itertools import islice
from pprint import pprint
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor as Pool
def sequencer(data, count=10):
    """
    Cut 'data' into consecutive chunks of at most 'count' items.

    'data' may be any iterable; it is consumed lazily, one chunk at a time.
    Each chunk is yielded as a list. Every chunk holds exactly 'count'
    items except possibly the last one, which holds whatever is left over.
    An empty 'data' yields nothing.

    'count' defaults to 10. A ValueError is raised for a count below 1;
    because this is a generator, the error surfaces when the first chunk
    is requested, not when sequencer() is called.
    """
    if count < 1:
        # A zero-sized chunk would be empty right away and end the loop,
        # silently dropping all of 'data'.
        raise ValueError('count must be at least 1, got %r' % (count,))
    remaining = iter(data)
    while True:
        chunk = list(islice(remaining, count))
        if not chunk:
            # The iterator is exhausted: nothing more to hand out.
            return
        yield chunk
def mapper(items):
    """
    Sum the even and the odd numbers found in 'items' separately.

    The totals are returned in a defaultdict(int) under the keys 'even'
    and 'odd'. A key only appears once a number of that kind was seen.
    """
    totals = defaultdict(int)
    for number in items:
        # The lowest bit is set for odd integers (negative ones included).
        if number & 1:
            totals['odd'] += number
        else:
            totals['even'] += number
    return totals
def reducer(target, source):
    """
    Fold the dictionary 'source' into the dictionary 'target'.

    Every value in 'source' is added in place to the entry of 'target'
    stored under the same key. 'target' is modified and also returned,
    which makes this function usable as the step of functools.reduce.
    """
    for name, amount in source.items():
        target[name] += amount
    return target
def main():
    """
    Sum the even and the odd numbers of range(100) separately.

    The numbers are chunked by sequencer(), each chunk is summed by
    mapper() in a pool of four worker processes, the partial results are
    merged with reducer(), and one line per category is printed.
    """
    numbers = range(100)
    worker_count = 4
    with Pool(worker_count) as pool:
        # Note:
        #   The reducer could start receiving results earlier if the tasks
        #   were submitted individually and collected with
        #   concurrent.futures.as_completed instead of using pool.map.
        # Note some more:
        #   ThreadPoolExecutor is an option if the mapper is IO-bound, for
        #   example when it passes the task at hand off to other machines
        #   and waits for them to get back to you.
        partial_sums = pool.map(mapper, sequencer(numbers))
        totals = reduce(reducer, partial_sums, defaultdict(int))
    for category, total in totals.items():
        print('sum of %s numbers: %d' % (category, total))
# Run main() only when this file is executed as a script. The guard also
# matters for the process pool: with the 'spawn' start method the worker
# processes import this module again and must not start the job themselves.
if __name__ == '__main__':
    main()