Asenkron Girdi/Çıktı

Asenkron Girdi/Çıktı

Bu makale asenkron girdi/çıktıyı açıklar.

Bu rehber, Python’da pratik olarak faydalı olan asenkron girdi/çıktı kavramlarını ve kalıplarını adım adım nazikçe açıklar.

YouTube Video

Asenkron Girdi/Çıktı (I/O)

Asenkron I/O Kavramı

Asenkron I/O, dosya işlemleri veya ağ iletişimi gibi zaman alan I/O işlemleri beklenirken diğer işlemlerin paralel olarak çalışmasını sağlayan bir mekanizmadır. Python'da, standart asenkron çerçeve olarak asyncio sunulur ve birçok kütüphane bu mekanizmayı takip edecek şekilde tasarlanmıştır.

Temel Bilgiler: async / await ve Olay Döngüsü

Öncelikle, temel coroutine'lerin nasıl yazıldığını ve birden fazla coroutine'in asyncio.gather kullanılarak aynı anda nasıl çalıştırıldığının bir örneğini göstereceğiz.

Aşağıdaki kod, asenkron fonksiyonları tanımlamanın ve eşzamanlı olarak çalıştırmanın minimal bir örneğidir. sleep fonksiyonu, paralel yürütmeyi göstermek için kullanılır.

 1import asyncio
 2
 3async def worker(name: str, delay: float):
 4    # Simulate I/O-bound work by sleeping
 5    print(f"{name} started")
 6    await asyncio.sleep(delay)
 7    print(f"{name} finished after {delay}s")
 8    return name, delay
 9
