viernes, 16 de enero de 2026

Principios Fundamentales de Diseño en Python - Parte 7 - Patrones de Concurrencia y Asincronía

Patrones de Concurrencia y Asincronía en Python
Python // Design Patterns

Patrones de Concurrencia y Asincronía

Domina la ejecución paralela y las operaciones no bloqueantes

La concurrencia permite que tu programa gestione múltiples operaciones simultáneamente, aprovechando todo el poder de los procesadores modernos. Es como un chef preparando varios platos en paralelo, orquestando cada paso para que todos estén listos al mismo tiempo.

La programación asíncrona, por otro lado, permite que tu aplicación continúe con otras tareas mientras espera que se completen operaciones lentas — como enviar un pedido a la cocina y atender a otros clientes mientras el plato se prepara.

En este artículo exploraremos los patrones fundamentales: Thread Pool, Worker Model, Future y Promise, y Observer en programación reactiva.

PATTERN_01

Thread Pool Pattern

Un thread (hilo) es la unidad más pequeña de procesamiento que puede ser programada por el sistema operativo. Los threads son como vías de ejecución que corren simultáneamente, permitiendo realizar múltiples actividades a la vez y mejorar el rendimiento.

El patrón Thread Pool gestiona un conjunto fijo de threads reutilizables para ejecutar tareas, evitando el overhead de crear y destruir threads constantemente.

Casos de Uso

BATCH PROCESSING
Distribuir muchas tareas paralelas entre workers
LOAD BALANCING
Distribuir carga equitativamente entre threads
RESOURCE OPTIMIZATION
Minimizar uso de memoria y CPU reutilizando threads
Arquitectura Thread Pool
Task Queue
Thread 1
Thread 2
Thread 3
Thread N
Results

Cómo Funciona

STEP 01
Inicialización: El pool crea un número fijo de worker threads al inicio.
STEP 02
Submission: Las tareas se envían al pool en lugar de crear threads nuevos.
STEP 03
Ejecución: El pool asigna tareas a threads disponibles. Si todos están ocupados, las tareas esperan en cola.
STEP 04
Reutilización: Al completar una tarea, el thread vuelve al pool para nuevas asignaciones.

Implementación con ThreadPoolExecutor

thread_pool.py
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    print(f"Executing task {n}")
    time.sleep(1)
    print(f"Task {n} completed")

# Crear pool con 5 worker threads
with ThreadPoolExecutor(max_workers=5) as executor:
    # Enviar 10 tareas al pool
    for i in range(10):
        executor.submit(task, i)
python thread_pool.py
Executing task 0
Executing task 1
Executing task 2
Executing task 3
Executing task 4
Task 0 completed
Executing task 5
Task 1 completed
Executing task 6
...
Los 5 workers procesan las 10 tareas concurrentemente. Cuando un worker termina, inmediatamente toma la siguiente tarea de la cola. El ThreadPoolExecutor gestiona todo automáticamente.
PATTERN_02

Worker Model Pattern

Mientras que el Thread Pool se enfoca en reutilizar un número fijo de threads, el Worker Model se centra en la distribución dinámica de tareas entre entidades worker potencialmente escalables y flexibles.

Este patrón es particularmente útil para escenarios donde las tareas son independientes y pueden procesarse en paralelo, usando multiprocessing para aprovechar múltiples núcleos de CPU.

Casos de Uso

PARALLEL COMPUTING
Tareas CPU-intensive que necesitan múltiples cores
DATA PROCESSING
Procesar grandes datasets en chunks paralelos
TASK QUEUES
Sistemas de colas con múltiples consumidores
Worker Model con Multiprocessing
Task Queue
Process 1
Process 2
Process N

Implementación con Multiprocessing

worker_model.py
from multiprocessing import Process, Queue
import time

def worker(task_queue):
    while not task_queue.empty():
        task = task_queue.get()
        print(f"Worker {task} is processing")
        time.sleep(1)
        print(f"Worker {task} completed")

def main():
    # Crear cola de tareas
    task_queue = Queue()
    for i in range(10):
        task_queue.put(i)
    
    # Crear 5 procesos worker
    processes = [
        Process(target=worker, args=(task_queue,))
        for _ in range(5)
    ]
    
    # Iniciar todos los workers
    for p in processes:
        p.start()
    
    # Esperar a que terminen
    for p in processes:
        p.join()
    
    print("All tasks completed.")

if __name__ == "__main__":
    main()
python worker_model.py
Worker 0 is processing
Worker 1 is processing
Worker 2 is processing
Worker 3 is processing
Worker 4 is processing
Worker 0 completed
Worker 5 is processing
...
All tasks completed.
A diferencia de ThreadPoolExecutor, multiprocessing crea procesos separados que evitan el GIL de Python, permitiendo verdadero paralelismo en tareas CPU-bound.
PATTERN_03

Future y Promise Pattern

En programación asíncrona, un Future representa un valor que aún no se conoce pero que estará disponible eventualmente. Cuando una función inicia una operación asíncrona, en lugar de bloquear hasta completarse, retorna inmediatamente un objeto Future como placeholder del resultado futuro.

Los Futures son comúnmente usados para operaciones I/O, requests de red y otras tareas que toman tiempo y corren de forma asíncrona. Permiten que el programa continúe ejecutando otras tareas — esta propiedad se conoce como non-blocking.

Mecanismo en 3 Pasos

INITIATION
Se inicia la operación asíncrona y retorna un Future inmediatamente. Internamente se crea un Promise vinculado al Future.
EXECUTION
La operación procede independientemente del flujo principal. El programa permanece responsivo y puede continuar con otras tareas.
RESOLUTION
Si la operación es exitosa, el Promise se "fulfills". Si falla, se "rejects". El resultado se obtiene mediante callbacks o continuations.
Ciclo Future / Promise
Submit Task
Future (placeholder)
Result / Error

