Python Asenkron İşleminde Senkronizasyon Kontrolü

Python Asenkron İşleminde Senkronizasyon Kontrolü

Bu makale, Python asenkron işlemlerinde senkronizasyon kontrolünü açıklar.

asyncio'nun temellerinden başlayarak, genellikle eşzamanlılık kontrolü için kullanılan pratik kalıplara kadar adım adım öğreneceksiniz.

YouTube Video

Python Asenkron İşleminde Senkronizasyon Kontrolü

Asenkron işlemede, birden fazla görevi aynı anda kolayca çalıştırabilirsiniz. Ancak pratikte, eşzamanlılığı kontrol etmek, görevleri koordine etmek, paylaşılan kaynakların münhasır kontrolü, yoğun senkron işlemleri yönetmek ve iptallerden sonra temizlik yapmak gibi daha gelişmiş ayarlamalar gereklidir.

Burada, asyncio'nun temellerinden başlayarak senkronizasyon için sıkça kullanılan pratik kalıplara kadar adım adım öğreneceğiz.

Giriş: Temeller (async / await ve create_task)

Önce biraz basit asenkron koda bakalım. await, çağrılan korutinin tamamlanmasını bekler ve asyncio.create_task bir görevi eşzamanlı çalıştırmak için planlar.

 1import asyncio
 2
 3async def worker(name, delay):
 4    # Simulate async work
 5    await asyncio.sleep(delay)
 6    return f"{name} done after {delay}s"
 7
 8async def main():
 9    # Create two tasks that run concurrently.
10    t1 = asyncio.create_task(worker("A", 1))
11    t2 = asyncio.create_task(worker("B", 2))
12
13    # Await both results (this suspends until both are done).
14    result1 = await t1
15    result2 = await t2
16    print(result1, result2)
17
18if __name__ == "__main__":
19    asyncio.run(main())
  • Bu kod, görevlerin açıkça oluşturulduğu, paralel olarak çalıştırıldığı ve sonuçların await ile alındığı tipik bir örüntüdür. create_task eşzamanlı yürütmeyi mümkün kılar.

asyncio.gather, asyncio.wait ve asyncio.as_completed arasındaki farklar

Birden çok korutini aynı anda çalıştırırken, sonuçları nasıl almak istediğinize bağlı olarak hangisini kullanacağınızı seçersiniz. gather tüm işlemlerin bitmesini bekler ve sonuçları girilen sırada dönerken, as_completed sonuçları tamamlandıkça sıradan bağımsız olarak işler.

 1import asyncio
 2import random
 3
 4async def job(i):
 5    delay = random.random() * 2
 6    await asyncio.sleep(delay)
 7    return (i, delay)
 8
 9async def gather_example():
10    # gather waits for all tasks and returns results in the same order as input
11    results = await asyncio.gather(*(job(i) for i in range(5)))
12    print("gather order:", results)
13
14async def as_completed_example():
15    # as_completed yields results as they finish (useful to process early results)
16    tasks = [asyncio.create_task(job(i)) for i in range(5)]
17    for coro in asyncio.as_completed(tasks):
18        res = await coro
19        print("completed:", res)
20
21async def main():
22    await gather_example()
23    await as_completed_example()
24
25if __name__ == "__main__":
26    asyncio.run(main())
  • Bu kodda gösterildiği gibi, gather sonuçları girilen sırada döndürür; bu da sıralamayı korumak istediğinizde faydalıdır. as_completed tamamlanan sonuçları anında işlemek istediğinizde kullanılır.

Eşzamanlılığı Kontrol Etme: asyncio.Semaphore ile Aynı Anda Çalışan Görevleri Sınırlandırma

Dış API hız veya veritabanı bağlantı limitleri olduğunda, eşzamanlı yürütmeleri bir Semaphore ile kontrol edebilirsiniz.

 1import asyncio
 2import random
 3
 4sem = asyncio.Semaphore(3)  # allow up to 3 concurrent workers
 5
 6async def limited_worker(i):
 7    async with sem:
 8        # Only 3 tasks can be inside this block concurrently
 9        delay = random.random() * 2
10        await asyncio.sleep(delay)
11        print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14    tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15    await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18    asyncio.run(main())
  • async with ile birlikte Semaphore kullanarak eşzamanlı çalışan görevlerin sayısını kolayca sınırlandırabilirsiniz. Bu, dış sınırlamaların olduğu durumlarda etkilidir.

Paylaşılan kaynakların münhasır kontrolü: asyncio.Lock

