Patrones de Concurrencia y Asincronía
Domina la ejecución paralela y las operaciones no bloqueantes
La concurrencia permite que tu programa gestione múltiples operaciones simultáneamente, aprovechando todo el poder de los procesadores modernos. Es como un chef preparando varios platos en paralelo, orquestando cada paso para que todos estén listos al mismo tiempo.
La programación asíncrona, por otro lado, permite que tu aplicación continúe con otras tareas mientras espera que se completen operaciones lentas — como enviar un pedido a la cocina y atender a otros clientes mientras el plato se prepara.
En este artículo exploraremos los patrones fundamentales: Thread Pool, Worker Model, Future y Promise, y Observer en programación reactiva.
Thread Pool Pattern
Un thread (hilo) es la unidad más pequeña de procesamiento que puede ser programada por el sistema operativo. Los threads son como vías de ejecución que corren simultáneamente, permitiendo realizar múltiples actividades a la vez y mejorar el rendimiento.
El patrón Thread Pool gestiona un conjunto fijo de threads reutilizables para ejecutar tareas, evitando el overhead de crear y destruir threads constantemente.
Casos de Uso
Cómo Funciona
Implementación con ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor import time def task(n): print(f"Executing task {n}") time.sleep(1) print(f"Task {n} completed") # Crear pool con 5 worker threads with ThreadPoolExecutor(max_workers=5) as executor: # Enviar 10 tareas al pool for i in range(10): executor.submit(task, i)
Executing task 1
Executing task 2
Executing task 3
Executing task 4
Task 0 completed
Executing task 5
Task 1 completed
Executing task 6
...
ThreadPoolExecutor gestiona todo automáticamente.
Worker Model Pattern
Mientras que el Thread Pool se enfoca en reutilizar un número fijo de threads, el Worker Model se centra en la distribución dinámica de tareas entre entidades worker potencialmente escalables y flexibles.
Este patrón es particularmente útil para escenarios donde las tareas son independientes y pueden procesarse en paralelo, usando multiprocessing para aprovechar múltiples núcleos de CPU.
Casos de Uso
Implementación con Multiprocessing
from multiprocessing import Process, Queue import time def worker(task_queue): while not task_queue.empty(): task = task_queue.get() print(f"Worker {task} is processing") time.sleep(1) print(f"Worker {task} completed") def main(): # Crear cola de tareas task_queue = Queue() for i in range(10): task_queue.put(i) # Crear 5 procesos worker processes = [ Process(target=worker, args=(task_queue,)) for _ in range(5) ] # Iniciar todos los workers for p in processes: p.start() # Esperar a que terminen for p in processes: p.join() print("All tasks completed.") if __name__ == "__main__": main()
Worker 1 is processing
Worker 2 is processing
Worker 3 is processing
Worker 4 is processing
Worker 0 completed
Worker 5 is processing
...
All tasks completed.
ThreadPoolExecutor, multiprocessing crea procesos separados que evitan el GIL de Python, permitiendo verdadero paralelismo en tareas CPU-bound.
Future y Promise Pattern
En programación asíncrona, un Future representa un valor que aún no se conoce pero que estará disponible eventualmente. Cuando una función inicia una operación asíncrona, en lugar de bloquear hasta completarse, retorna inmediatamente un objeto Future como placeholder del resultado futuro.
Los Futures son comúnmente usados para operaciones I/O, requests de red y otras tareas que toman tiempo y corren de forma asíncrona. Permiten que el programa continúe ejecutando otras tareas — esta propiedad se conoce como non-blocking.
Mecanismo en 3 Pasos
Implementación con concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed def square(x): return x * x # Crear executor y obtener Futures with ThreadPoolExecutor() as executor: future1 = executor.submit(square, 2) future2 = executor.submit(square, 3) future3 = executor.submit(square, 4) futures = [future1, future2, future3] # Iterar sobre Futures completados for future in as_completed(futures): print(f"Result: {future.result()}")
Result: 4
Result: 9
Implementación con asyncio
La librería asyncio proporciona soporte para I/O asíncrono, event loops y coroutines. Es particularmente útil para tareas I/O-bound.
import asyncio async def square(x): # Simular operación I/O-bound await asyncio.sleep(1) return x * x async def main(): # Crear Futures fut1 = asyncio.ensure_future(square(2)) fut2 = asyncio.ensure_future(square(3)) fut3 = asyncio.ensure_future(square(4)) # Esperar y recoger resultados results = await asyncio.gather(fut1, fut2, fut3) for result in results: print(f"Result: {result}") if __name__ == "__main__": asyncio.run(main())
Result: 9
Result: 16
coroutines se declaran con async def y pueden pausar su ejecución con await, permitiendo que otras coroutines se ejecuten mientras tanto. Es ideal para web scraping, API calls y operaciones I/O intensivas.
Observer en Reactive Programming
El patrón Observer tradicional notifica a objetos cuando el estado cambia. Sin embargo, cuando debemos manejar muchos eventos interdependientes, el enfoque tradicional puede llevar a código complicado y difícil de mantener.
La programación reactiva nos da una opción interesante: reaccionar a streams de eventos manteniendo el código limpio. En el corazón de ReactiveX está el concepto de Observable.
Conceptos Clave
Implementación con ReactiveX
Primero instalamos: pip install reactivex
import reactivex as rx from reactivex import operators as ops # Crear Observable desde una lista source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") # Aplicar operadores y suscribirse source.pipe( ops.map(lambda s: len(s)), # Transformar a longitud ops.filter(lambda i: i >= 5) # Filtrar >= 5 ).subscribe( on_next=lambda i: print(f"Received: {i}"), on_error=lambda e: print(f"Error: {e}"), on_completed=lambda: print("Done!") )
Received: 5
Received: 7
Done!
Observable desde Archivo de Datos
import reactivex as rx from reactivex import operators as ops from collections import Counter def firstnames_from_db(): """Retorna Observable desde archivo de texto""" with open("people.txt", "r") as f: content = f.read() names = [name.strip().split()[0] for name in content.split(",") if name.strip()] return rx.from_iterable(names) def main(): print("Starting... Press Ctrl+C to quit") # Contar ocurrencias de cada nombre firstnames_from_db().pipe( ops.map(lambda name: (name, 1)) ).subscribe( on_next=lambda x: print(x) ) if __name__ == "__main__": main()
('Peter', 1)
('Gabriel', 1)
('Gary', 1)
('Heather', 1)
...
Otros Patrones de Concurrencia
Existen otros patrones de concurrencia y asincronía que los desarrolladores pueden usar según sus necesidades específicas:
asyncio.Resumen de Patrones
01 // Thread Pool
Gestiona un conjunto fijo de threads reutilizables, mejorando rendimiento y reduciendo overhead de creación/destrucción.
02 // Worker Model
Distribución dinámica de tareas entre workers escalables, ideal para procesamiento paralelo con multiprocessing.
03 // Future y Promise
Facilita operaciones asíncronas non-blocking, permitiendo que la aplicación permanezca responsiva.
04 // Observer Reactivo
Reacciona a streams de eventos manteniendo código limpio mediante Observables y operadores funcionales.