Synchronization Control in Python Asynchronous Processing

This article explains synchronization control in Python asynchronous processing.

You will learn step by step, from the basics of asyncio to practical patterns commonly used for synchronization control.

In asynchronous processing, it is easy to run multiple tasks concurrently. In practice, however, you also need finer control: limiting concurrency, coordinating tasks, protecting shared resources, handling heavy synchronous work, and cleaning up after cancellation.

Introduction: Basics (async / await and create_task)

Let's first look at some minimal asynchronous code. await waits at that point until the called coroutine completes, and asyncio.create_task schedules a task for concurrent execution.

import asyncio

async def worker(name, delay):
    # Simulate async work
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main():
    # Create two tasks that run concurrently.
    t1 = asyncio.create_task(worker("A", 1))
    t2 = asyncio.create_task(worker("B", 2))

    # Await both results (this suspends until both are done).
    result1 = await t1
    result2 = await t2
    print(result1, result2)

if __name__ == "__main__":
    asyncio.run(main())
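  • This code follows a typical pattern: tasks are created explicitly, run concurrently, and their results are collected at the end with await. Because create_task schedules each coroutine immediately, the two sleeps overlap, and the program finishes in about 2 seconds rather than 3.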
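To see what create_task changes, the sketch below times two versions of the same work. One version awaits the coroutines one after another. The other schedules them as tasks first. The shortened delays and the sequential/concurrent helper names are illustrative choices, not part of the original example.

```python
import asyncio
import time

async def worker(name, delay):
    # Same shape as the example above: pretend to do async work
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def sequential():
    # Awaiting each coroutine directly runs them one after another
    start = time.perf_counter()
    await worker("A", 0.1)
    await worker("B", 0.2)
    return time.perf_counter() - start

async def concurrent():
    # create_task schedules both immediately, so their sleeps overlap
    start = time.perf_counter()
    t1 = asyncio.create_task(worker("A", 0.1))
    t2 = asyncio.create_task(worker("B", 0.2))
    await t1
    await t2
    return time.perf_counter() - start

async def main():
    seq = await sequential()
    conc = await concurrent()
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
    return seq, conc

if __name__ == "__main__":
    asyncio.run(main())
```

The sequential version should take roughly the sum of the delays, and the concurrent version roughly the longest single delay.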

Differences between asyncio.gather, asyncio.wait, and asyncio.as_completed

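When running multiple coroutines concurrently, choose the function according to how you want to receive the results:

  • gather waits for all of them to finish and returns the results in input order.

  • as_completed lets you process each result as soon as it is ready, regardless of order.

  • asyncio.wait returns a set of finished tasks and a set of pending tasks. It can also return early, for example as soon as the first task completes.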

import asyncio
import random

async def job(i):
    delay = random.random() * 2
    await asyncio.sleep(delay)
    return (i, delay)

async def gather_example():
    # gather waits for all tasks and returns results in the same order as input
    results = await asyncio.gather(*(job(i) for i in range(5)))
    print("gather order:", results)

async def as_completed_example():
    # as_completed yields results as they finish (useful to process early results)
    tasks = [asyncio.create_task(job(i)) for i in range(5)]
    for coro in asyncio.as_completed(tasks):
        res = await coro
        print("completed:", res)

async def main():
    await gather_example()
    await as_completed_example()

if __name__ == "__main__":
    asyncio.run(main())
  • As shown in this code, gather returns results in input order, making it useful when you want to preserve order. as_completed is used when you want to process results as soon as they finish.
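The heading also mentions asyncio.wait, which the example above does not use. Unlike gather, it returns two sets, (done, pending), and its return_when parameter lets you stop waiting early. The sketch below assumes you only need the first result and want to cancel the rest. The job delays are arbitrary. Since Python 3.11, wait accepts only tasks or futures, not bare coroutines.

```python
import asyncio

async def job(i, delay):
    await asyncio.sleep(delay)
    return i

async def first_finisher():
    # asyncio.wait needs Task objects (passing bare coroutines is an error in 3.11+)
    tasks = [asyncio.create_task(job(i, d)) for i, d in enumerate([0.3, 0.1, 0.2])]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the tasks that are still running, then wait until they have finished cancelling
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    winner = done.pop().result()
    print("first finished:", winner, "cancelled:", len(pending))
    return winner, len(pending)

if __name__ == "__main__":
    asyncio.run(first_finisher())
```

Passing return_when=asyncio.ALL_COMPLETED (the default) makes wait behave more like gather, except that you receive sets of tasks instead of an ordered list of results.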

Controlling concurrency: Limiting simultaneous executions with asyncio.Semaphore

When there are external API rate limits or DB connection limits, you can control concurrent executions with a Semaphore.

import asyncio
import random

sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers

async def limited_worker(i):
    async with sem:
        # Only 3 tasks can be inside this block concurrently
        delay = random.random() * 2
        await asyncio.sleep(delay)
        print(f"worker {i} finished after {delay:.2f}s")

async def main():
    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
  • Using a Semaphore with async with is an easy way to cap the number of simultaneous executions. This is effective when an external system imposes a limit, such as an API's maximum number of concurrent requests or the size of a database connection pool.
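You can confirm that the limit is actually enforced by counting how many workers are inside the async with block at once. The sketch below is an illustrative check. The active/peak bookkeeping is specific to this example and is not an asyncio feature.

```python
import asyncio

async def tracked_worker(sem, state):
    async with sem:
        # Record how many workers are inside the guarded block right now
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)
        state["active"] -= 1

async def main(limit=3, n_workers=10):
    # Created inside main so each asyncio.run() call gets a fresh semaphore
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(tracked_worker(sem, state) for _ in range(n_workers)))
    print("peak concurrency:", state["peak"])
    return state["peak"]

if __name__ == "__main__":
    asyncio.run(main())
```

The counter updates happen between awaits, so they do not need a lock of their own.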

Exclusive control of shared resources: asyncio.Lock

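A Lock prevents multiple tasks from updating shared data at the same time. asyncio.Lock is a mutual-exclusion lock for coroutines. Note that it is not thread-safe, so it only coordinates tasks running on the same event loop.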

import asyncio

counter = 0
lock = asyncio.Lock()

async def incrementer(n_times):
    global counter
    for _ in range(n_times):
        # Acquire lock to update shared counter safely
        async with lock:
            tmp = counter
            # Yield control; without the lock, another task could read the same stale value here
            await asyncio.sleep(0)
            counter = tmp + 1

async def main():
    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
    await asyncio.gather(*tasks)
    print("final counter:", counter)

if __name__ == "__main__":
    asyncio.run(main())
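  • Because asyncio runs tasks on a single thread, a task can only be interrupted at an await. If a read-modify-write sequence on shared state (such as a global counter) contains an await, another task may read the stale value in the meantime, and updates are lost. Enclosing the whole sequence in async with lock keeps it consistent. Here, the final counter is 5000.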
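To see why the lock matters, the sketch below runs the same read-modify-write loop with and without asyncio.Lock. The dict-based state and the run helper are illustrative. The exact unprotected total depends on scheduling, so the only safe expectation is that it falls short of 5000.

```python
import asyncio

async def increment(state, n_times, lock=None):
    for _ in range(n_times):
        if lock is None:
            # Unprotected read-modify-write with a suspension point in the middle
            tmp = state["counter"]
            await asyncio.sleep(0)
            state["counter"] = tmp + 1
        else:
            async with lock:
                tmp = state["counter"]
                await asyncio.sleep(0)
                state["counter"] = tmp + 1

async def run(use_lock):
    state = {"counter": 0}
    lock = asyncio.Lock() if use_lock else None
    await asyncio.gather(*(increment(state, 1000, lock) for _ in range(5)))
    return state["counter"]

if __name__ == "__main__":
    print("without lock:", asyncio.run(run(False)))
    print("with lock:   ", asyncio.run(run(True)))
```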

Task coordination: asyncio.Event

Event is used when one task signals that it is ready, and other tasks wait for this signal. This is a simple way for tasks to share signals and synchronize with each other.

import asyncio

event = asyncio.Event()

async def waiter(name):
    print(f"{name} is waiting for event")
    await event.wait()
    print(f"{name} resumed after event set")

async def setter():
    print("setter will sleep and then set the event")
    await asyncio.sleep(1)
    event.set()
    print("event set by setter")

async def main():
    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
    await asyncio.create_task(setter())
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
  • An Event holds a boolean flag. Calling set() wakes every task waiting in wait(), and later calls to wait() return immediately until clear() resets the flag. It is useful for simple one-to-many signaling.
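A common use of Event is as a shutdown signal for a long-running loop. The sketch below shows one way to do it. The poller function and its 0.01-second period are illustrative assumptions. Wrapping event.wait() in asyncio.wait_for gives an interruptible sleep, and clear() resets the flag afterwards.

```python
import asyncio

async def poller(stop, ticks):
    # Keep doing periodic work until another task sets the stop event
    while not stop.is_set():
        ticks.append(asyncio.get_running_loop().time())
        try:
            # Sleep up to 0.01s, but wake immediately if stop is set meanwhile
            await asyncio.wait_for(stop.wait(), timeout=0.01)
        except asyncio.TimeoutError:
            pass

async def main():
    stop = asyncio.Event()
    ticks = []
    task = asyncio.create_task(poller(stop, ticks))
    await asyncio.sleep(0.05)
    stop.set()  # every task blocked in stop.wait() is released
    await task
    stop.clear()  # reset the flag so the event could be reused
    print("ticks before stop:", len(ticks))
    return len(ticks), stop.is_set()

if __name__ == "__main__":
    asyncio.run(main())
```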

Producer-consumer pattern: asyncio.Queue

By using a Queue, producers (which create data) and consumers (which process data) can coordinate asynchronously. When the queue is bounded with maxsize, put() waits while the queue is full. This naturally applies backpressure and keeps producers from getting too far ahead of consumers.

import asyncio
import random

async def producer(q, n_items):
    for i in range(n_items):
        await asyncio.sleep(random.random() * 0.5)
        item = f"item-{i}"
        await q.put(item)
        print("produced", item)
    # signal consumers to stop
    await q.put(None)

async def consumer(q, name):
    while True:
        item = await q.get()
        if item is None:
            # put sentinel back for other consumers and break
            await q.put(None)
            break
        await asyncio.sleep(random.random() * 1)
        print(name, "consumed", item)
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
    prod = asyncio.create_task(producer(q, 10))
    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
    await asyncio.gather(prod, *cons)

if __name__ == "__main__":
    asyncio.run(main())
  • Queue helps coordinate producers and consumers asynchronously. Additionally, setting maxsize makes the producer wait on put when the queue is full, preventing overproduction.
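The sentinel approach above works, but asyncio.Queue also offers join() together with task_done(), which removes the need for sentinels. The alternative sketch below assumes the producer loop runs directly in main and the consumers are cancelled once every item has been processed. The results list exists only to make the outcome checkable.

```python
import asyncio

async def consumer(q, name, results):
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.01)  # simulate processing
            results.append((name, item))
        finally:
            # Every successful get() must be matched by exactly one task_done()
            q.task_done()

async def main(n_items=10, n_consumers=2):
    q = asyncio.Queue(maxsize=5)
    results = []
    consumers = [asyncio.create_task(consumer(q, f"C{i}", results))
                 for i in range(n_consumers)]
    for i in range(n_items):
        await q.put(f"item-{i}")  # waits while the queue is full
    # join() returns once task_done() has been called for every item put
    await q.join()
    # Consumers loop forever, so stop them explicitly
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return results

if __name__ == "__main__":
    print(len(asyncio.run(main())), "items processed")
```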

Handling Synchronous Blocking Operations: run_in_executor

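For blocking calls, such as those made by libraries that do not support async, use run_in_executor to hand the work to another thread or process. This keeps the event loop free, so other asynchronous tasks continue to run. In standard CPython builds, CPU-bound pure-Python code running in the default thread pool still competes for the GIL. For that kind of work, pass a ProcessPoolExecutor instead.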

import asyncio
import time

def blocking_io(x):
    # simulate blocking I/O or CPU-bound work
    time.sleep(2)
    return x * x

async def main():
    loop = asyncio.get_running_loop()
    # run blocking_io in default thread pool
    result = await loop.run_in_executor(None, blocking_io, 3)
    print("blocking result:", result)

if __name__ == "__main__":
    asyncio.run(main())
  • Calling a synchronous function directly blocks the event loop until it returns. Passing None as the executor runs the function in the loop's default ThreadPoolExecutor, so other asynchronous tasks keep making progress in the meantime.
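Since Python 3.9, asyncio.to_thread offers a shorter way to run a blocking function in the default thread pool. The sketch below runs two blocking calls at once. Because each call waits in its own thread, the pair should take roughly as long as a single call. The 0.2-second delay is an arbitrary stand-in for real blocking work.

```python
import asyncio
import time

def blocking_io(x):
    # Stands in for a blocking call (file I/O, a sync HTTP client, etc.)
    time.sleep(0.2)
    return x * x

async def main():
    start = time.perf_counter()
    # asyncio.to_thread (Python 3.9+) runs each call in the default thread pool
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 2),
        asyncio.to_thread(blocking_io, 3),
    )
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")
    return results, elapsed

if __name__ == "__main__":
    asyncio.run(main())
```

Threads help with blocking I/O. For CPU-heavy pure-Python functions, a ProcessPoolExecutor passed to run_in_executor is the usual alternative.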

Example: Rate-limited API calls (combining Semaphore + run_in_executor)

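The following sample scenario limits the number of concurrent API calls and performs heavy processing on the results. Strictly speaking, a Semaphore caps how many calls are in flight at once, not how many calls are made per second. Combining Semaphore and run_in_executor lets processing proceed safely and efficiently.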

import asyncio
import time
import random

sem = asyncio.Semaphore(5)  # at most 5 API calls in flight

def heavy_sync_processing(data):
    # simulate heavy blocking work (time.sleep stands in for real computation)
    time.sleep(1)
    return f"processed-{data}"

async def api_call(i):
    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
    return f"data-{i}"

async def worker(i):
    async with sem:
        data = await api_call(i)
    # the semaphore is released before processing, so it limits only the API calls
    loop = asyncio.get_running_loop()
    # offload the blocking work to the default thread pool
    result = await loop.run_in_executor(None, heavy_sync_processing, data)
    print(result)

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
  • The Semaphore limits the number of concurrent API calls, while the heavy processing of each result is delegated to a thread pool. Keeping network waits on the event loop and blocking work in worker threads prevents either one from stalling the other.
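If an API specifies a rate (calls per second) rather than a concurrency limit, one option is to space out the moments at which calls start. The MinIntervalLimiter class below is a hypothetical helper written for this sketch, not an asyncio API. It uses an asyncio.Lock so that only one task at a time reserves the next start slot. The interval and call counts are arbitrary.

```python
import asyncio

class MinIntervalLimiter:
    """Hypothetical helper: lets calls start at most once every `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def wait(self):
        async with self._lock:
            loop = asyncio.get_running_loop()
            now = loop.time()
            if now < self._next_start:
                # Sleep while holding the lock so later callers queue up behind us
                await asyncio.sleep(self._next_start - now)
            self._next_start = max(now, self._next_start) + self.interval

async def api_call(i, limiter, starts):
    await limiter.wait()
    starts.append(asyncio.get_running_loop().time())
    await asyncio.sleep(0.01)  # simulate the request itself
    return f"data-{i}"

async def main(n_calls=5, interval=0.05):
    limiter = MinIntervalLimiter(interval)
    starts = []
    results = await asyncio.gather(*(api_call(i, limiter, starts) for i in range(n_calls)))
    return results, starts

if __name__ == "__main__":
    results, starts = asyncio.run(main())
    print(results)
```

This spacing can be combined with a Semaphore when an API limits both the number of calls per second and the number of concurrent calls.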

Task cancellation and cleanup

When a task is cancelled, properly handling finally and asyncio.CancelledError is very important. This ensures files and connections are released and intermediate states are properly handled, maintaining consistency in the application.

import asyncio

async def long_running():
    try:
        print("started long_running")
        await asyncio.sleep(10)
        print("finished long_running")
    except asyncio.CancelledError:
        print("long_running was cancelled")
        # re-raise so the caller sees the task as cancelled
        raise
    finally:
        # cleanup here runs whether the task finished, failed, or was cancelled
        print("cleaning up")

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled in main")

if __name__ == "__main__":
    asyncio.run(main())
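  • Cancellation is delivered as a CancelledError, raised at the await where the task is suspended. Put cleanup in a finally block (or an except block), and re-raise CancelledError once cleanup is done. Swallowing it makes the cancellation appear to have no effect and can break callers that rely on it, such as timeouts.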
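Cancellation also happens implicitly, for example when a timeout expires. The sketch below uses asyncio.wait_for, which cancels the wrapped coroutine on timeout. The finally block still runs before TimeoutError reaches the caller. The log list is only there to make the order of events visible. On Python 3.11+, async with asyncio.timeout(...) behaves similarly.

```python
import asyncio

async def long_running(log):
    try:
        log.append("started")
        await asyncio.sleep(10)
        log.append("finished")
    finally:
        # Runs on normal completion, on error, and on cancellation
        log.append("cleanup")

async def main():
    log = []
    try:
        # wait_for cancels the inner coroutine when the timeout expires
        await asyncio.wait_for(long_running(log), timeout=0.1)
    except asyncio.TimeoutError:
        log.append("timed out")
    print(log)
    return log

if __name__ == "__main__":
    asyncio.run(main())
```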

Key points for practical design

The following are practical points useful for designing asynchronous processing.

  • Explicitly control concurrency: When there are resource limits such as APIs or DBs, limit the number of concurrent executions with a Semaphore.

  • Handle shared resources safely: If you need to update state from multiple tasks, use a Lock. Reducing shared state and designing around immutable data makes things safer still.

  • Choose how to receive results: If you want to process tasks as they complete, use asyncio.as_completed. If you want results in input order, use gather.

  • Isolate heavy synchronous processing: For blocking or synchronous library calls, use run_in_executor (or asyncio.to_thread) to avoid blocking the event loop. For CPU-intensive work, pass a ProcessPoolExecutor to run_in_executor.

  • Plan for cancellation and exceptions: Write proper exception handling so that resources are cleaned up safely even if a task is cancelled midway.

  • Make testing easy: Abstract side effects such as I/O, time, and randomness so they can be replaced. This makes asynchronous code easier to test.
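To illustrate the last point, the sketch below passes sleep and random functions in as parameters, so a test can substitute instant, deterministic fakes. This is an assumption of this example, not a library convention. The job function mirrors the earlier gather example.

```python
import asyncio
import random

async def job(i, sleep=asyncio.sleep, rand=random.random):
    # Side effects (waiting, randomness) come from injectable callables
    delay = rand() * 2
    await sleep(delay)
    return (i, delay)

async def fake_sleep(delay):
    # Test double: yields control once instead of really waiting
    await asyncio.sleep(0)

async def main():
    results = await asyncio.gather(
        *(job(i, sleep=fake_sleep, rand=lambda: 0.5) for i in range(3))
    )
    print(results)
    return results

if __name__ == "__main__":
    asyncio.run(main())
```

In production code, the defaults apply and job behaves exactly as before. In tests, it finishes instantly with predictable delays.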

Summary

asyncio is powerful, but if you focus only on “running things in parallel,” you may run into shared resource contention, violated resource limits, or a blocked event loop. Combine Semaphore, Lock, Event, Queue, and run_in_executor with proper cancellation handling. Then apply patterns such as producer-consumer, concurrency limits, and the separation of asynchronous and blocking work. Together, these let you build asynchronous applications that are both safe and efficient.

You can follow along with this article in Visual Studio Code on our YouTube channel, so please check it out.