Управление синхронизацией в асинхронной обработке на Python
В этой статье объясняется управление синхронизацией в асинхронной обработке на Python.
Вы поэтапно изучите основы asyncio и практические шаблоны, обычно используемые для управления синхронизацией.
YouTube Video
Управление синхронизацией в асинхронной обработке на Python
В асинхронной обработке вы легко можете запускать несколько задач одновременно. Однако на практике требуются более продвинутые настройки, такие как управление параллелизмом, координация задач, эксклюзивный доступ к общим ресурсам, обработка тяжёлых синхронных процессов и очистка после отмены.
Здесь мы шаг за шагом изучим основы asyncio и практические шаблоны синхронизации, применяемые на практике.
Введение: Основы (async / await и create_task)
Сначала давайте рассмотрим минимальный пример асинхронного кода. await ожидает завершения вызываемой корутины в данной точке, а asyncio.create_task планирует выполнение задачи параллельно.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Этот код представляет собой типичный шаблон, где задачи явно создаются, выполняются параллельно, а результаты собираются с помощью
await.create_taskпозволяет выполнять задачи параллельно.
Различия между asyncio.gather, asyncio.wait и asyncio.as_completed
При одновременном запуске нескольких корутин вы выбираете подходящий инструмент в зависимости от того, как хотите получать результаты. gather ожидает завершения всех задач и возвращает результаты в порядке передачи, а as_completed позволяет обрабатывать результаты по мере их выполнения, независимо от порядка.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Как показано в данном коде,
gatherвозвращает результаты в исходном порядке, что удобно, если порядок важен.as_completedиспользуется, когда необходимо обрабатывать результаты сразу по мере их завершения.
Управление параллелизмом: ограничение одновременных запусков с помощью asyncio.Semaphore
Когда для внешнего API установлены ограничения или есть лимиты соединений с БД, вы можете контролировать количество одновременно выполняемых задач с помощью Semaphore.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Используя
Semaphoreсasync with, вы легко ограничите количество одновременных запусков. Это эффективно при наличии внешних ограничений.
Эксклюзивный контроль над общими ресурсами: asyncio.Lock
Lock используется для предотвращения одновременного изменения общих данных. asyncio.Lock — это примитив эксклюзивного доступа для асинхронного использования.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Если несколько задач изменяют одну общую переменную, например глобальный
counter, могут возникнуть конфликты. Ограничивая операции с помощьюLock, вы обеспечиваете согласованность данных.
Координация задач: asyncio.Event
Event используется, когда одна задача подаёт сигнал о готовности, а другие ожидают этот сигнал. Это простой способ для задач обмениваться сигналами и синхронизироваться.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventсодержит булевый флаг; вызовset()возобновляет выполнение всех ожидающих задач. Это полезно для простой синхронизации.
Шаблон производитель-потребитель: asyncio.Queue
Используя Queue, производители (создающие данные) и потребители (обрабатывающие данные) могут эффективно координироваться в асинхронном режиме. Также, если очередь переполнена, производители автоматически ожидают, что позволяет естественно реализовать отдачу давления (backpressure) и избежать перепроизводства.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queueпомогает асинхронно координировать работу производителей и потребителей. Кроме того, установка параметраmaxsizeзаставляет производителя ожидать при вызовеput, если очередь заполнена, предотвращая перепроизводство.
Обработка синхронных блокирующих операций: run_in_executor
Для ресурсоёмких операций или при использовании библиотек без поддержки async используйте run_in_executor, чтобы делегировать выполнение в отдельный поток или процесс. Это предотвращает блокировку основного цикла событий и позволяет другим асинхронным задачам выполняться без задержек.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Вызов синхронных функций напрямую заблокирует event loop. С помощью
run_in_executorкод выполняется в отдельном потоке, а асинхронные задачи продолжают выполняться параллельно.
Пример: Ограничение частоты вызовов API (комбинирование Semaphore + run_in_executor)
Ниже приведен пример, в котором обращения к API ограничены по частоте, а над результатами проводится тяжёлая обработка. Комбинирование Semaphore и run_in_executor обеспечивает безопасную и эффективную обработку.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Мы используем
Semaphoreдля ограничения одновременных обращений к API, а тяжёлая обработка результатов делегируется пулу потоков. Разделение сетевой и процессорной обработки повышает производительность.
Отмена задач и очистка
При отмене задачи крайне важно корректно обрабатывать finally и asyncio.CancelledError. Это гарантирует, что файлы и соединения будут освобождены, а промежуточные состояния будут обработаны корректно, обеспечивая согласованность приложения.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Отмена происходит посредством исключения (
CancelledError), поэтому выполняйте необходимую очистку в блокеexceptи при необходимости выбрасывайте исключение повторно.
Ключевые моменты для практического проектирования
Ниже приведены практические советы, полезные для проектирования асинхронной обработки.
-
Явное управление параллелизмом Если есть ограничения по ресурсам, например API или базы данных, вы можете ограничить количество одновременных запусков с помощью
Semaphore. -
Безопасное обращение с общими ресурсами Если требуется обновлять состояние из нескольких задач, используйте
Lock. Снижение объёма общего состояния и проектирование на базе неизменяемых данных делают систему безопаснее. -
Выбор способа получения результатов Если нужно обрабатывать задачи по мере завершения — используйте
asyncio.as_completed; если важен исходный порядок — используйтеgather. -
Изолируйте тяжёлую синхронную обработку Для ресурсоёмких вычислений или синхронных библиотечных вызовов используйте
run_in_executorилиProcessPoolExecutor, чтобы избежать блокировки event loop. -
Прорабатывайте обработку отмен и исключений Пишите корректную обработку исключений, чтобы освободить ресурсы даже при отмене задачи в середине выполнения.
-
Облегчайте тестирование Абстрагируйте побочные эффекты, такие как ввод-вывод, время и случайность — их можно будет подменять, упрощая тестирование асинхронного кода.
Резюме
asyncio — мощный инструмент, но если ограничиться только параллельным запуском, могут возникнуть проблемы с борьбой за ресурсы, превышением лимитов или блокировкой event loop. Комбинируя Semaphore, Lock, Event, Queue, run_in_executor и корректную обработку отмен, вы можете строить безопасные и эффективные асинхронные приложения. Используя такие механизмы, как шаблон производитель-потребитель, ограничение параллелизма или разделение асинхронных и блокирующих процессов, можно строить асинхронные процессы безопаснее и эффективнее.
Вы можете следовать этой статье, используя Visual Studio Code на нашем YouTube-канале. Пожалуйста, также посмотрите наш YouTube-канал.