Implementación con concurrent.futures

future_concurrent.py
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    return x * x

# Crear executor y obtener Futures
with ThreadPoolExecutor() as executor:
    future1 = executor.submit(square, 2)
    future2 = executor.submit(square, 3)
    future3 = executor.submit(square, 4)
    
    futures = [future1, future2, future3]
    
    # Iterar sobre Futures completados
    for future in as_completed(futures):
        print(f"Result: {future.result()}")
python future_concurrent.py
Result: 16
Result: 4
Result: 9

Implementación con asyncio

La librería asyncio proporciona soporte para I/O asíncrono, event loops y coroutines. Es particularmente útil para tareas I/O-bound.

future_asyncio.py
import asyncio

async def square(x):
    # Simular operación I/O-bound
    await asyncio.sleep(1)
    return x * x

async def main():
    # Crear Futures
    fut1 = asyncio.ensure_future(square(2))
    fut2 = asyncio.ensure_future(square(3))
    fut3 = asyncio.ensure_future(square(4))
    
    # Esperar y recoger resultados
    results = await asyncio.gather(fut1, fut2, fut3)
    
    for result in results:
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
python future_asyncio.py
Result: 4
Result: 9
Result: 16
Las coroutines se declaran con async def y pueden pausar su ejecución con await, permitiendo que otras coroutines se ejecuten mientras tanto. Es ideal para web scraping, API calls y operaciones I/O intensivas.
PATTERN_04

Observer en Reactive Programming

El patrón Observer tradicional notifica a objetos cuando el estado cambia. Sin embargo, cuando debemos manejar muchos eventos interdependientes, el enfoque tradicional puede llevar a código complicado y difícil de mantener.

La programación reactiva nos da una opción interesante: reaccionar a streams de eventos manteniendo el código limpio. En el corazón de ReactiveX está el concepto de Observable.

Conceptos Clave

OBSERVABLE
Fuente que emite streams de datos o eventos
OBSERVER
Consumidor que reacciona a los datos emitidos
OPERATORS
Transformaciones sobre el stream (map, filter, etc.)
Flujo Reactivo
Data Source
Observable
→ emit →
Observer

Implementación con ReactiveX

Primero instalamos: pip install reactivex

rx_observable.py
import reactivex as rx
from reactivex import operators as ops

# Crear Observable desde una lista
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Aplicar operadores y suscribirse
source.pipe(
    ops.map(lambda s: len(s)),      # Transformar a longitud
    ops.filter(lambda i: i >= 5)   # Filtrar >= 5
).subscribe(
    on_next=lambda i: print(f"Received: {i}"),
    on_error=lambda e: print(f"Error: {e}"),
    on_completed=lambda: print("Done!")
)
python rx_observable.py
Received: 5
Received: 5
Received: 7
Done!

Observable desde Archivo de Datos

rx_peoplelist.py
import reactivex as rx
from reactivex import operators as ops
from collections import Counter

def firstnames_from_db():
    """Retorna Observable desde archivo de texto"""
    with open("people.txt", "r") as f:
        content = f.read()
    
    names = [name.strip().split()[0] 
             for name in content.split(",") 
             if name.strip()]
    
    return rx.from_iterable(names)

def main():
    print("Starting... Press Ctrl+C to quit")
    
    # Contar ocurrencias de cada nombre
    firstnames_from_db().pipe(
        ops.map(lambda name: (name, 1))
    ).subscribe(
        on_next=lambda x: print(x)
    )

if __name__ == "__main__":
    main()
python rx_peoplelist.py
Starting... Press Ctrl+C to quit
('Peter', 1)
('Gabriel', 1)
('Gary', 1)
('Heather', 1)
...
ReactiveX permite componer operaciones asíncronas de forma declarativa. Los streams pueden venir de archivos, APIs, eventos de UI, o cualquier fuente de datos. La idea es reaccionar a los datos como fluyen, igual que las corrientes de agua en la naturaleza.
EXTRAS

Otros Patrones de Concurrencia

Existen otros patrones de concurrencia y asincronía que los desarrolladores pueden usar según sus necesidades específicas:

ACTOR MODEL
Modelo conceptual donde actores toman decisiones locales, crean más actores, envían mensajes y determinan cómo responder al siguiente mensaje.
COROUTINES
Estructuras de control donde el flujo se pasa cooperativamente entre rutinas. Python las soporta nativamente vía asyncio.
MESSAGE PASSING
Entidades de software se comunican y coordinan pasándose mensajes entre sí, usado en computación paralela e IPC.
BACKPRESSURE
Mecanismo para gestionar el flujo de datos, señalando al productor que reduzca velocidad cuando el consumidor está saturado.
Cada patrón tiene sus casos de uso y trade-offs. La elección depende del tipo de problema: CPU-bound vs I/O-bound, necesidad de paralelismo real vs concurrencia, y la arquitectura general del sistema.

Resumen de Patrones

01 // Thread Pool

Gestiona un conjunto fijo de threads reutilizables, mejorando rendimiento y reduciendo overhead de creación/destrucción.

02 // Worker Model

Distribución dinámica de tareas entre workers escalables, ideal para procesamiento paralelo con multiprocessing.

03 // Future y Promise

Facilita operaciones asíncronas non-blocking, permitiendo que la aplicación permanezca responsiva.

04 // Observer Reactivo

Reacciona a streams de eventos manteniendo código limpio mediante Observables y operadores funcionales.

Basado en "Mastering Python Design Patterns" // Kamon Ayeva & Sakis Kasampalis

Chapter 7: Concurrency and Asynchronous Patterns