Synchronisationskontrolle in der asynchronen Verarbeitung mit Python

Synchronisationskontrolle in der asynchronen Verarbeitung mit Python

Dieser Artikel erklärt die Synchronisationskontrolle in der asynchronen Verarbeitung mit Python.

Sie lernen Schritt für Schritt von den Grundlagen von asyncio bis hin zu praktischen Mustern, die üblicherweise zur Synchronisationskontrolle verwendet werden.

YouTube Video

Synchronisationskontrolle in der asynchronen Verarbeitung mit Python

In der asynchronen Verarbeitung können Sie problemlos mehrere Aufgaben gleichzeitig ausführen. In der Praxis sind jedoch weitergehende Anpassungen erforderlich, wie z.B. die Steuerung der Parallelität, die Koordination von Aufgaben, exklusiver Zugriff auf gemeinsame Ressourcen, Umgang mit rechenintensiven synchronen Prozessen und Aufräumarbeiten nach Abbrüchen.

Hier lernen wir Schritt für Schritt von den Grundlagen von asyncio bis hin zu praxisnahen Synchronisationsmustern.

Einführung: Grundlagen (async / await und create_task)

Schauen wir uns zunächst einen minimalen asynchronen Code an. await wartet an dieser Stelle, bis die aufgerufene Coroutine fertig ist, und asyncio.create_task plant eine Aufgabe zur parallelen Ausführung ein.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Dieser Code ist ein typisches Muster, bei dem Aufgaben explizit erstellt, parallel ausgeführt und die Ergebnisse am Ende mit await abgeholt werden. create_task ermöglicht die parallele Ausführung.

Unterschiede zwischen asyncio.gather, asyncio.wait und asyncio.as_completed

Beim gleichzeitigen Ausführen mehrerer Coroutinen wählen Sie, welches Werkzeug Sie verwenden, je nachdem, wie Sie die Ergebnisse erhalten möchten. gather wartet, bis alle fertig sind, und gibt die Ergebnisse in Eingabereihenfolge zurück, während as_completed die Verarbeitung nach Abschluss der einzelnen Ergebnisse, unabhängig von der Reihenfolge, ermöglicht.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Wie in diesem Code gezeigt, gibt gather die Ergebnisse in Eingabereihenfolge zurück – das ist nützlich, wenn Sie die Reihenfolge beibehalten möchten. as_completed wird verwendet, wenn Ergebnisse sofort nach Fertigstellung verarbeitet werden sollen.

Parallelität steuern: Gleichzeitige Ausführungen mit asyncio.Semaphore begrenzen

Wenn externe API-Rate-Limits oder DB-Verbindungsbeschränkungen bestehen, können Sie gleichzeitige Ausführungen mit Semaphore steuern.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • Mit Semaphore zusammen mit async with können Sie die Zahl gleichzeitiger Ausführungen einfach begrenzen. Dies ist effektiv bei externen Beschränkungen.

Exklusiver Zugriff auf gemeinsame Ressourcen: asyncio.Lock

Lock wird verwendet, um gleichzeitige Aktualisierungen gemeinsam genutzter Daten zu verhindern. asyncio.Lock ist ein exklusives Werkzeug für die asynchrone Nutzung.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Wenn mehrere Aufgaben eine gemeinsame Variable wie z.B. einen globalen counter aktualisieren, können Konflikte entstehen. Durch das Einrahmen von Operationen mit einem Lock können Sie die Konsistenz sicherstellen.

Aufgabenkoordination: asyncio.Event

Event wird verwendet, wenn eine Aufgabe signalisiert, dass sie bereit ist, und andere Aufgaben auf dieses Signal warten. Dies ist eine einfache Möglichkeit, dass Aufgaben Signale austauschen und sich synchronisieren.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event besitzt ein boolsches Flag, und das Aufrufen von set() setzt alle wartenden Aufgaben fort. Es ist nützlich für einfache Synchronisation.

Producer-Consumer-Muster: asyncio.Queue

Mit Queue können Produzenten (Daten-Ersteller) und Konsumenten (Daten-Verarbeiter) reibungslos und asynchron zusammenarbeiten. Wenn die Warteschlange voll ist, warten Produzenten automatisch – so wird auf natürliche Weise Backpressure gegen Überproduktion ermöglicht.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue hilft, Produzenten und Konsumenten asynchron zu koordinieren. Mit maxsize wartet der Produzent bei vollem Puffer während put, was Überproduktion verhindert.