Lock, paylaşılan verilerin aynı anda güncellenmesini önlemek için kullanılır. asyncio.Lock, asenkron kullanım için geliştirilmiş bir özel araçtır.

 1import asyncio
 2
 3counter = 0
 4lock = asyncio.Lock()
 5
 6async def incrementer(n_times):
 7    global counter
 8    for _ in range(n_times):
 9        # Acquire lock to update shared counter safely
10        async with lock:
11            tmp = counter
12            await asyncio.sleep(0)  # yield control to increase race likelihood
13            counter = tmp + 1
14
15async def main():
16    tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17    await asyncio.gather(*tasks)
18    print("final counter:", counter)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Birden fazla görev, global bir counter gibi paylaşılan bir değişkeni güncelliyorsa, çakışmalar oluşabilir. İşlemleri bir Lock ile sınırlandırarak tutarlılığı koruyabilirsiniz.

Görev Koordinasyonu: asyncio.Event

Event, bir görev hazır olduğunda sinyal verdiğinde ve diğer görevlerin bu sinyali beklediği durumlarda kullanılır. Görevlerin birbirine sinyal gönderip senkronize olmasının basit bir yoludur.

 1import asyncio
 2
 3event = asyncio.Event()
 4
 5async def waiter(name):
 6    print(f"{name} is waiting for event")
 7    await event.wait()
 8    print(f"{name} resumed after event set")
 9
10async def setter():
11    print("setter will sleep and then set the event")
12    await asyncio.sleep(1)
13    event.set()
14    print("event set by setter")
15
16async def main():
17    tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18    await asyncio.create_task(setter())
19    await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22    asyncio.run(main())
  • Event bir boole bayrağına sahiptir ve set() çağrıldığında tüm bekleyen görevler devam eder. Basit senkronizasyon için kullanışlıdır.

Üretici-tüketici kalıbı: asyncio.Queue

Queue kullanarak veri üretenler (producers) ile işleyenler (consumers) sorunsuz ve asenkron şekilde koordine olabilir. Ayrıca, kuyruk dolduğunda üreticiler otomatik olarak bekler, böylece aşırı üretimi doğal olarak önleyen geri basınç uygulanmış olur.

 1import asyncio
 2import random
 3
 4async def producer(q, n_items):
 5    for i in range(n_items):
 6        await asyncio.sleep(random.random() * 0.5)
 7        item = f"item-{i}"
 8        await q.put(item)
 9        print("produced", item)
10    # signal consumers to stop
11    await q.put(None)
12
13async def consumer(q, name):
14    while True:
15        item = await q.get()
16        if item is None:
17            # put sentinel back for other consumers and break
18            await q.put(None)
19            break
20        await asyncio.sleep(random.random() * 1)
21        print(name, "consumed", item)
22        q.task_done()
23
24async def main():
25    q = asyncio.Queue(maxsize=5)  # bounded queue to apply backpressure
26    prod = asyncio.create_task(producer(q, 10))
27    cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28    await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31    asyncio.run(main())
  • Queue, üretici ve tüketici ilişkisini asenkron olarak koordine etmeye yardımcı olur. Ayrıca maxsize ayarlanırsa, kuyruk dolu olduğunda üretici put üzerinde bekler ve aşırı üretim engellenir.

Senkron Engelleyici İşlemlerin Yönetimi: run_in_executor

CPU yoğun işlemler veya asenkron desteklemeyen kütüphaneler için run_in_executor ile işlemi başka bir iş parçacığına ya da prosese devredin. Bunu yaparak ana olay döngüsünün durmasını önler ve diğer asenkron görevlerin sorunsuz çalışmasını sağlar.

 1import asyncio
 2import time
 3
 4def blocking_io(x):
 5    # simulate blocking I/O or CPU-bound work
 6    time.sleep(2)
 7    return x * x
 8
 9async def main():
10    loop = asyncio.get_running_loop()
11    # run blocking_io in default thread pool
12    result = await loop.run_in_executor(None, blocking_io, 3)
13    print("blocking result:", result)
14
15if __name__ == "__main__":
16    asyncio.run(main())
  • Senkron işlevleri doğrudan çağırmak olay döngüsünü bloke eder. run_in_executor ile kod ayrı bir iş parçacığında çalışır ve diğer asenkron görevler aynı anda ilerlemeye devam eder.

Örnek: Hız Sınırlamalı API Çağrıları (Semaphore + run_in_executor kombinasyonu)

