Python Asenkron İşleminde Senkronizasyon Kontrolü
Bu makale, Python asenkron işlemlerinde senkronizasyon kontrolünü açıklar.
asyncio'nun temellerinden başlayarak, genellikle eşzamanlılık kontrolü için kullanılan pratik kalıplara kadar adım adım öğreneceksiniz.
YouTube Video
Python Asenkron İşleminde Senkronizasyon Kontrolü
Asenkron işlemede, birden fazla görevi aynı anda kolayca çalıştırabilirsiniz. Ancak pratikte, eşzamanlılığı kontrol etmek, görevleri koordine etmek, paylaşılan kaynakların münhasır kontrolü, yoğun senkron işlemleri yönetmek ve iptallerden sonra temizlik yapmak gibi daha gelişmiş ayarlamalar gereklidir.
Burada, asyncio'nun temellerinden başlayarak senkronizasyon için sıkça kullanılan pratik kalıplara kadar adım adım öğreneceğiz.
Giriş: Temeller (async / await ve create_task)
Önce biraz basit asenkron koda bakalım. await, çağrılan korutinin tamamlanmasını bekler ve asyncio.create_task bir görevi eşzamanlı çalıştırmak için planlar.
1import asyncio
2
3async def worker(name, delay):
4 # Simulate async work
5 await asyncio.sleep(delay)
6 return f"{name} done after {delay}s"
7
8async def main():
9 # Create two tasks that run concurrently.
10 t1 = asyncio.create_task(worker("A", 1))
11 t2 = asyncio.create_task(worker("B", 2))
12
13 # Await both results (this suspends until both are done).
14 result1 = await t1
15 result2 = await t2
16 print(result1, result2)
17
18if __name__ == "__main__":
19 asyncio.run(main())- Bu kod, görevlerin açıkça oluşturulduğu, paralel olarak çalıştırıldığı ve sonuçların
awaitile alındığı tipik bir örüntüdür.create_taskeşzamanlı yürütmeyi mümkün kılar.
asyncio.gather, asyncio.wait ve asyncio.as_completed arasındaki farklar
Birden çok korutini aynı anda çalıştırırken, sonuçları nasıl almak istediğinize bağlı olarak hangisini kullanacağınızı seçersiniz. gather tüm işlemlerin bitmesini bekler ve sonuçları girilen sırada dönerken, as_completed sonuçları tamamlandıkça sıradan bağımsız olarak işler.
1import asyncio
2import random
3
4async def job(i):
5 delay = random.random() * 2
6 await asyncio.sleep(delay)
7 return (i, delay)
8
9async def gather_example():
10 # gather waits for all tasks and returns results in the same order as input
11 results = await asyncio.gather(*(job(i) for i in range(5)))
12 print("gather order:", results)
13
14async def as_completed_example():
15 # as_completed yields results as they finish (useful to process early results)
16 tasks = [asyncio.create_task(job(i)) for i in range(5)]
17 for coro in asyncio.as_completed(tasks):
18 res = await coro
19 print("completed:", res)
20
21async def main():
22 await gather_example()
23 await as_completed_example()
24
25if __name__ == "__main__":
26 asyncio.run(main())- Bu kodda gösterildiği gibi,
gathersonuçları girilen sırada döndürür; bu da sıralamayı korumak istediğinizde faydalıdır.as_completedtamamlanan sonuçları anında işlemek istediğinizde kullanılır.
Eşzamanlılığı Kontrol Etme: asyncio.Semaphore ile Aynı Anda Çalışan Görevleri Sınırlandırma
Dış API hız veya veritabanı bağlantı limitleri olduğunda, eşzamanlı yürütmeleri bir Semaphore ile kontrol edebilirsiniz.
1import asyncio
2import random
3
4sem = asyncio.Semaphore(3) # allow up to 3 concurrent workers
5
6async def limited_worker(i):
7 async with sem:
8 # Only 3 tasks can be inside this block concurrently
9 delay = random.random() * 2
10 await asyncio.sleep(delay)
11 print(f"worker {i} finished after {delay:.2f}s")
12
13async def main():
14 tasks = [asyncio.create_task(limited_worker(i)) for i in range(10)]
15 await asyncio.gather(*tasks)
16
17if __name__ == "__main__":
18 asyncio.run(main())async withile birlikteSemaphorekullanarak eşzamanlı çalışan görevlerin sayısını kolayca sınırlandırabilirsiniz. Bu, dış sınırlamaların olduğu durumlarda etkilidir.
Paylaşılan kaynakların münhasır kontrolü: asyncio.Lock
Lock, paylaşılan verilerin aynı anda güncellenmesini önlemek için kullanılır. asyncio.Lock, asenkron kullanım için geliştirilmiş bir özel araçtır.
1import asyncio
2
3counter = 0
4lock = asyncio.Lock()
5
6async def incrementer(n_times):
7 global counter
8 for _ in range(n_times):
9 # Acquire lock to update shared counter safely
10 async with lock:
11 tmp = counter
12 await asyncio.sleep(0) # yield control to increase race likelihood
13 counter = tmp + 1
14
15async def main():
16 tasks = [asyncio.create_task(incrementer(1000)) for _ in range(5)]
17 await asyncio.gather(*tasks)
18 print("final counter:", counter)
19
20if __name__ == "__main__":
21 asyncio.run(main())- Birden fazla görev, global bir
countergibi paylaşılan bir değişkeni güncelliyorsa, çakışmalar oluşabilir. İşlemleri birLockile sınırlandırarak tutarlılığı koruyabilirsiniz.
Görev Koordinasyonu: asyncio.Event
Event, bir görev hazır olduğunda sinyal verdiğinde ve diğer görevlerin bu sinyali beklediği durumlarda kullanılır. Görevlerin birbirine sinyal gönderip senkronize olmasının basit bir yoludur.
1import asyncio
2
3event = asyncio.Event()
4
5async def waiter(name):
6 print(f"{name} is waiting for event")
7 await event.wait()
8 print(f"{name} resumed after event set")
9
10async def setter():
11 print("setter will sleep and then set the event")
12 await asyncio.sleep(1)
13 event.set()
14 print("event set by setter")
15
16async def main():
17 tasks = [asyncio.create_task(waiter("W1")), asyncio.create_task(waiter("W2"))]
18 await asyncio.create_task(setter())
19 await asyncio.gather(*tasks)
20
21if __name__ == "__main__":
22 asyncio.run(main())Eventbir boole bayrağına sahiptir veset()çağrıldığında tüm bekleyen görevler devam eder. Basit senkronizasyon için kullanışlıdır.
Üretici-tüketici kalıbı: asyncio.Queue
Queue kullanarak veri üretenler (producers) ile işleyenler (consumers) sorunsuz ve asenkron şekilde koordine olabilir. Ayrıca, kuyruk dolduğunda üreticiler otomatik olarak bekler, böylece aşırı üretimi doğal olarak önleyen geri basınç uygulanmış olur.
1import asyncio
2import random
3
4async def producer(q, n_items):
5 for i in range(n_items):
6 await asyncio.sleep(random.random() * 0.5)
7 item = f"item-{i}"
8 await q.put(item)
9 print("produced", item)
10 # signal consumers to stop
11 await q.put(None)
12
13async def consumer(q, name):
14 while True:
15 item = await q.get()
16 if item is None:
17 # put sentinel back for other consumers and break
18 await q.put(None)
19 break
20 await asyncio.sleep(random.random() * 1)
21 print(name, "consumed", item)
22 q.task_done()
23
24async def main():
25 q = asyncio.Queue(maxsize=5) # bounded queue to apply backpressure
26 prod = asyncio.create_task(producer(q, 10))
27 cons = [asyncio.create_task(consumer(q, f"C{i}")) for i in range(2)]
28 await asyncio.gather(prod, *cons)
29
30if __name__ == "__main__":
31 asyncio.run(main())Queue, üretici ve tüketici ilişkisini asenkron olarak koordine etmeye yardımcı olur. Ayrıcamaxsizeayarlanırsa, kuyruk dolu olduğunda üreticiputüzerinde bekler ve aşırı üretim engellenir.
Senkron Engelleyici İşlemlerin Yönetimi: run_in_executor
CPU yoğun işlemler veya asenkron desteklemeyen kütüphaneler için run_in_executor ile işlemi başka bir iş parçacığına ya da prosese devredin. Bunu yaparak ana olay döngüsünün durmasını önler ve diğer asenkron görevlerin sorunsuz çalışmasını sağlar.
1import asyncio
2import time
3
4def blocking_io(x):
5 # simulate blocking I/O or CPU-bound work
6 time.sleep(2)
7 return x * x
8
9async def main():
10 loop = asyncio.get_running_loop()
11 # run blocking_io in default thread pool
12 result = await loop.run_in_executor(None, blocking_io, 3)
13 print("blocking result:", result)
14
15if __name__ == "__main__":
16 asyncio.run(main())- Senkron işlevleri doğrudan çağırmak olay döngüsünü bloke eder.
run_in_executorile kod ayrı bir iş parçacığında çalışır ve diğer asenkron görevler aynı anda ilerlemeye devam eder.
Örnek: Hız Sınırlamalı API Çağrıları (Semaphore + run_in_executor kombinasyonu)
Aşağıda, API çağrılarının hız sınırlı olduğu ve sonuçlar üzerinde yoğun işlemler yapıldığı örnek bir senaryo yer almaktadır. Semaphore ile run_in_executor kombinasyonu işlemlerin güvenli ve verimli ilerlemesini sağlar.
1import asyncio
2import time
3import random
4
5sem = asyncio.Semaphore(5)
6
7def heavy_sync_processing(data):
8 # simulate heavy CPU-bound work
9 time.sleep(1)
10 return f"processed-{data}"
11
12async def api_call(i):
13 await asyncio.sleep(random.random() * 0.5) # simulate network latency
14 return f"data-{i}"
15
16async def worker(i):
17 async with sem:
18 data = await api_call(i)
19 # offload CPU-bound work to threadpool
20 loop = asyncio.get_running_loop()
21 result = await loop.run_in_executor(None, heavy_sync_processing, data)
22 print(result)
23
24async def main():
25 tasks = [asyncio.create_task(worker(i)) for i in range(20)]
26 await asyncio.gather(*tasks)
27
28if __name__ == "__main__":
29 asyncio.run(main())- Eşzamanlı API çağrılarını sınırlandırmak için
Semaphorekullanıyor ve elde edilen veriye uygulanan yoğun iş yükünü bir iş parçacığı havuzuna devrediyoruz. Ağ ve CPU işlemlerinin ayrılması verimliliği artırır.
Görev İptali ve Temizlik
Bir görev iptal edildiğinde, finally ve asyncio.CancelledError'ın düzgün şekilde ele alınması çok önemlidir. Bu, dosya ve bağlantıların serbest bırakılmasını ve ara durumların düzgün şekilde ele alınmasını sağlar; uygulamanın tutarlılığını korur.
1import asyncio
2
3async def long_running():
4 try:
5 print("started long_running")
6 await asyncio.sleep(10)
7 print("finished long_running")
8 except asyncio.CancelledError:
9 print("long_running was cancelled, cleaning up")
10 # perform cleanup here
11 raise
12
13async def main():
14 task = asyncio.create_task(long_running())
15 await asyncio.sleep(1)
16 task.cancel()
17 try:
18 await task
19 except asyncio.CancelledError:
20 print("task cancelled in main")
21
22if __name__ == "__main__":
23 asyncio.run(main())- İptal bir istisna (
CancelledError) olarak iletilir, bu nedenle temizliğiexceptbloğunda yapmalı ve gerekiyorsa istisnayı yeniden yükseltmelisiniz.
Pratik tasarım için ana noktalar
Aşağıdakiler, asenkron işlem tasarımında faydalı pratik noktalardır.
-
Eşzamanlılığı açıkça kontrol edin API veya veritabanı gibi kaynak sınırlamaları olduğunda, eşzamanlı görev sayısını
Semaphoreile sınırlayabilirsiniz. -
Paylaşılan kaynakları güvenli şekilde yönetin Birden fazla görevden durumu güncellemeniz gerekiyorsa,
Lockkullanın. Paylaşılan durumu azaltmak ve değiştirilemez veri ile tasarlamak işleri daha güvenli hale getirir. -
Sonuçları nasıl alacağınızı seçin Görevleri tamamlandıkça işlemek istiyorsanız
asyncio.as_completed, sıraya göre işlemek istiyorsanızgatherkullanın. -
Yoğun senkron işlemleri izole edin CPU yoğun veya senkron kütüphane çağrıları için, olay döngüsünü engellememek adına
run_in_executorveyaProcessPoolExecutorkullanın. -
İptal ve istisnalar için plan yapın Bir görev yarıda iptal edilse bile, kaynakları güvenli şekilde temizlemek için uygun istisna yönetimi yazın.
-
Testi kolaylaştırın I/O, zaman veya rastgelelik gibi yan etkileri soyutlayın, böylece değiştirilebilir ve asenkron kodun testi kolaylaşır.
Özet
asyncio çok güçlüdür ancak yalnızca "işleri paralel çalıştırmaya" odaklanırsanız paylaşılan kaynak çakışması, kaynak sınırı aşımları veya olay döngüsünün tıkanması gibi sorunlarla karşılaşabilirsiniz. Semaphore, Lock, Event, Queue, run_in_executor ve uygun iptal işlemlerini birleştirerek güvenli ve verimli asenkron uygulamalar tasarlayabilirsiniz. Üretici-tüketici kalıbı, eşzamanlılık sınırlaması veya asenkron ve bloklayıcı işlemlerin ayrılması gibi mekanizmalar kullanılarak asenkron iş akışları daha güvenli ve verimli şekilde oluşturulabilir.
Yukarıdaki makaleyi, YouTube kanalımızda Visual Studio Code'u kullanarak takip edebilirsiniz. Lütfen YouTube kanalını da kontrol edin.