Asynchronous Task Execution In Python
Schedulers are a beautiful piece of code. In computer systems, they are as old as operating systems; in the real world, they are as old as our alarm clocks. In the early days of computing, programmers had to schedule an appointment with an operator (a person) to run their code. Later, when programmers wanted to take the operator out of the loop, they wrote scheduling algorithms.
When operating systems came into the picture, it was schedulers rather than computer operators that fed programs to the CPU for execution. Over the years, the number of processing cores increased and so did the complexity of scheduling algorithms. The layers of hardware caches, RAM, and hard disks brought up the need for different kinds of scheduling: long, medium, and short term.
Now, in the era of cloud and distributed systems, schedulers have become an indispensable architectural component of any software system. These asynchronous task executors are behind the emails and notifications, the pop-ups that greet us on logging in, the reports that land in our inboxes, and so on.
Celery, RabbitMQ, Redis, Google Task Queue API, and Amazon's SQS are the major players in task scheduling for distributed environments.
The rest of this blog sheds light on conventional task queue systems, looks at where asyncio stands, and finally covers the pros and cons of the major players.
Conventional task queues have two programs (a producer and a consumer) with a database acting as the queue. For every task created by the producer, an entry is made in the database with a flag such as NotStarted, Running, Completed, or Failed. At any point, task workers (say, a never-ending Python program) query this DB, look for incomplete tasks, and start running them. It is a simple implementation, but it has its own disadvantages.
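Here is a minimal sketch of the pattern, using an in-memory SQLite table as the queue. The table layout, flag values, and function names are illustrative assumptions, not the API of any particular framework.

import sqlite3
import time

# Toy DB-backed queue: the producer inserts rows, the worker polls for work.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, payload TEXT, status TEXT)")

def produce(payload):
    # Producer: register a task with the NotStarted flag
    conn.execute("INSERT INTO tasks (payload, status) VALUES (?, 'NotStarted')", (payload,))
    conn.commit()

def consume():
    # Worker: poll for one incomplete task, run it, and update its flag
    row = conn.execute(
        "SELECT id, payload FROM tasks WHERE status = 'NotStarted' LIMIT 1").fetchone()
    if row is None:
        return False
    task_id, payload = row
    conn.execute("UPDATE tasks SET status = 'Running' WHERE id = ?", (task_id,))
    conn.commit()
    print(f"Running task {task_id}: {payload}")  # the actual work would happen here
    conn.execute("UPDATE tasks SET status = 'Completed' WHERE id = ?", (task_id,))
    conn.commit()
    return True

produce("send_welcome_email")
produce("generate_report")
while consume():  # a real worker would poll forever, sleeping between queries
    time.sleep(0.1)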
Disadvantages
- Maintaining tasks in a DB table means that the table grows with the number of tasks. It becomes complicated once the DB grows so large that we have to deal with the problem of scaling it.
- Every consumer that is free queries the DB for a task with the flag Scheduled, to fetch a scheduled task that it can run. This polling becomes costly as the size of the DB grows.
Cron is the simplest software utility that enables you to run a task asynchronously at a given time. The utility maintains a single file (a table) called the crontab. Cron itself is a scheduled job that runs every minute, looks up every command that is scheduled to run in the current minute, and runs each of them. How cool is that? Typical uses include:
1. Backups
2. Cleaning up temp files
3. Reminders
I wrote a simple Python script to trigger a Mac notification that asks me to take a break every 20 minutes.
import os

def notify(title, text):
    # Use AppleScript (osascript) to display a macOS notification
    os.system("""
        osascript -e 'display notification "{}" with title "{}"'
        """.format(text, title))

notify("Take a break", "You are sitting for too long")
# > crontab -e
# Run the notification script every 20 minutes
*/20 * * * * python /<path_to_script>/notification.py
Cron has its own limitations, though:
1. What if you have users across timezones? Whenever we deal with time, we are also dealing with timezone problems. Cron jobs by default run in the timezone of the machine they are running on; we can override this with the TZ environment variable. But cron isn't suitable if we want to run different tasks in different timezones.
2. What happens if a cron job fails? When a script fails during a cron run, the error is just logged and cron waits for the next schedule. This is not the most reliable way of handling errors; we would often want schedulers to retry until a certain threshold before moving on (a small retry wrapper is sketched after this list).
3. Scaling. What if the number of tasks to be executed at a given time is very large? What if a single task occupies a lot of memory?
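As an aside on point 2, one way to get retries is to build them into the script that cron invokes, since cron itself will not retry a failed run. This is only an illustrative sketch; the retry count, delay, and task function are arbitrary choices.

import time

def run_with_retries(task, retries=3, delay=5):
    # Retry the task up to `retries` times, waiting `delay` seconds between attempts
    for attempt in range(1, retries + 1):
        try:
            return task()
        except Exception as exc:
            print(f"Attempt {attempt} failed: {exc}")
            if attempt == retries:
                raise
            time.sleep(delay)

def send_reminder():
    # Placeholder for the real work, e.g. the notify() call above
    print("Reminder sent")

run_with_retries(send_reminder)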
I will be using a treasure hunter program as an example to explain the concepts in the blog. The problem statement is simple.
1. We have a treasure hunt with 10,000 files, one of which contains the word treasure.
2. The goal of the program is to go through the files and find the treasure.
# Create treasure
import logging
import os
from random import randint

N = 10000

def creating_treasure():
    """
    Creates N files with the treasure randomly placed in one of them
    """
    os.makedirs("treasure_data", exist_ok=True)
    treasure_in = randint(0, N - 1)
    for i in range(0, N):
        logging.debug(i)
        with open(f"treasure_data/file_{i}.txt", "w") as f:
            if i != treasure_in:
                f.writelines(["Not a treasure\n"] * N)
            else:
                f.writelines(["Treasure\n"] * N)
    print(f"treasure is in {treasure_in}")

creating_treasure()
Asynchronous Python has been gaining popularity since the release of asyncio. Though it has nothing to do with task schedulers, it is important to understand where it stands.
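As a quick illustration (my own sketch, not part of the treasure hunter), coroutines give up control while they wait, so two one-second waits complete in roughly one second overall. This is great for I/O-bound waiting, but it does nothing for CPU-bound work.

import asyncio

async def wait_and_report(name, seconds):
    # The event loop runs other coroutines while this one awaits
    await asyncio.sleep(seconds)
    print(f"{name} finished after {seconds}s")

async def main():
    # Both coroutines wait concurrently, so this takes about 1 second, not 2
    await asyncio.gather(
        wait_and_report("first", 1),
        wait_and_report("second", 1),
    )

asyncio.run(main())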
Python threading is an age-old story. Though it gives the impression of running multiple threads simultaneously, in reality it doesn't. Why? Because CPython has the GIL (Global Interpreter Lock). Unless your program spends a lot of time waiting on external (I/O) events, threading is not of much use. Even when your laptop has multiple cores, you will often find them idle on CPU-intensive tasks because of the GIL.
The treasure_hunter program implemented using threading would have threads looking through different ranges of files.

import logging
import threading
import time

N = 10000
treasure_found = False
num_of_threads = 10
count = int(N/num_of_threads)
We have a common flag treasure_found that all threads can check and set.

def find_treasure(start, end):
    # Scan files in [start, end); stop early if another thread has already found it
    logging.debug(f"{start}, {end}")
    global treasure_found
    for i in range(start, end):
        if treasure_found:
            return
        with open(f"treasure_data/file_{i}.txt", "r") as f:
            if f.readlines()[0] == "Treasure\n":
                treasure_found = i
                return
start_time = time.time()
threads = [threading.Thread(target=find_treasure, args=[i, i + count])
           for i in range(0, N, count)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
print("--- %s seconds ---" % (time.time() - start_time))
print (f"Found treasure {treasure_found}")
The multiprocessing module enables us to overcome this disadvantage of threading. The simplest way to understand it is that each process gets its own interpreter and its own GIL, so the GIL no longer limits parallelism across processes.
Multiprocessing also works well on CPU-intensive tasks, as we can use all the available cores independently. When designing a multiprocessing solution, the processes often share a queue from which each process picks up its next task.
num_of_process = 100