Aşağıda, API çağrılarının hız sınırlı olduğu ve sonuçlar üzerinde yoğun işlemler yapıldığı örnek bir senaryo yer almaktadır. Semaphore ile run_in_executor kombinasyonu işlemlerin güvenli ve verimli ilerlemesini sağlar.

 1import asyncio
 2import time
 3import random
 4
 5sem = asyncio.Semaphore(5)
 6
 7def heavy_sync_processing(data):
 8    # simulate heavy CPU-bound work
 9    time.sleep(1)
10    return f"processed-{data}"
11
12async def api_call(i):
13    await asyncio.sleep(random.random() * 0.5)  # simulate network latency
14    return f"data-{i}"
15
16async def worker(i):
17    async with sem:
18        data = await api_call(i)
19        # offload CPU-bound work to threadpool
20        loop = asyncio.get_running_loop()
21        result = await loop.run_in_executor(None, heavy_sync_processing, data)
22        print(result)
23
24async def main():
25    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26    await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29    asyncio.run(main())
  • Eşzamanlı API çağrılarını sınırlandırmak için Semaphore kullanıyor ve elde edilen veriye uygulanan yoğun iş yükünü bir iş parçacığı havuzuna devrediyoruz. Ağ ve CPU işlemlerinin ayrılması verimliliği artırır.

Görev İptali ve Temizlik

Bir görev iptal edildiğinde, finally ve asyncio.CancelledError'ın düzgün şekilde ele alınması çok önemlidir. Bu, dosya ve bağlantıların serbest bırakılmasını ve ara durumların düzgün şekilde ele alınmasını sağlar; uygulamanın tutarlılığını korur.

 1import asyncio
 2
 3async def long_running():
 4    try:
 5        print("started long_running")
 6        await asyncio.sleep(10)
 7        print("finished long_running")
 8    except asyncio.CancelledError:
 9        print("long_running was cancelled, cleaning up")
10        # perform cleanup here
11        raise
12
13async def main():
14    task = asyncio.create_task(long_running())
15    await asyncio.sleep(1)
16    task.cancel()
17    try:
18        await task
19    except asyncio.CancelledError:
20        print("task cancelled in main")
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • İptal bir istisna (CancelledError) olarak iletilir, bu nedenle temizliği except bloğunda yapmalı ve gerekiyorsa istisnayı yeniden yükseltmelisiniz.

Pratik tasarım için ana noktalar

Aşağıdakiler, asenkron işlem tasarımında faydalı pratik noktalardır.

  • Eşzamanlılığı açıkça kontrol edin API veya veritabanı gibi kaynak sınırlamaları olduğunda, eşzamanlı görev sayısını Semaphore ile sınırlayabilirsiniz.

  • Paylaşılan kaynakları güvenli şekilde yönetin Birden fazla görevden durumu güncellemeniz gerekiyorsa, Lock kullanın. Paylaşılan durumu azaltmak ve değiştirilemez veri ile tasarlamak işleri daha güvenli hale getirir.

  • Sonuçları nasıl alacağınızı seçin Görevleri tamamlandıkça işlemek istiyorsanız asyncio.as_completed, sıraya göre işlemek istiyorsanız gather kullanın.

  • Yoğun senkron işlemleri izole edin CPU yoğun veya senkron kütüphane çağrıları için, olay döngüsünü engellememek adına run_in_executor veya ProcessPoolExecutor kullanın.

  • İptal ve istisnalar için plan yapın Bir görev yarıda iptal edilse bile, kaynakları güvenli şekilde temizlemek için uygun istisna yönetimi yazın.

  • Testi kolaylaştırın I/O, zaman veya rastgelelik gibi yan etkileri soyutlayın, böylece değiştirilebilir ve asenkron kodun testi kolaylaşır.

Özet

asyncio çok güçlüdür ancak yalnızca "işleri paralel çalıştırmaya" odaklanırsanız paylaşılan kaynak çakışması, kaynak sınırı aşımları veya olay döngüsünün tıkanması gibi sorunlarla karşılaşabilirsiniz. Semaphore, Lock, Event, Queue, run_in_executor ve uygun iptal işlemlerini birleştirerek güvenli ve verimli asenkron uygulamalar tasarlayabilirsiniz. Üretici-tüketici kalıbı, eşzamanlılık sınırlaması veya asenkron ve bloklayıcı işlemlerin ayrılması gibi mekanizmalar kullanılarak asenkron iş akışları daha güvenli ve verimli şekilde oluşturulabilir.

Yukarıdaki makaleyi, YouTube kanalımızda Visual Studio Code'u kullanarak takip edebilirsiniz. Lütfen YouTube kanalını da kontrol edin.

YouTube Video