Synchronisationskontrolle in der asynchronen Verarbeitung mit Python
Dieser Artikel erklärt die Synchronisationskontrolle in der asynchronen Verarbeitung mit Python.
Sie lernen Schritt für Schritt von den Grundlagen von asyncio bis hin zu praktischen Mustern, die üblicherweise zur Synchronisationskontrolle verwendet werden.
YouTube Video
Synchronisationskontrolle in der asynchronen Verarbeitung mit Python
In der asynchronen Verarbeitung können Sie problemlos mehrere Aufgaben gleichzeitig ausführen. In der Praxis sind jedoch weitergehende Anpassungen erforderlich, wie z.B. die Steuerung der Parallelität, die Koordination von Aufgaben, exklusiver Zugriff auf gemeinsame Ressourcen, Umgang mit rechenintensiven synchronen Prozessen und Aufräumarbeiten nach Abbrüchen.
Hier lernen wir Schritt für Schritt von den Grundlagen von asyncio bis hin zu praxisnahen Synchronisationsmustern.
Einführung: Grundlagen (async / await und create_task)
Schauen wir uns zunächst einen minimalen asynchronen Code an. await wartet an dieser Stelle, bis die aufgerufene Coroutine fertig ist, und asyncio.create_task plant eine Aufgabe zur parallelen Ausführung ein.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Dieser Code ist ein typisches Muster, bei dem Aufgaben explizit erstellt, parallel ausgeführt und die Ergebnisse am Ende mit
awaitabgeholt werden.create_taskermöglicht die parallele Ausführung.
Unterschiede zwischen asyncio.gather, asyncio.wait und asyncio.as_completed
Beim gleichzeitigen Ausführen mehrerer Coroutinen wählen Sie, welches Werkzeug Sie verwenden, je nachdem, wie Sie die Ergebnisse erhalten möchten. gather wartet, bis alle fertig sind, und gibt die Ergebnisse in Eingabereihenfolge zurück, während as_completed die Verarbeitung nach Abschluss der einzelnen Ergebnisse, unabhängig von der Reihenfolge, ermöglicht.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Wie in diesem Code gezeigt, gibt
gatherdie Ergebnisse in Eingabereihenfolge zurück – das ist nützlich, wenn Sie die Reihenfolge beibehalten möchten.as_completedwird verwendet, wenn Ergebnisse sofort nach Fertigstellung verarbeitet werden sollen.
Parallelität steuern: Gleichzeitige Ausführungen mit asyncio.Semaphore begrenzen
Wenn externe API-Rate-Limits oder DB-Verbindungsbeschränkungen bestehen, können Sie gleichzeitige Ausführungen mit Semaphore steuern.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())- Mit
Semaphorezusammen mitasync withkönnen Sie die Zahl gleichzeitiger Ausführungen einfach begrenzen. Dies ist effektiv bei externen Beschränkungen.
Exklusiver Zugriff auf gemeinsame Ressourcen: asyncio.Lock
Lock wird verwendet, um gleichzeitige Aktualisierungen gemeinsam genutzter Daten zu verhindern. asyncio.Lock ist ein exklusives Werkzeug für die asynchrone Nutzung.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Wenn mehrere Aufgaben eine gemeinsame Variable wie z.B. einen globalen
counteraktualisieren, können Konflikte entstehen. Durch das Einrahmen von Operationen mit einemLockkönnen Sie die Konsistenz sicherstellen.
Aufgabenkoordination: asyncio.Event
Event wird verwendet, wenn eine Aufgabe signalisiert, dass sie bereit ist, und andere Aufgaben auf dieses Signal warten. Dies ist eine einfache Möglichkeit, dass Aufgaben Signale austauschen und sich synchronisieren.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventbesitzt ein boolsches Flag, und das Aufrufen vonset()setzt alle wartenden Aufgaben fort. Es ist nützlich für einfache Synchronisation.
Producer-Consumer-Muster: asyncio.Queue
Mit Queue können Produzenten (Daten-Ersteller) und Konsumenten (Daten-Verarbeiter) reibungslos und asynchron zusammenarbeiten. Wenn die Warteschlange voll ist, warten Produzenten automatisch – so wird auf natürliche Weise Backpressure gegen Überproduktion ermöglicht.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queuehilft, Produzenten und Konsumenten asynchron zu koordinieren. Mitmaxsizewartet der Produzent bei vollem Puffer währendput, was Überproduktion verhindert.
Verarbeitung synchroner blockierender Operationen: run_in_executor
Für CPU-intensive Prozesse oder bei der Nutzung von Bibliotheken ohne Async-Support nutzen Sie run_in_executor, um die Verarbeitung an einen anderen Thread oder Prozess auszulagern. Dadurch verhindert man ein Anhalten der Event-Loop und andere asynchrone Aufgaben können weiterhin reibungslos laufen.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Das direkte Aufrufen synchroner Funktionen blockiert die Event-Loop. Mit
run_in_executorläuft der Code in einem separaten Thread und asynchrone Aufgaben gehen parallel weiter.
Beispiel: Ratengesteuerte API-Aufrufe (Kombination von Semaphore + run_in_executor)
Im Folgenden sehen Sie ein Beispiel, bei dem API-Aufrufe limitiert und die Ergebnisse aufwendig verarbeitet werden. Die Kombination von Semaphore und run_in_executor ermöglicht eine sichere und effiziente Verarbeitung.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Ein
Semaphorebegrenzt die Anzahl gleichzeitiger API-Aufrufe, und die aufwändige Verarbeitung der Ergebnisse wird an einen Thread-Pool übergeben. Das Trennen von Netzwerk- und CPU-Verarbeitung steigert die Effizienz.
Aufgabenabbruch und Bereinigung
Beim Abbrechen einer Aufgabe ist der korrekte Umgang mit finally und asyncio.CancelledError sehr wichtig. So wird gewährleistet, dass Dateien und Verbindungen freigegeben werden und Zwischenzustände korrekt behandelt werden – und damit die Konsistenz der Anwendung gewahrt bleibt.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- Der Abbruch erfolgt über eine Exception (
CancelledError); daher sollten Sie notwendige Aufräumarbeiten imexcept-Block durchführen und gegebenenfalls die Exception erneut auslösen.
Wichtige Punkte für die praxisnahe Gestaltung
Im Folgenden finden Sie praktische Punkte, die bei der Gestaltung asynchroner Verarbeitung nützlich sind.
-
Parallelität explizit steuern Bei Ressourcenbeschränkungen wie APIs oder Datenbanken lässt sich die Anzahl gleichzeitiger Ausführungen mit
Semaphorebegrenzen. -
Gemeinsame Ressourcen sicher verwalten Bei Statusaktualisierungen aus mehreren Aufgaben verwenden Sie
Lock. Das Reduzieren von gemeinsamem Status und die Nutzung unveränderlicher Daten erhöhen die Sicherheit. -
Wählen Sie, wie Sie Ergebnisse empfangen Wenn Sie Aufgaben bei Fertigstellung verarbeiten wollen, nutzen Sie
asyncio.as_completed; für eine Behandlung in Eingabereihenfolge nutzen Siegather. -
Rechenintensive synchrone Prozesse isolieren Für CPU-intensive oder synchrone Bibliotheksaufrufe verwenden Sie
run_in_executoroderProcessPoolExecutor, um Blockierungen der Event-Loop zu vermeiden. -
Berücksichtigen Sie Abbrüche und Ausnahmen Schreiben Sie eine geeignete Fehlerbehandlung, um Ressourcen auch bei vorzeitigem Abbruch sicher aufzuräumen.
-
Testbarkeit erleichtern Kapseln Sie Seiteneffekte wie I/O, Zeit und Zufall, damit sie ersetzt werden und asynchroner Code leichter getestet werden kann.
Zusammenfassung
asyncio ist mächtig, aber wenn Sie sich nur auf das „parallele Ausführen“ konzentrieren, können Probleme wie Ressourcenkonflikte, Überschreitungen von Limits oder Blockieren der Event-Loop auftreten. Durch die Kombination von Semaphore, Lock, Event, Queue, run_in_executor und ordnungsgemäßem Abbruch-Handling können Sie sichere und effiziente asynchrone Anwendungen gestalten. Durch den Einsatz von Techniken wie Producer-Consumer-Muster, Begrenzung der Parallelität oder Entkopplung asynchroner und blockierender Prozesse können asynchrone Workflows sicherer und effizienter gestaltet werden.
Sie können den obigen Artikel mit Visual Studio Code auf unserem YouTube-Kanal verfolgen. Bitte schauen Sie sich auch den YouTube-Kanal an.