10async def main():
11    # Run multiple coroutines concurrently
12    tasks = [
13        worker("A", 1.5),
14        worker("B", 1.0),
15        worker("C", 0.5),
16    ]
17    results = await asyncio.gather(*tasks)
18    print("Results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • Bu kod, asyncio.run() kullanarak olay döngüsünü başlatır ve üç coroutine'i eşzamanlı olarak çalıştırır.

async with ve Asenkron Bağlam Yöneticileri

Asenkron işlemlerde, bağlantıların açılıp kapatılması veya dosyaların kapatılması gibi kaynak yönetimi kolayca karmaşık hale gelebilir. İşte bu noktada, async with kullanan asenkron bağlam yöneticileri oldukça faydalı olur. Bu sözdizimi, eşzamanlı with ifadesiyle aynı şekilde kullanılır; ancak iç işlemler asenkrondur, bu nedenle async/await akışına doğal bir şekilde uyum sağlar.

async with kullanmak için iki ana neden vardır:.

  • Bağlantılar, dosya tanıtıcıları veya oturumlar gibi kaynakları güvenle temizlemek. Olağan dışı bir sonlanma olsa bile kaynakların düzgün bir şekilde serbest bırakıldığından emin olabilirsiniz.
  • Bağlantıların kurulması veya kapatılması ve tamponun boşaltılması gibi başlatma ve temizleme işlemlerini asenkron şekilde otomatikleştirmek. Bu, elle kod yazma zahmetini ortadan kaldırır ve kodunuzu daha anlaşılır hale getirir.

Aşağıda sıfırdan basit bir asenkron bağlam yöneticisinin oluşturulma örneği verilmiştir.

 1import asyncio
 2
 3class AsyncResource:
 4    async def __aenter__(self):
 5        print("Opening resource...")
 6        await asyncio.sleep(0.5)
 7        print("Resource opened")
 8        return self
 9
10    async def __aexit__(self, exc_type, exc, tb):
11        print("Closing resource...")
12        await asyncio.sleep(0.5)
13        print("Resource closed")
14
15async def main():
16    async with AsyncResource() as r:
17        print("Using resource...")
18
19if __name__ == "__main__":
20    asyncio.run(main())
  • __aenter__ ve __aexit__ tanımlayarak async with kullanabilirsiniz.
  • async with bloğuna girerken ve çıkarken yapılan işlemler asenkron ve güvenli şekilde yürütülür.

Asenkron Dosya I/O'su (aiofiles)

Dosya işlemleri, engelleyici (blocking) işlemlerin klasik bir örneğidir. aiofiles kullanarak dosya işlemlerini güvenli bir şekilde asenkron olarak gerçekleştirebilirsiniz. İçeride bir iş parçacığı havuzu kullanır ve async with ile dosyaların düzgün şekilde kapatılmasını sağlar.

Aşağıdaki örnek, birden fazla dosyanın asenkron olarak paralel şekilde okunmasını gösterir. Bu kodu çalıştırmadan önce pip install aiofiles ile aiofiles kütüphanesini kurmanız gerekir.

 1# pip install aiofiles
 2import asyncio
 3import aiofiles
 4from pathlib import Path
 5
 6async def read_file(path: Path):
 7    # Read file content asynchronously
 8    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
 9        contents = await f.read()
10    return path.name, len(contents)
11
12async def main():
13    files = [
14        Path("example1.txt"),
15        Path("example2.txt"),
16        Path("example3.txt")
17    ]
18    tasks = [read_file(p) for p in files]
19    results = await asyncio.gather(*tasks)
20    for name, size in results:
21        print(f"{name}: {size} bytes")
22
23if __name__ == "__main__":
24    asyncio.run(main())
  • Bu kod, her dosyanın paralel olarak okunmasını sağlar. aiofiles genellikle dahili olarak bir iş parçacığı havuzu kullanır ve bu sayede bloklanan dosya I/O işlemlerini asenkron arayüzle yönetmenize olanak sağlar.

Asenkron HTTP İstemcisi (aiohttp)

Ağ I/O'sunun klasik bir örneği olarak, burada HTTP isteklerinin asenkron nasıl yapıldığı gösterilmektedir. Özellikle çok sayıda HTTP isteğini paralel olarak gerçekleştirmeniz gerektiğinde son derece güçlüdür.

Aşağıda, aiohttp kullanarak birden fazla URL'nin paralel olarak getirilmesinin bir örneği verilmiştir. pip install aiohttp ile aiohttp kütüphanesini kurmanız gerekecektir.

 1# pip install aiohttp
 2import asyncio
 3import aiohttp
 4
 5async def fetch(session: aiohttp.ClientSession, url: str):
 6    # Fetch a URL asynchronously and return status and size
 7    async with session.get(url) as resp:
 8        text = await resp.text()
 9        return url, resp.status, len(text)
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [fetch(session, u) for u in urls]
14        for coro in asyncio.as_completed(tasks):
15            url, status, size = await coro
16            print(f"{url} -> {status}, {size} bytes")
17
18if __name__ == "__main__":
19    urls = [
20        "https://codesparklab.com/json/example1.json",
21        "https://codesparklab.com/json/example2.json",
22        "https://codesparklab.com/json/example3.json",
23    ]
24    asyncio.run(main(urls))
  • asyncio.as_completed kullanarak, görevler tamamlandıkça sonuçları sırayla işleyebilirsiniz. Bu, çok sayıda isteği verimli şekilde ele alabilmek için oldukça faydalıdır.

Bloke Edici G/Ç ile Birlikte Var Olmak: run_in_executor

Asenkron kodda CPU yoğun görevlerle veya mevcut bloklayan API’lerle çalışırken, loop.run_in_executor üzerinden ThreadPoolExecutor veya ProcessPoolExecutor kullanın.

Aşağıdaki kod, bloklayan I/O varsayan görevleri bir iş parçacığı havuzunu kullanarak eşzamanlı çalıştırmanın bir örneğidir.

 1import asyncio
 2from concurrent.futures import ThreadPoolExecutor
 3import time
 4
 5def blocking_io(n):
 6    # Simulate a blocking I/O or CPU-bound function
 7    time.sleep(n)
 8    return f"slept {n}s"
 9
10async def main():
11    loop = asyncio.get_running_loop()
12    with ThreadPoolExecutor() as pool:
13        tasks = [
14            loop.run_in_executor(pool, blocking_io, 1),
15            loop.run_in_executor(pool, blocking_io, 2),
16        ]
17        results = await asyncio.gather(*tasks)
18        print("Blocking results:", results)
19
20if __name__ == "__main__":
21    asyncio.run(main())
  • run_in_executor kullanarak mevcut senkron kodunuzu, önemli bir yeniden yazma gerektirmeden asenkron akışların içine dahil edebilirsiniz. Ancak iş parçacığı sayısı ve CPU yüküne dikkat etmelisiniz.
  • ProcessPoolExecutor, CPU’ya bağlı görevler için uygundur.

Asenkron Sunucu: asyncio tabanlı TCP Echo Sunucusu

Soketleri doğrudan yönetmek isterseniz, asyncio.start_server kullanarak kolayca asenkron bir sunucu oluşturabilirsiniz.

Aşağıdaki örnek, istemciden gelen verileri olduğu gibi geri gönderen basit bir echo sunucusudur.

 1import asyncio
 2
 3async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
 4    # Handle a single client: read data and echo it back
 5    addr = writer.get_extra_info('peername')
 6    print(f"Connection from {addr}")
 7    while True:
 8        data = await reader.read(1024)
 9        if not data:
10            break
11        writer.write(data)  # echo back
12        await writer.drain()
13    writer.close()
14    await writer.wait_closed()
15    print(f"Connection closed {addr}")
16
17async def main():
18    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
19    addr = server.sockets[0].getsockname()
20    print(f"Serving on {addr}")
21    async with server:
22        await server.serve_forever()
23
24if __name__ == "__main__":
25    asyncio.run(main())
  • asyncio ile yapılan TCP iletişiminde, StreamReader ve StreamWriter eşzamansız giriş ve çıkışta merkezi bir rol oynar. StreamReader, istemciden gönderilen verileri eşzamansız olarak okurken, StreamWriter, sunucudan istemciye yanıt göndermek için kullanılır.

  • Soketlerin ayrıntılı işlemlerini kendiniz yönetmeseniz bile, asyncio.start_server kullanarak basit ve verimli bir şekilde bir eşzamansız sunucu başlatabilirsiniz.

  • asyncio.start_server'a bir işleyici fonksiyonu verdiğinizde, o fonksiyon reader ve writer'ı argüman olarak alır. Bunları kullanarak, düşük seviyeli soket API'lerini doğrudan yönetmekten daha güvenli ve anlaşılır bir şekilde iletişim süreçlerini uygulayabilirsiniz. Örneğin, reader.read() ile veri alıp, writer.write() ile writer.drain()'ı birleştirerek, iletimin tamamlanmasını sağlayan eşzamansız veri gönderimini uygulayabilirsiniz.

  • Bu yapı, çok sayıda eşzamanlı bağlantıyı yönetmek için uygundur ve basit protokoller ya da küçük çaplı TCP servisleri için idealdir.

Büyük Akış Verisinin Yönetilmesi

Büyük dosya veya yanıtları sıralı işlerken, veriyi parçalara bölerek okuyup yazarak bellek kullanımını düşük tutun. Aşağıda, aiohttp kullanılarak akış şeklinde okumanın bir örneği verilmiştir.

Aşağıdaki kod, HTTP yanıtlarını parça parça işler ve veri geldikçe diske yazar.

 1import aiohttp
 2import asyncio
 3import aiofiles
 4
 5async def stream_download(url: str, dest: str):
 6    # Stream download and write to file in chunks
 7    async with aiohttp.ClientSession() as session:
 8        async with session.get(url) as resp:
 9            async with aiofiles.open(dest, 'wb') as f:
10                async for chunk in resp.content.iter_chunked(1024 * 64):
11                    await f.write(chunk)
12
13if __name__ == "__main__":
14    asyncio.run(stream_download("https://codesparklab.com/100MB.bin", "download.bin"))
  • Bu kod, büyük bir dosyayı bir kerede yüklemez; bunun yerine veriyi küçük parçalar (bölümler) halinde alır ve bunu dosyaya eşzamansız olarak yazar. Sonuç olarak, bellek kullanımını düşük tutarken hızlı ve verimli bir şekilde indirme işlemleri gerçekleştirebilir. aiohttp veriyi eşzamansız olarak alır ve aiofiles dosyaya engellemeden yazar, böylece diğer süreçlerle birlikte kolayca çalıştırılabilir.

  • Bu yapı, büyük dosyaları verimli şekilde indirip kaydederken bellek kullanımını en aza indirger.

Alt Süreçlerin Asenkron Yürütülmesi

Harici komutları asenkron olarak çalıştırmak ve çıktılarını gerçek zamanlı okumak isterseniz asyncio.create_subprocess_exec faydalıdır.

Aşağıda, harici bir komutun başlatılıp standart çıktısının gerçek zamanlı okunmasının bir örneği verilmiştir.

 1import asyncio
 2
 3async def run_cmd(cmd):
 4    # Run external command asynchronously and capture output line by line
 5    proc = await asyncio.create_subprocess_exec(
 6        *cmd,
 7        stdout=asyncio.subprocess.PIPE,
 8        stderr=asyncio.subprocess.PIPE
 9    )
10
11    async def read_stream(stream, name):
12        while True:
13            line = await stream.readline()
14            if not line:
15                break
16            print(f"[{name}] {line.decode().rstrip()}")
17
18    await asyncio.gather(
19        read_stream(proc.stdout, "stdout"),
20        read_stream(proc.stderr, "stderr"),
21    )
22    await proc.wait()
23    return proc.returncode
24
25if __name__ == "__main__":
26    asyncio.run(run_cmd(["python", "--version"]))
  • Alt süreçleri asenkron olarak kontrol ederek, harici araçlardan gelen logları anlık olarak ele alabilir veya birden fazla süreci paralel çalıştırabilirsiniz.

İptal ve Zaman Aşımının Yönetilmesi

Asenkron görevler iptal edilebilir. Bir zaman aşımı uygularken, asyncio.wait_for kullanmak basittir.

Aşağıda, zaman aşımı ile birlikte bir görevin nasıl çalıştırılacağının bir örneği vardır.

 1import asyncio
 2
 3async def slow_task():
 4    await asyncio.sleep(5)
 5    return "done"
 6
 7async def main():
 8    try:
 9        result = await asyncio.wait_for(slow_task(), timeout=2.0)
10        print("Result:", result)
11    except asyncio.TimeoutError:
12        print("Task timed out")
13
14if __name__ == "__main__":
15    asyncio.run(main())
  • wait_for, zaman aşımı gerçekleşirse bir TimeoutError fırlatır ve gerekirse görevi iptal eder. Görev iptalinin yayılması ve temizlenmesi konusunda dikkatli olun.

Eşzamanlılığı Kontrol Etmek (Semaphore)

Çok fazla eşzamanlı bağlantı veya istek kaynakları tüketebileceğinden, eşzamanlılığı asyncio.Semaphore ile sınırlandırın.

Aşağıda, bir semaphore kullanarak eşzamanlı indirme işleminin sınırlandırılmasına dair bir örnek verilmiştir.

 1import asyncio
 2import aiohttp
 3
 4semaphore = asyncio.Semaphore(3)  # allow up to 3 concurrent tasks
 5
 6async def limited_fetch(session, url):
 7    async with semaphore:
 8        async with session.get(url) as resp:
 9            return url, resp.status
10
11async def main(urls):
12    async with aiohttp.ClientSession() as session:
13        tasks = [limited_fetch(session, u) for u in urls]
14        results = await asyncio.gather(*tasks)
15        print(results)
16
17if __name__ == "__main__":
18    urls = ["https://codesparklab.com/json/example.json"] * 10
19    asyncio.run(main(urls))
  • Bu yöntemle, harici servislere kontrollü şekilde erişebilir ve kendi sürecinizi aşırı yüklemeden kaçınabilirsiniz.

Hata Yönetimi ve Yeniden Deneme Stratejileri

Asenkron işlemlerde bile hatalar mutlaka meydana gelir. İstisnaları uygun şekilde yakalayın ve üssel gecikme (exponential backoff) gibi yeniden deneme stratejeleri uygulayın.

Aşağıda, N kez tekrar denenebilen basit bir yeniden deneme mantığının uygulama örneği vardır.

 1import asyncio
 2import aiohttp
 3import random
 4
 5async def fetch_with_retry(session, url, retries=3):
 6    for attempt in range(1, retries + 1):
 7        try:
 8            async with session.get(url) as resp:
 9                resp.raise_for_status()
10                text = await resp.text()
11                return text
12        except Exception as e:
13            if attempt == retries:
14                raise
15            await asyncio.sleep(0.5 * attempt + random.random())
16
17async def main():
18    async with aiohttp.ClientSession() as session:
19        text = await fetch_with_retry(session, "https://codesparklab.com/")
20        print("Fetched length:", len(text))
21
22if __name__ == "__main__":
23    asyncio.run(main())
  • Doğru bir yeniden deneme mantığı, tutarlılık ve trafik kontrolü arasında denge kurmak için önemlidir.

Hata Ayıklama ve Kayıt Tutma İpuçları

Asenkron işlerde, görevler aynı anda ilerlediği için sorunların nedenini bulmak zorlaşabilir. Sorunları verimli şekilde izlemek için aşağıdaki noktalara dikkat etmek hata ayıklamayı daha kolay hale getirecektir.

  • asyncio.run() ve Task’den gelen istisnalar kolayca gözden kaçabilir; bu nedenle ele alınmayan hataları kayıt altına aldığınızdan emin olun.
  • logging kullanırken, günlüklerinize coroutine adını veya Python 3.8 ve üzeri sürümlerde task.get_name()i dahil etmek, takibi kolaylaştırır.
  • asyncio.Task.all_tasks() kullanarak görevlerin mevcut durumunu kontrol edebilirsiniz. Ancak, bu API hata ayıklama amacıyla tasarlanmıştır ve performans sorunları veya beklenmeyen müdahaleleri önlemek için üretim ortamlarında dikkatle kullanılmalıdır.

Performans Dikkat Noktaları

Asenkron programlama, I/O beklemelerini yönetmede çok iyi olsa da yanlış kullanımı performansı düşürebilir. Aşağıdaki konulara dikkat ederek optimizasyon yapın:.

  • Asenkron işleme I/O odaklı görevlerde çok iyi performans gösterir, ancak CPU odaklı görevler için uygun değildir; bu tür durumlarda işlem havuzu kullanın.
  • İş parçacığı veya işlem havuzları kullanırken, havuz boyutunu ve görevlerin doğasını göz önünde bulundurun.
  • Birden fazla küçük görevi aynı anda başlatırsanız, olay döngüsü yükü artar—bu nedenle toplu işlem (batching) veya semaphore kullanarak ayarlayın.

Özet

Python'un asenkron I/O'su, I/O bekleme sürelerini etkin kullanmasını sağlayan ve ağ ve dosya işlemlerini aynı anda verimli şekilde yürüten güçlü bir mekanizmadır. asyncio, aiohttp, aiofiles ve run_in_executor gibi teknikleri birleştirerek esnek ve pratik asenkron uygulamalar geliştirebilirsiniz. async with kullanarak kaynak edinimi ve salımını otomatikleştirmek; dosyalar, HTTP oturumları ve kilitler gibi asenkron kaynakları güvenli ve güvenilir bir şekilde yönetmenizi sağlar. Doğru hata yönetimi ve eşzamanlılık kontrolü ile yüksek güvenilirliğe sahip asenkron programları güvenle çalıştırabilirsiniz.

Yukarıdaki makaleyi, YouTube kanalımızda Visual Studio Code'u kullanarak takip edebilirsiniz. Lütfen YouTube kanalını da kontrol edin.

YouTube Video