Verarbeitung synchroner blockierender Operationen: run_in_executor

Für CPU-intensive Prozesse oder bei der Nutzung von Bibliotheken ohne Async-Support nutzen Sie run_in_executor, um die Verarbeitung an einen anderen Thread oder Prozess auszulagern. Dadurch verhindert man ein Anhalten der Event-Loop und andere asynchrone Aufgaben können weiterhin reibungslos laufen.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Das direkte Aufrufen synchroner Funktionen blockiert die Event-Loop. Mit run_in_executor läuft der Code in einem separaten Thread und asynchrone Aufgaben gehen parallel weiter.

Beispiel: Ratengesteuerte API-Aufrufe (Kombination von Semaphore + run_in_executor)

Im Folgenden sehen Sie ein Beispiel, bei dem API-Aufrufe limitiert und die Ergebnisse aufwendig verarbeitet werden. Die Kombination von Semaphore und run_in_executor ermöglicht eine sichere und effiziente Verarbeitung.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Ein Semaphore begrenzt die Anzahl gleichzeitiger API-Aufrufe, und die aufwändige Verarbeitung der Ergebnisse wird an einen Thread-Pool übergeben. Das Trennen von Netzwerk- und CPU-Verarbeitung steigert die Effizienz.

Aufgabenabbruch und Bereinigung

Beim Abbrechen einer Aufgabe ist der korrekte Umgang mit finally und asyncio.CancelledError sehr wichtig. So wird gewährleistet, dass Dateien und Verbindungen freigegeben werden und Zwischenzustände korrekt behandelt werden – und damit die Konsistenz der Anwendung gewahrt bleibt.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Der Abbruch erfolgt über eine Exception (CancelledError); daher sollten Sie notwendige Aufräumarbeiten im except-Block durchführen und gegebenenfalls die Exception erneut auslösen.

Wichtige Punkte für die praxisnahe Gestaltung

Im Folgenden finden Sie praktische Punkte, die bei der Gestaltung asynchroner Verarbeitung nützlich sind.

  • Parallelität explizit steuern Bei Ressourcenbeschränkungen wie APIs oder Datenbanken lässt sich die Anzahl gleichzeitiger Ausführungen mit Semaphore begrenzen.

  • Gemeinsame Ressourcen sicher verwalten Bei Statusaktualisierungen aus mehreren Aufgaben verwenden Sie Lock. Das Reduzieren von gemeinsamem Status und die Nutzung unveränderlicher Daten erhöhen die Sicherheit.

  • Wählen Sie, wie Sie Ergebnisse empfangen Wenn Sie Aufgaben bei Fertigstellung verarbeiten wollen, nutzen Sie asyncio.as_completed; für eine Behandlung in Eingabereihenfolge nutzen Sie gather.

  • Rechenintensive synchrone Prozesse isolieren Für CPU-intensive oder synchrone Bibliotheksaufrufe verwenden Sie run_in_executor oder ProcessPoolExecutor, um Blockierungen der Event-Loop zu vermeiden.

  • Berücksichtigen Sie Abbrüche und Ausnahmen Schreiben Sie eine geeignete Fehlerbehandlung, um Ressourcen auch bei vorzeitigem Abbruch sicher aufzuräumen.

  • Testbarkeit erleichtern Kapseln Sie Seiteneffekte wie I/O, Zeit und Zufall, damit sie ersetzt werden und asynchroner Code leichter getestet werden kann.

Zusammenfassung

asyncio ist mächtig, aber wenn Sie sich nur auf das „parallele Ausführen“ konzentrieren, können Probleme wie Ressourcenkonflikte, Überschreitungen von Limits oder Blockieren der Event-Loop auftreten. Durch die Kombination von Semaphore, Lock, Event, Queue, run_in_executor und ordnungsgemäßem Abbruch-Handling können Sie sichere und effiziente asynchrone Anwendungen gestalten. Durch den Einsatz von Techniken wie Producer-Consumer-Muster, Begrenzung der Parallelität oder Entkopplung asynchroner und blockierender Prozesse können asynchrone Workflows sicherer und effizienter gestaltet werden.

Sie können den obigen Artikel mit Visual Studio Code auf unserem YouTube-Kanal verfolgen. Bitte schauen Sie sich auch den YouTube-Kanal an.

YouTube Video