Python decorator to parallelize any function

Wouldn’t it be cool if you could speed up your program just by adding a decorator to a function? Wouldn’t it be cool if you didn’t have to worry about processing the items of a list in parallel? Today we are going to write a Python decorator that does exactly this for you, so that you can concentrate on the logic of your code rather than on multi-threading issues.

Some basics on python multi-threading before we start.

Okay, let’s start with the code of the decorator. I have tried to keep the code self-explanatory. If you are not able to follow along, or if you don’t want to dig into the logic of writing a decorator, you can copy and paste the decorator code directly.

Below is the decorator code that we are going to use.

import concurrent.futures
import os
from functools import wraps

def make_parallel(func):
    """
        Decorator for any function that needs to be parallelized.
        The decorated function takes a list in which each element is one input for the original function.
        Keyword arguments are not supported: the original function must accept a single argument.
        :param func: function
            The instance of the function that needs to be parallelized.
        :return: function
    """

    @wraps(func)
    def wrapper(lst):
        """

        :param lst:
            The inputs of the function in a list.
        :return:
        """
        # The maximum number of threads that can be spawned.
        # If the number of threads is too high, the overhead of creating the threads becomes significant.
        # Here we take the number of CPUs available in the system and multiply it by a constant.
        # On my system I have a total of 8 CPUs, so I will generate a maximum of 16 threads.
        number_of_threads_multiple = 2  # You can change this multiple according to your requirement
        # os.cpu_count() can return None when the count cannot be determined, so fall back to 1.
        number_of_workers = int((os.cpu_count() or 1) * number_of_threads_multiple)
        if len(lst) < number_of_workers:
            # If the list is short, we only need as many threads as there are items.
            # This avoids creating unnecessary threads.
            number_of_workers = len(lst)

        if number_of_workers:
            if number_of_workers == 1:
                # If the length of the list that needs to be parallelized is 1, there is no point in
                # parallelizing the function.
                # So we run it serially.
                result = [func(lst[0])]
            else:
                # Core code: create the worker threads and run the decorated function in parallel.
                # Results are appended as each call finishes, so their order may differ from the input order.
                result = []
                with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
                    bag = {executor.submit(func, i): i for i in lst}
                    for future in concurrent.futures.as_completed(bag):
                        result.append(future.result())
        else:
            result = []
        return result
    return wrapper
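
Because the decorator above collects results with as_completed, the output list follows completion order, not input order. If you need each result to line up with its input, one possible variation is sketched below. It is an illustration, not part of the decorator above: the names make_parallel_ordered and slow_square are hypothetical, and it swaps submit/as_completed for executor.map, which yields results in input order.

```python
import concurrent.futures
import os
import time
from functools import wraps


def make_parallel_ordered(func):
    """Hypothetical variant of make_parallel that keeps results in input order."""

    @wraps(func)
    def wrapper(lst):
        items = list(lst)
        if not items:
            return []
        # Same sizing idea as above: 2 threads per CPU, capped by the input length.
        max_workers = min(len(items), (os.cpu_count() or 1) * 2)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields results in the order of the inputs,
            # regardless of which call finishes first.
            return list(executor.map(func, items))

    return wrapper


def slow_square(n):
    # Later inputs sleep less, so they finish before earlier ones.
    time.sleep(0.05 * (3 - n))
    return n * n


ordered = make_parallel_ordered(slow_square)([0, 1, 2])
print(ordered)
```

The trade-off is that executor.map hands results back in input order, so a slow early item delays access to later results that are already finished.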

We built a sample function that makes HTTPS calls to the JSON Placeholder API. The sample code is below. Please note that it only demonstrates what an I/O-heavy call looks like; you can replace it with whatever function you want to parallelize.

import requests


def sample_function(post_id):
    """
        Just a sample function which would make dummy API calls
    """

    url = f"https://jsonplaceholder.typicode.com/comments?postId={post_id}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    return {}

When we call this function serially, our code looks something like this:

list_of_post_ids = list(range(1, 20))

# Serial way of calling the function
results = []
for post_id in list_of_post_ids:
    res = sample_function(post_id)
    results.append(res)

But when we use our decorator, the code simplifies to this:

# Parallelized way of calling the function
results = make_parallel(sample_function)(list_of_post_ids)

You can time both ways of calling the function to see for yourself how multithreading speeds up I/O-heavy calls.
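
To try the comparison without depending on the network, here is a minimal timing sketch. It makes two assumptions: fake_io_call is a hypothetical stand-in for sample_function that sleeps for 0.1 seconds instead of making an HTTPS request, and the parallel run uses a thread pool directly, which is the same mechanism the decorator relies on.

```python
import concurrent.futures
import time


def fake_io_call(post_id):
    """Hypothetical stand-in for sample_function: waits instead of calling the API."""
    time.sleep(0.1)
    return {"postId": post_id}


post_ids = list(range(1, 9))

# Serial: each call waits for the previous one to finish.
start = time.perf_counter()
serial_results = [fake_io_call(post_id) for post_id in post_ids]
serial_seconds = time.perf_counter() - start

# Parallel: the waits overlap because every call runs in its own thread.
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=len(post_ids)) as executor:
    parallel_results = list(executor.map(fake_io_call, post_ids))
parallel_seconds = time.perf_counter() - start

print(f"serial:   {serial_seconds:.2f}s")
print(f"parallel: {parallel_seconds:.2f}s")
```

The exact numbers depend on your machine. With real HTTPS calls they also depend on network latency and on how the remote server handles concurrent requests.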

Also, note that this decorator only works for functions that take one input argument. I will be improving this decorator, and adding automatic selection of the number of threads based on function runtime, in my next article.
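
In the meantime, one possible workaround for functions that need extra arguments is sketched below. It is an assumption-laden illustration, not the improvement planned for the next article: make_parallel_shared and scale are hypothetical names, and the extra keyword arguments are bound once with functools.partial and shared by every call. The sketch also uses the @ syntax, after which the decorated name takes a list instead of a single value.

```python
import concurrent.futures
import os
from functools import partial, wraps


def make_parallel_shared(func):
    """Hypothetical variant: forwards the same keyword arguments to every call."""

    @wraps(func)
    def wrapper(lst, **shared_kwargs):
        items = list(lst)
        if not items:
            return []
        # Bind the shared keyword arguments once; each item becomes the first positional argument.
        bound = partial(func, **shared_kwargs)
        max_workers = min(len(items), (os.cpu_count() or 1) * 2)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            return list(executor.map(bound, items))

    return wrapper


@make_parallel_shared
def scale(value, factor=1):
    return value * factor


# After decoration, scale takes a list of values plus the shared keyword arguments.
scaled = scale([1, 2, 3], factor=10)
print(scaled)
```

If each item needs its own set of arguments, you can instead pass a list of tuples or dicts and unpack them inside the function.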
