Kontrola synchronizacji w asynchronicznym przetwarzaniu w Pythonie

Kontrola synchronizacji w asynchronicznym przetwarzaniu w Pythonie

Ten artykuł wyjaśnia, jak kontrolować synchronizację w asynchronicznym przetwarzaniu w Pythonie.

Poznasz krok po kroku podstawy asyncio oraz praktyczne wzorce powszechnie stosowane do kontroli synchronizacji.

YouTube Video

Kontrola synchronizacji w asynchronicznym przetwarzaniu w Pythonie

W asynchronicznym przetwarzaniu można łatwo uruchamiać wiele zadań jednocześnie. Jednak w praktyce wymagane są bardziej zaawansowane mechanizmy, takie jak kontrola współbieżności, koordynacja zadań, wyłączna kontrola nad współdzielonymi zasobami, obsługa ciężkich procesów synchronicznych oraz sprzątanie po anulowaniu zadań.

Tutaj poznasz krok po kroku od podstaw asyncio do praktycznych wzorców często używanych do synchronizacji.

Wprowadzenie: podstawy (async / await i create_task)

Najpierw przyjrzyjmy się prostemu przykładowi kodu asynchronicznego. await powoduje oczekiwanie w tym miejscu na zakończenie wywołanej korutyny, a asyncio.create_task tworzy zadanie do współbieżnego wykonania.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Ten kod to typowy wzorzec, w którym zadania są jawnie tworzone, uruchamiane równolegle, a wyniki odbierane na końcu za pomocą await. create_task umożliwia współbieżne wykonanie zadań.

Różnice między asyncio.gather, asyncio.wait i asyncio.as_completed

Podczas uruchamiania wielu korutyn jednocześnie wybierasz odpowiednią metodę w zależności od tego, jak chcesz pobierać wyniki. gather czeka, aż wszystkie zadania się zakończą, i zwraca wyniki w kolejności wejściowej, natomiast as_completed umożliwia przetwarzanie wyników w miarę ich zakończenia, niezależnie od kolejności.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Jak pokazano w tym kodzie, gather zwraca wyniki w kolejności wejściowej, co jest przydatne, gdy chcesz zachować kolejność. as_completed używa się, gdy chcesz przetwarzać wyniki natychmiast po ich zakończeniu.

Kontrola współbieżności: ograniczanie liczby równoczesnych wykonań za pomocą asyncio.Semaphore

Gdy występują limity zapytań do API lub ograniczenia połączeń do bazy danych, możesz kontrolować liczbę równoczesnych wykonań za pomocą Semaphore.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Używając Semaphore z async with, możesz łatwo ograniczyć liczbę jednoczesnych wykonań. To jest skuteczne w sytuacjach, gdzie występują zewnętrzne ograniczenia.

Wyłączna kontrola nad współdzielonymi zasobami: asyncio.Lock

Lock służy do zapobiegania jednoczesnemu aktualizowaniu współdzielonych danych. asyncio.Lock to wyłączny prymityw do zastosowań asynchronicznych.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Jeśli wiele zadań aktualizuje współdzieloną zmienną, taką jak globalny counter, mogą wystąpić konflikty. Opakowując operacje w Lock, można zachować spójność.

Koordynacja zadań: asyncio.Event

Event używany jest, gdy jedno zadanie sygnalizuje gotowość, a pozostałe czekają na ten sygnał. Jest to prosty sposób na wymianę sygnałów i synchronizację między zadaniami.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event posiada flagę boolowską, a wywołanie set() wznawia wszystkie oczekujące zadania. Jest to przydatne do prostej synchronizacji.

Wzorzec producent-konsument: asyncio.Queue

Dzięki użyciu Queue, producenci (tworzący dane) i konsumenci (przetwarzający dane) mogą płynnie i asynchronicznie się koordynować. Również, gdy kolejka jest pełna, producenci automatycznie czekają, co naturalnie wprowadza mechanizm przeciążenia i zapobiega nadprodukcji.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue pomaga asynchronicznie koordynować producentów i konsumentów. Ponadto ustawienie maxsize powoduje, że producent czeka podczas put gdy kolejka jest pełna, dzięki czemu zapobiega się nadprodukcji.

