Al leer este artículo, aprenderá a usar la
[concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)
biblioteca para ejecutar tareas de forma asincrónica en Python. Es una mejor alternativa a las clases threading
y multiprocessing
en Python debido al hecho de que implementó ambas Thread
y Process
con la misma interfaz, que está definida por la Executor
clase abstracta . La documentación oficial revela un problema importante con Thread
:
Además, la
threading
clase no le permite devolver un valor de las funciones invocables, excepto null
. El concepto principal del concurrent.futures
módulo reside en la Executor
clase. Es una clase abstracta que proporciona métodos para ejecutar llamadas de forma asincrónica. En lugar de usarlo directamente, usaremos las subclases que heredan de él:ThreadPoolExecutor
ProcessPoolExecutor
Pasemos a la siguiente sección y comencemos a escribir código Python.
1. ThreadPoolExecutor
Importar
Agregue la siguiente declaración de importación en la parte superior de su archivo Python:
from concurrent.futures import ThreadPoolExecutor
import time
Función invocable (objetivo)
Definamos una nueva función que sirva como función invocable para la llamada asincrónica. Definiré una función simple que duerme durante dos segundos y luego devuelve la multiplicación de ambos parámetros de entrada:
def wait_function(x, y):
print('Task(', x,'multiply', y, ') started')
time.sleep(2)
print('Task(', x,'multiply', y, ') completed')
return x * y
Tarea individual
El siguiente paso es crear un
ThreadPoolExecutor
objeto. Se recomienda encarecidamente incluirlo dentro del with
administrador de contexto, ya que llamará a la shutdown
función por sí sola y liberará los recursos una vez que finalice la ejecución. Acepta los siguientes parámetros de entrada.max_workers
- El número de trabajadores para esta instancia. Para la versión 3.5 en adelante, el número predeterminado de procesadores en la máquina, multiplicado por cinco. A partir de la versión 3.8, el valor predeterminado se cambia amin(32, os.cpu_count() + 4)
.thread_name_prefix
- Permite a los usuarios controlar losthreading.Thread
nombres de los subprocesos de trabajo creados por el grupo para una depuración más fácil.initializer
- Un llamado opcional que se llama al inicio de cada proceso de trabajo.initargs
- Una tupla de argumentos pasados alinitializer
.
En este tutorial, usaré solo el
max_workers
parámetro. Creemos una ThreadPoolExecutor
y llamemos a la submit
función con wait_function
como una función invocable de entrada. Recuerde que wait_function
acepta dos parámetros de entrada. Los voy a pasar como parámetros separados en lugar de una tupla:with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wait_function, 3, 4)
La
submit
función devolverá un Future
objeto que encapsula la ejecución asincrónica de un invocable. Las funciones más utilizadas para el Future
objeto son:cancel
- Intentos de cancelar la ejecución. Devuelve un valor booleano que indica si la llamada se canceló correctamente.running
- Comprueba si la llamada se está ejecutando. Devuelve un booleano.done
- Comprueba si la llamada fue cancelada o completada. Devuelve un booleano.result
- Devuelve el valor devuelto por la llamada. Si la llamada aún no se ha completado, este método esperará hasta n segundos dados por eltimeout
parámetro de entrada . Se recomienda verificar usando ladone
función antes de llamar al resultado, yatimeout
que bloqueará la ejecución actual.add_done_callback
- Adjunta la función invocable alFuture
objeto. Se llamará a esta función conFuture
su único argumento cuandoFuture
se cancele o termine de ejecutarse.
Agregue el siguiente código justo debajo de la
submit
función. Es solo un bucle simple que imprime una cadena mientras se ejecuta el hilo. Cuando se complete, imprimirá el resultado:while True:
if(future.running()):
print("Task 1 running")
elif(future.done()):
print(future.result())
break
Echa un vistazo al código completo en GitHub :
from concurrent.futures import ThreadPoolExecutor
import time
def wait_function(x, y):
print('Task(', x,'multiply', y, ') started')
time.sleep(2)
print('Task(', x,'multiply', y, ') completed')
return x * y
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(wait_function, 3, 4)
while True:
if(future.running()):
print("Task 1 running")
elif(future.done()):
print(future.result())
break
Debería ver el siguiente resultado cuando ejecuta el archivo Python:
Trabajos multiples
A continuación, agregaremos otra tarea para que ambos se ejecuten en paralelo. Cambie el código en su archivo Python a lo siguiente:
from concurrent.futures import ThreadPoolExecutor
import time
def wait_function(x, y):
print('Task(', x,'multiply', y, ') started')
time.sleep(2)
print('Task(', x,'multiply', y, ') completed')
return x * y
with ThreadPoolExecutor(max_workers=1) as executor: #change max_workers to 2 and see the results
future = executor.submit(wait_function, 3, 4)
future2 = executor.submit(wait_function, 8, 8)
while True:
if(future.running()):
print("Task 1 running")
if(future2.running()):
print("Task 2 running")
if(future.done() and future2.done()):
print(future.result(), future2.result())
break
Por ahora, configure el
max_workers
primero. Ejecútelo y debería notar que las tareas no se ejecutan en paralelo. Ejecutará la primera tarea y luego la segunda tarea. Esto se debe principalmente a que solo tiene un trabajador en el grupo. Aumentemos el valor max_workers
a dos y debería poder ver que ambas tareas se ejecutan en paralelo.Función de devolución de llamada
Puede adjuntar una función de devolución de llamada al
Future
objeto. Llamará a la función adjunta una vez que se cancele o se complete la ejecución. Esto es extremadamente útil si tiene la intención de continuar con una actualización de la interfaz de usuario después de una conexión exitosa a la base de datos o de completar las solicitudes de URL. Vamos a crear una función de devolución de llamada simple por ahora:def callback_function(future):
print('Callback with the following result', future.result())
Agregue el siguiente código justo debajo de la
submit
función:future.add_done_callback(callback_function)
Echa un vistazo al código completo en GitHub :
El siguiente resultado se mostrará en la consola cuando ejecute el archivo Python:
from concurrent.futures import ThreadPoolExecutor
import time
def wait_function(x, y):
print('Task(', x,'multiply', y, ') started')
time.sleep(2)
print('Task(', x,'multiply', y, ') completed')
return x * y
def callback_function(future):
print('Callback with the following result', future.result())
with ThreadPoolExecutor(max_workers=1) as executor: #change max_workers to 2 and see the results
future = executor.submit(wait_function, 3, 4)
future.add_done_callback(callback_function)
future2 = executor.submit(wait_function, 8, 8)
while True:
if(future.running()):
print("Task 1 running")
if(future2.running()):
print("Task 2 running")
if(future.done() and future2.done()):
print(future.result(), future2.result())
break
2. ProcessPoolExecutor
La
ProcessPoolExecutor
clase funciona exactamente igual ThreadPoolExecutor
, pero con algunas diferencias menores. Utiliza el multiprocessing
módulo, que le permite esquivar el Global Interpreter Lock
. Sin embargo, esto también significa que solo los objetos seleccionables se pueden ejecutar y devolver.
Además, no funciona en un intérprete interactivo y debe tener una
__main__
función que los subprocesos de los trabajadores puedan importar. max_workers
será el número de procesos en la máquina. En el sistema operativo Windows, max_workers
debe ser igual o inferior a 61.
Tienes que importar el
ProcessPoolExecutor
para usarlo:from concurrent.futures import ProcessPoolExecutor
Puede reutilizar el código anterior y modificarlo en
ProcessPoolExecutor
lugar de ThreadPoolExecutor
. Envuelva el código dentro de una función y llámelo directamente desde __main__
. Consulte el código completo a continuación en GitHub :from concurrent.futures import ProcessPoolExecutor
import time
def wait_function(x, y):
print('Task(', x,'multiply', y, ') started')
time.sleep(2)
print('Task(', x,'multiply', y, ') completed')
return x * y
def callback_function(future):
print('Callback with the following result', future.result())
def main():
with ProcessPoolExecutor(max_workers=2) as executor:
future = executor.submit(wait_function, 3, 4)
future.add_done_callback(callback_function)
future2 = executor.submit(wait_function, 8, 8)
while True:
if(future.running()):
print("Task 1 running")
if(future2.running()):
print("Task 2 running")
if(future.done() and future2.done()):
print(future.result(), future2.result())
break
if __name__ == '__main__':
main()
3. Interrupción del teclado
Si tiene la intención de detener la ejecución
Ctrl+C
mientras el proceso se ejecuta en un hilo, el compilador probablemente se bloqueará y se atascará en la KeyboardInterupt
excepción. Esto se debe principalmente a que el Ctrl+C
comando genera SIGINT
, lo que no detendrá ni interrumpirá la ejecución. Necesita generar SIGBREAK
para finalizar la ejecución y volver al terminal. Use el siguiente comando para generar SIGBREAK
según el sistema operativo y el modelo de computadora:4. Conclusión
Recapitulemos lo que hemos aprendido hoy.
Comenzamos con una explicación simple del
concurrent.futures
módulo.
Después de eso, exploramos en profundidad la
ThreadPoolExecutor
clase básica y la Future
clase. Intentamos ejecutar múltiples tareas en paralelo con un número diferente de max_workers
. También probamos la configuración de una función de devolución de llamada que se ejecutará al completar la tarea.
Pasamos a la
ProcessPoolExecutor
, que es similar a la ThreadPoolExecutor
con algunas diferencias menores.
Gracias por leer esta pieza. ¡Espero verte de nuevo en el próximo artículo!
0 Comentarios