Examples

Multiple Processes

Consumers run in separate processes.

import os

from consumers import Pool


def printer(numbers):
    """Print every consumed number next to the ID of the current process."""
    # The PID is looked up once, before any number is consumed.
    process_id = os.getpid()
    for value in numbers:
        print(process_id, value)


# Build a pool that uses `printer` as its consumer function.
pool = Pool(printer)

# Queue the numbers 0-4 for the consumers.
for number in range(5):
    pool.put(number)

# NOTE(review): presumably waits for the consumers to finish — confirm
# against the Pool.join documentation.
pool.join()
3320 0
3320 2
3321 1
3323 3
3324 4

Getting Results

Consumers can return data at the end of execution.

from consumers import Pool


def concatenate(letters):
    """Join the consumed letters into a single hyphen-separated string."""
    separator = '-'
    joined = separator.join(letters)
    return joined


# NOTE(review): quantity=2 presumably sets the number of consumers (the
# sample output below has two results) — confirm against the Pool signature.
with Pool(concatenate, quantity=2) as pool:
    for letter in 'abcdef':
        pool.put(letter)

# The values returned by `concatenate` are read from pool.results once the
# with block has exited.
print(pool.results)
('b-c', 'a-d-e-f')

Multiple Data

Multiple pieces of data can be queued and consumed with ease.

from consumers import Pool


def print_letter_index(items):
    """Print one line for each consumed (index, letter) pair."""
    template = '{} is at index {}'
    for pair in items:
        position, character = pair
        print(template.format(character, position))


# Each put() call queues two values at once; print_letter_index unpacks
# every consumed item as an (index, letter) pair.
with Pool(print_letter_index) as pool:
    for i, v in enumerate('abcdef'):
        pool.put(i, v)
a is at index 0
c is at index 2
b is at index 1
d is at index 3
e is at index 4
f is at index 5

Consumer Configuration

Consumers can be configured with positional and keyword arguments.

from consumers import Pool


def print_host(numbers, host, port=80):
    """Print each consumed number prefixed with the ``host:port`` string.

    ``host`` and ``port`` are the consumer's configuration values; ``port``
    defaults to 80 when it is not supplied.
    """
    endpoint = '{}:{}'.format(host, port)
    for value in numbers:
        print(endpoint, value)


# `args` supplies the positional `host` argument of print_host; `port` is
# left at its default of 80.
# NOTE(review): the second positional argument (1) is presumably the number
# of consumers — confirm against the Pool signature.
with Pool(print_host, 1, args=('remote',)) as pool:
    for i in range(3):
        pool.put(i)

# Here `kwargs` additionally overrides the `port` keyword argument.
with Pool(print_host, 1, args=('local',), kwargs={'port': 8123}) as pool:
    for i in range(3):
        pool.put(i)
remote:80 0
remote:80 1
remote:80 2
local:8123 0
local:8123 1
local:8123 2

Cross-Pool Communication

Consumers can put data into other pools.

from consumers import Pool


def square_sums(numbers, logger_pool):
    """Add up the squares of the consumed numbers and report the total.

    The final total is put into ``logger_pool`` exactly once, after all
    numbers have been consumed.
    """
    running_total = 0
    for value in numbers:
        squared = value * value
        running_total = running_total + squared
    logger_pool.put(running_total)


def logger(totals):
    """Print a completion message for every total that is consumed."""
    message = 'A consumer has finished with a total of'
    for finished_total in totals:
        print(message, finished_total)


# logger_pool is passed to square_sums through `args`, so each square_sums
# consumer can put its total into the logger pool.
logger_pool = Pool(logger, 1)
square_sums_pool = Pool(square_sums, args=(logger_pool,))

# Enter both pools, then queue the numbers 0-499 for the square_sums
# consumers.
with logger_pool, square_sums_pool:
    for i in range(500):
        square_sums_pool.put(i)
A consumer has finished with a total of 10292214
A consumer has finished with a total of 10354035
A consumer has finished with a total of 10416304
A consumer has finished with a total of 10479197