Obsługa synchronicznych operacji blokujących: run_in_executor

Dla zadań obciążających CPU lub używających bibliotek nieobsługujących asynchroniczności, użyj run_in_executor, by przekazać przetwarzanie do innego wątku lub procesu. Dzięki temu główna pętla zdarzeń nie zostaje zablokowana i inne zadania asynchroniczne mogą działać płynnie.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Bezpośrednie wywołanie funkcji synchronicznych zablokuje pętlę zdarzeń. Dzięki run_in_executor kod działa w osobnym wątku, a zadania asynchroniczne mogą nadal działać współbieżnie.

Przykład: Wywołania API z limitem (połączenie Semaphore + run_in_executor)

Poniżej przykład scenariusza, w którym wywołania API są ograniczone limitem, a na wynikach wykonywane jest ciężkie przetwarzanie. Połączenie Semaphore i run_in_executor umożliwia bezpieczne i efektywne przetwarzanie.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Używamy Semaphore do ograniczenia liczby równoległych wywołań API, a ciężkie przetwarzanie wynikowych danych delegujemy do puli wątków. Rozdzielenie przetwarzania sieciowego i CPU zwiększa efektywność.

Anulowanie zadania i sprzątanie

Podczas anulowania zadania bardzo ważna jest prawidłowa obsługa finally oraz asyncio.CancelledError. Zapewnia to zwalnianie plików i połączeń oraz obsługę stanów pośrednich, zachowując spójność aplikacji.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Anulowanie przekazywane jest jako wyjątek (CancelledError), dlatego należy przeprowadzić potrzebne sprzątanie w bloku except, a w razie potrzeby ponownie podnieść wyjątek.

Kluczowe aspekty praktycznego projektowania

Poniżej praktyczne wskazówki przydatne przy projektowaniu asynchronicznego przetwarzania.

  • Jawnie kontroluj współbieżność Gdy istnieją limity np. dla API lub baz danych, liczbę jednoczesnych wykonań można ograniczyć przy użyciu Semaphore.

  • Bezpiecznie zarządzaj zasobami współdzielonymi Gdy stan jest aktualizowany przez wiele zadań jednocześnie, użyj Lock. Ograniczanie współdzielonego stanu i projektowanie w oparciu o dane niemutowalne zwiększa bezpieczeństwo.

  • Wybierz sposób otrzymywania wyników Jeśli chcesz przetwarzać zadania po ich zakończeniu, użyj asyncio.as_completed; jeśli chcesz otrzymać wyniki w kolejności wejściowej, użyj gather.

  • Izoluj ciężkie przetwarzanie synchroniczne Dla działań obciążających CPU lub wywołań synchronicznych, używaj run_in_executor lub ProcessPoolExecutor, żeby nie blokować pętli zdarzeń.

  • Załóż możliwość anulowania i wyjątków Zadbaj o odpowiednią obsługę wyjątków, by nawet przy anulowaniu zadania posprzątać zasoby.

  • Ułatw testowanie Abstrahuj skutki uboczne, takie jak I/O, czas i losowość, by móc je łatwo podmienić i ułatwić testowanie kodu asynchronicznego.

Podsumowanie

asyncio jest potężne, ale skupiając się tylko na „uruchamianiu wielu rzeczy równolegle”, możesz napotkać problemy z współdzielonymi zasobami, naruszeniem limitów lub blokowaniem pętli zdarzeń. Łącząc Semaphore, Lock, Event, Queue, run_in_executor oraz odpowiednio obsługując anulowanie, możesz zaprojektować bezpieczne i wydajne aplikacje asynchroniczne. Wykorzystując mechanizmy takie jak wzorzec producent-konsument, ograniczanie współbieżności czy rozdzielenie przetwarzania asynchronicznego i blokującego, można budować bezpieczniejsze i wydajniejsze przepływy asynchroniczne.

Możesz śledzić ten artykuł, korzystając z Visual Studio Code na naszym kanale YouTube. Proszę również sprawdzić nasz kanał YouTube.

YouTube Video