Al leer este artículo, aprenderá a usar la [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)biblioteca para ejecutar tareas de forma asincrónica en Python. Es una mejor alternativa a las clases threadingmultiprocessingen Python debido al hecho de que implementó ambas ThreadProcesscon la misma interfaz, que está definida por la Executorclase abstracta La documentación oficial revela un problema importante con Thread:
Además, la threadingclase no le permite devolver un valor de las funciones invocables, excepto nullEl concepto principal del concurrent.futuresmódulo reside en la Executorclase. Es una clase abstracta que proporciona métodos para ejecutar llamadas de forma asincrónica. En lugar de usarlo directamente, usaremos las subclases que heredan de él:
  • ThreadPoolExecutor
  • ProcessPoolExecutor
Pasemos a la siguiente sección y comencemos a escribir código Python.

1. ThreadPoolExecutor

Importar

Agregue la siguiente declaración de importación en la parte superior de su archivo Python:
from concurrent.futures import ThreadPoolExecutor
import time

Función invocable (objetivo)

Definamos una nueva función que sirva como función invocable para la llamada asincrónica. Definiré una función simple que duerme durante dos segundos y luego devuelve la multiplicación de ambos parámetros de entrada:
def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

Tarea individual

El siguiente paso es crear un ThreadPoolExecutorobjeto. Se recomienda encarecidamente incluirlo dentro del withadministrador de contexto, ya que llamará a la shutdownfunción por sí sola y liberará los recursos una vez que finalice la ejecución. Acepta los siguientes parámetros de entrada.
  • max_workers- El número de trabajadores para esta instancia. Para la versión 3.5 en adelante, el número predeterminado de procesadores en la máquina, multiplicado por cinco. A partir de la versión 3.8, el valor predeterminado se cambia a min(32, os.cpu_count() + 4).
  • thread_name_prefix- Permite a los usuarios controlar los threading.Threadnombres de los subprocesos de trabajo creados por el grupo para una depuración más fácil.
  • initializer - Un llamado opcional que se llama al inicio de cada proceso de trabajo.
  • initargs- Una tupla de argumentos pasados ​​al initializer.
En este tutorial, usaré solo el max_workersparámetro. Creemos una ThreadPoolExecutory llamemos a la submitfunción con wait_functioncomo una función invocable de entrada. Recuerde que wait_functionacepta dos parámetros de entrada. Los voy a pasar como parámetros separados en lugar de una tupla:
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)
La submitfunción devolverá un Futureobjeto que encapsula la ejecución asincrónica de un invocable. Las funciones más utilizadas para el Futureobjeto son:
  • cancel- Intentos de cancelar la ejecución. Devuelve un valor booleano que indica si la llamada se canceló correctamente.
  • running- Comprueba si la llamada se está ejecutando. Devuelve un booleano.
  • done- Comprueba si la llamada fue cancelada o completada. Devuelve un booleano.
  • result- Devuelve el valor devuelto por la llamada. Si la llamada aún no se ha completado, este método esperará hasta n segundos dados por el timeoutparámetro de entrada Se recomienda verificar usando la donefunción antes de llamar al resultado, ya timeoutque bloqueará la ejecución actual.
  • add_done_callback- Adjunta la función invocable al Futureobjeto. Se llamará a esta función con Futuresu único argumento cuando Futurese cancele o termine de ejecutarse.
Agregue el siguiente código justo debajo de la submitfunción. Es solo un bucle simple que imprime una cadena mientras se ejecuta el hilo. Cuando se complete, imprimirá el resultado:
while True:
    if(future.running()):
        print("Task 1 running")
    elif(future.done()):
        print(future.result())
        break
Echa un vistazo al código completo en GitHub :
from concurrent.futures import ThreadPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_function, 3, 4)

    while True:
        if(future.running()):
            print("Task 1 running")
        elif(future.done()):
            print(future.result())
            break
Debería ver el siguiente resultado cuando ejecuta el archivo Python:
Este es el título de la imagen

Trabajos multiples

A continuación, agregaremos otra tarea para que ambos se ejecuten en paralelo. Cambie el código en su archivo Python a lo siguiente:
from concurrent.futures import ThreadPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

with ThreadPoolExecutor(max_workers=1) as executor: #change max_workers to 2 and see the results
    future = executor.submit(wait_function, 3, 4)
    future2 = executor.submit(wait_function, 8, 8)
    while True:
        if(future.running()):
            print("Task 1 running")
        if(future2.running()):
            print("Task 2 running")

        if(future.done() and future2.done()):
            print(future.result(), future2.result())
            break
Por ahora, configure el max_workersprimero. Ejecútelo y debería notar que las tareas no se ejecutan en paralelo. Ejecutará la primera tarea y luego la segunda tarea. Esto se debe principalmente a que solo tiene un trabajador en el grupo. Aumentemos el valor max_workersa dos y debería poder ver que ambas tareas se ejecutan en paralelo.

Función de devolución de llamada

Puede adjuntar una función de devolución de llamada al Futureobjeto. Llamará a la función adjunta una vez que se cancele o se complete la ejecución. Esto es extremadamente útil si tiene la intención de continuar con una actualización de la interfaz de usuario después de una conexión exitosa a la base de datos o de completar las solicitudes de URL. Vamos a crear una función de devolución de llamada simple por ahora:
def callback_function(future):
    print('Callback with the following result', future.result())
Agregue el siguiente código justo debajo de la submitfunción:
future.add_done_callback(callback_function)
Echa un vistazo al código completo en GitHub :
El siguiente resultado se mostrará en la consola cuando ejecute el archivo Python:
from concurrent.futures import ThreadPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

def callback_function(future):
    print('Callback with the following result', future.result())

with ThreadPoolExecutor(max_workers=1) as executor: #change max_workers to 2 and see the results
    future = executor.submit(wait_function, 3, 4)
    future.add_done_callback(callback_function)
    future2 = executor.submit(wait_function, 8, 8)
    while True:
        if(future.running()):
            print("Task 1 running")
        if(future2.running()):
            print("Task 2 running")

        if(future.done() and future2.done()):
            print(future.result(), future2.result())
            break
Este es el título de la imagen

2. ProcessPoolExecutor

La ProcessPoolExecutorclase funciona exactamente igual ThreadPoolExecutor, pero con algunas diferencias menores. Utiliza el multiprocessingmódulo, que le permite esquivar el Global Interpreter LockSin embargo, esto también significa que solo los objetos seleccionables se pueden ejecutar y devolver.
Además, no funciona en un intérprete interactivo y debe tener una __main__función que los subprocesos de los trabajadores puedan importar. max_workersserá el número de procesos en la máquina. En el sistema operativo Windows, max_workersdebe ser igual o inferior a 61.
Tienes que importar el ProcessPoolExecutorpara usarlo:
from concurrent.futures import ProcessPoolExecutor
Puede reutilizar el código anterior y modificarlo en ProcessPoolExecutorlugar de ThreadPoolExecutorEnvuelva el código dentro de una función y llámelo directamente desde __main__Consulte el código completo a continuación en GitHub :
from concurrent.futures import ProcessPoolExecutor
import time

def wait_function(x, y):
    print('Task(', x,'multiply', y, ') started')
    time.sleep(2)
    print('Task(', x,'multiply', y, ') completed')
    return x * y

def callback_function(future):
    print('Callback with the following result', future.result())

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(wait_function, 3, 4)
        future.add_done_callback(callback_function)
        future2 = executor.submit(wait_function, 8, 8)
        while True:
            if(future.running()):
                print("Task 1 running")
            if(future2.running()):
                print("Task 2 running")

            if(future.done() and future2.done()):
                print(future.result(), future2.result())
                break

if __name__ == '__main__':
    main()

3. Interrupción del teclado

Si tiene la intención de detener la ejecución Ctrl+Cmientras el proceso se ejecuta en un hilo, el compilador probablemente se bloqueará y se atascará en la KeyboardInteruptexcepción. Esto se debe principalmente a que el Ctrl+Ccomando genera SIGINT, lo que no detendrá ni interrumpirá la ejecución. Necesita generar SIGBREAKpara finalizar la ejecución y volver al terminal. Use el siguiente comando para generar SIGBREAKsegún el sistema operativo y el modelo de computadora:
Este es el título de la imagen

4. Conclusión

Recapitulemos lo que hemos aprendido hoy.
Comenzamos con una explicación simple del concurrent.futuresmódulo.
Después de eso, exploramos en profundidad la ThreadPoolExecutorclase básica y la Futureclase. Intentamos ejecutar múltiples tareas en paralelo con un número diferente de max_workersTambién probamos la configuración de una función de devolución de llamada que se ejecutará al completar la tarea.
Pasamos a la ProcessPoolExecutor, que es similar a la ThreadPoolExecutorcon algunas diferencias menores.
Gracias por leer esta pieza. ¡Espero verte de nuevo en el próximo artículo!
Pythondata sciencedevopsasync
¿Cómo puedo iniciar tareas paralelas en Python?