Python 3: achieving high concurrency with concurrent.futures

 

From: https://www.cnblogs.com/weihengblog/p/9812110.html

Official documentation for concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html

concurrent.futures: thread pools let you handle tasks more efficiently and concurrently: https://www.h3399.cn/201906/703751.html

 

 

Because of the global interpreter lock (GIL), Python threads cannot execute Python code in parallel on several cores. We will not go into that topic here, but there is one distinction we need to explain first.

I/O-intensive vs. compute-intensive:

  1. I/O-intensive: reading files, frequent reads from network sockets.
  2. Compute-intensive: mathematical and logical operations that consume a lot of CPU time. This is the kind of work "parallel computing" refers to here.

The concurrent.futures module, however, can use multiprocessing to achieve real parallel computing.

The core principle: ProcessPoolExecutor runs multiple Python interpreters in parallel as child processes, so a Python program can use a multi-core CPU to speed up execution. Because the child processes are separate from the main interpreter, each has its own global interpreter lock, and each can make full use of one CPU core.
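To make the separate-interpreter point concrete, here is a minimal sketch in which every task reports the ID of the process that ran it. The names cpu_task and run_in_processes are invented for this illustration and do not come from the original article; the workload is deliberately tiny.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def cpu_task(n):
    """Hypothetical CPU-bound task: sum of squares below n, plus the worker's PID."""
    total = sum(i * i for i in range(n))
    return os.getpid(), total


def run_in_processes(inputs, workers=2):
    """Run cpu_task over inputs in a process pool; results come back in input order."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(cpu_task, inputs))


if __name__ == "__main__":
    # The guard matters: worker processes may re-import this module on start-up.
    for pid, total in run_in_processes([100_000, 200_000, 300_000]):
        print("worker pid", pid, "main pid", os.getpid(), "result", total)
```

The worker PIDs printed should differ from the main PID, which is what lets each worker hold its own GIL.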

 

 

Python module: concurrent.futures

 

Starting with Python 3.2, the standard library provides the concurrent.futures module, which gives developers a high-level interface for launching asynchronous tasks. It offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are higher-level abstractions over threading and multiprocessing and directly support writing thread pools and process pools. Tasks can be handed straight to the thread pool or process pool. There is no need to maintain a Queue yourself or to worry about deadlocks; the pool schedules the work automatically.

concurrent.futures summary

1. Built into Python 3; on Python 2 it has to be installed separately

2. Executor object

An abstract class that provides methods for asynchronous execution. It cannot be used directly, only through its subclasses ThreadPoolExecutor and ProcessPoolExecutor.

2.1 Executor.submit(fn, *args, **kwargs)

fn: the function to execute asynchronously

*args, **kwargs: the arguments passed to fn

This method schedules the callable for execution and returns a Future object.

2.2 Executor.map(fn, *iterables, timeout=None, chunksize=1)

map(task, URLS)  # returns an iterator that yields the results in the order of the inputs

3. Future object

A future can be understood as an operation that will complete at some point in the future. It is the basis of asynchronous programming.

Normally an I/O operation blocks, and the calling thread can do nothing else while it waits.

A Future lets the program carry on with other work during that wait.
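The idea of doing other work while a Future is pending can be sketched as follows. slow_io is a hypothetical stand-in for a blocking I/O call (it only sleeps), and overlap_work is a name made up for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_io(delay):
    """Stand-in for a blocking I/O call (hypothetical; it only sleeps)."""
    time.sleep(delay)
    return "io finished"


def overlap_work():
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_io, 0.2)  # returns immediately
        # The main thread is free to do something else while the I/O waits.
        local_total = sum(range(1000))
        # result() blocks only if the task has not finished yet.
        return local_total, future.result()


print(overlap_work())
```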
 
3.1 done():

Returns True if the call was cancelled or has finished running.

3.2 cancel():

Attempts to cancel the call. If the call is currently executing or has already finished and cannot be cancelled, returns False. Otherwise the call is cancelled and the method returns True.

3.3 running():

Returns True if the call is currently executing.

3.4 result(timeout=None):

Returns the value returned by the call. If the call has not completed yet, this method waits.

If the wait exceeds timeout seconds, concurrent.futures.TimeoutError is raised.

If no timeout is specified, the wait is unlimited.

If the Future was cancelled before it completed, CancelledError is raised.
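The Future methods above can be observed in a small, deterministic sketch. It assumes a single-worker pool so that the second task stays queued; demo_future_states, blocker and the two events are names invented for this illustration.

```python
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor, TimeoutError


def demo_future_states():
    started = threading.Event()  # set once the first task is running
    release = threading.Event()  # lets the first task finish

    def blocker():
        started.set()
        release.wait()
        return "done"

    observed = {}
    with ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(blocker)
        second = executor.submit(blocker)  # queued behind the first task
        started.wait()

        observed["first_running"] = first.running()
        observed["first_cancel"] = first.cancel()    # already running
        observed["second_cancel"] = second.cancel()  # still pending
        try:
            first.result(timeout=0.05)  # the task is still blocked
        except TimeoutError:
            observed["timed_out"] = True
        try:
            second.result()  # cancelled futures raise instead of returning
        except CancelledError:
            observed["cancelled_error"] = True

        release.set()
        observed["first_result"] = first.result()
        observed["first_done"] = first.done()
        observed["second_done"] = second.done()
    return observed


print(demo_future_states())
```

A running call cannot be cancelled, a queued one can, and a cancelled Future still counts as done.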
 
4. as_completed(fs, timeout=None):

Returns an iterator over the Future instances given by fs.

Each Future is yielded as it completes (finished or cancelled).

A Future that appears more than once in fs is yielded only once.

Every Future the iterator yields has therefore already finished.
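A short sketch of the duplicate-handling rule, assuming a trivial square function made up for the example: the same Future is passed in twice, yet it comes out of the iterator once.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    return n * n


with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(square, 3)
    f2 = executor.submit(square, 4)
    # f1 is deliberately listed twice; as_completed() still yields it once.
    finished = list(as_completed([f1, f2, f1]))

results = sorted(f.result() for f in finished)
print(len(finished), results)
```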
 
5. wait(fs, timeout=None, return_when=ALL_COMPLETED):

Returns a tuple with two elements:

1. The set of completed futures

2. The set of uncompleted futures

First experience:

# coding=utf-8
import time
from concurrent import futures


def return_future(msg):
    time.sleep(3)
    return msg


pool = futures.ThreadPoolExecutor(max_workers=2)

# submit() returns a Future immediately; the tasks run in the pool's threads
t1 = pool.submit(return_future, 'hello')
t2 = pool.submit(return_future, 'world')

time.sleep(3)
print(t1.done())  # True once the task has finished
time.sleep(3)
print(t2.done())

print(t1.result())  # get the Future's return value
time.sleep(3)
print(t2.result())

print("Main thread")

     

     
     

Executor.map(func, *iterables, timeout=None, chunksize=1)

# coding=utf-8
import time
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor as Pool

import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    return requests.get(url=url, timeout=timeout)


# Start timing before map(), because map() submits the tasks right away
start_time = time.time()

pool = Pool()
result = pool.map(task, URLS)

# Results come back in the order of URLS
for res in result:
    print("{} {}".format(res.url, len(res.content)))

# Results come back in completion order
with Pool(max_workers=3) as executor:
    future_task = [executor.submit(task, url) for url in URLS]

    for f in as_completed(future_task):
        # Every Future yielded by as_completed() is already done
        f_ret = f.result()  # the return value of task: a requests Response object
        print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))

print("time consuming", time.time() - start_time)
print("Main thread")

 

 

Future object

A Future can be understood as an operation that will complete at some point in the future.
An I/O operation blocks until its result comes back, and the waiting thread can do nothing else in the meantime.
Futures let the program carry on with other work during that wait.

import time
from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed

import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    return requests.get(url=url, timeout=timeout)


# Sequential version, for comparison:
# start_time = time.time()
# for url in URLS:
#     ret = task(url)
#     print("{} {}".format(ret.url, len(ret.content)))
# print("time consuming", time.time() - start_time)

with Pool(max_workers=3) as executor:
    # Create the future tasks
    future_task = [executor.submit(task, url) for url in URLS]

    for f in future_task:
        if f.running():
            print("%s is running" % str(f))

    for f in as_completed(future_task):
        try:
            ret = f.done()
            if ret:
                # result() re-raises any exception raised inside task
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            f.cancel()  # has no effect on a Future that has already finished
            print(e)

"""
The URLs are not necessarily returned in order: waiting for one URL does not block the others.
<Future at 0x1c63990e6d8 state=running> is running
<Future at 0x1c639922780 state=running> is running
<Future at 0x1c639922d30 state=running> is running
<Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
<Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101
<Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103
"""

 

 

Module functions

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait() returns a tuple of two sets: the completed futures and the uncompleted ones. Using wait() gives you more control. Its return_when parameter accepts three values: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED. The default is ALL_COMPLETED.

With the default ALL_COMPLETED, the program blocks until all tasks in the thread pool are completed, and only then does the main thread continue:

 

from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import FIRST_COMPLETED, wait

import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    r = requests.get(url=url, timeout=timeout)
    print(r.status_code)


with Pool(max_workers=3) as executor:
    future_task = [executor.submit(task, url) for url in URLS]

    for f in future_task:
        if f.running():
            print("%s" % (str(f)))

    # wait() also takes two more parameters, timeout and return_when.
    # return_when is one of three constants (default: ALL_COMPLETED):
    #   FIRST_COMPLETED  return when any future finishes or is cancelled
    #   FIRST_EXCEPTION  return when any future raises an exception;
    #                    if none does, it is equivalent to ALL_COMPLETED
    #   ALL_COMPLETED    return when all futures have finished
    #
    # Call wait() inside the with block: leaving the block already waits for every task.
    results = wait(future_task, return_when=FIRST_COMPLETED)
    done = results[0]
    for d in done:
        print(d)
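Because network timing varies, the effect of return_when is easier to see with tasks whose completion is controlled by hand. The sketch below assumes two made-up tasks, fast and slow, and uses a threading.Event to hold the slow one back; demo_wait is a hypothetical name.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def demo_wait():
    gate = threading.Event()  # keeps the slow task from finishing

    def fast():
        return "fast"

    def slow():
        gate.wait()
        return "slow"

    with ThreadPoolExecutor(max_workers=2) as executor:
        slow_future = executor.submit(slow)
        fast_future = executor.submit(fast)

        # Returns as soon as at least one future has finished.
        done, not_done = wait([slow_future, fast_future], return_when=FIRST_COMPLETED)
        first_round = (fast_future in done, slow_future in not_done)

        gate.set()
        # The default return_when=ALL_COMPLETED blocks until nothing is pending.
        done, not_done = wait([slow_future, fast_future])
        second_round = (len(done), len(not_done))
    return first_round, second_round


print(demo_wait())
```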

 

 

 

 

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances given by fs that yields each Future as it completes. A Future that appears more than once in fs is yielded only once. Every Future the iterator yields has already finished executing.

from concurrent.futures import ThreadPoolExecutor as Pool
from concurrent.futures import as_completed

import requests

URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']


def task(url, timeout=10):
    return requests.get(url=url, timeout=timeout)


with Pool(max_workers=3) as executor:
    # Create the future tasks
    future_task = [executor.submit(task, url) for url in URLS]

    for f in future_task:
        if f.running():
            print("%s is running" % str(f))

    for f in as_completed(future_task):
        try:
            ret = f.done()
            if ret:
                # result() re-raises any exception raised inside task
                f_ret = f.result()
                print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
        except Exception as e:
            f.cancel()  # has no effect on a Future that has already finished
            print(e)

 

Next, we look at the classes in the concurrent.futures module. Its two basic building blocks are Executor and Future.

Sample code:

# -*- coding:utf-8 -*-
import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)


# Stock decrement function: loops until the decrement has been carried out
# Stock available: decrement succeeds, return True
# Out of stock: decrement fails, return False
def reduce_stock():
    # In the Python client, Redis transactions are wrapped by pipeline
    with r.pipeline() as pipe:
        while True:
            try:
                # WATCH the stock key; if another client changes the key after this point,
                # executing the transaction raises WatchError
                pipe.watch('stock:count')
                count = int(pipe.get('stock:count'))
                if count > 0:  # in stock
                    # Start the transaction
                    pipe.multi()
                    pipe.decr('stock:count')
                    # Send the queued commands.
                    # execute() returns the list of command results;
                    # here there is only decr, which returns the current value
                    print(pipe.execute()[0])
                    return True
                else:
                    return False
            except WatchError as ex:
                # Print the WatchError to observe when the watched key was changed
                print(ex)
                pipe.unwatch()


def worker():
    while True:
        # Exit when out of stock
        if not reduce_stock():
            break


if __name__ == "__main__":
    # Set the stock to 100
    r.set("stock:count", 100)

    # Multiple processes simulate multiple clients submitting
    with ProcessPoolExecutor() as pool:
        for _ in range(10):
            pool.submit(worker)

 

 

 

Detailed explanation of the concurrent.futures module

 

1. Executor object

class concurrent.futures.Executor

Executor is an abstract class that provides methods to execute calls asynchronously. It cannot be used directly; use it through one of its two subclasses, ThreadPoolExecutor or ProcessPoolExecutor.

 

1.1 Executor.submit(fn, *args, **kwargs)

fn: functions that need to be executed asynchronously
*args, **kwargs: parameters of fn

Example code:

# -*- coding:utf-8 -*-
import time
from concurrent import futures


def test(num):
    return time.ctime(), num


with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 1)
    print(future.result())

 

1.2 Executor.map(func, *iterables, timeout=None, chunksize=1)

Equivalent to map(func, *iterables), except that func is executed asynchronously. timeout can be an int or a float. If a result is not available within timeout seconds of the original call to map(), the returned iterator raises TimeoutError. If timeout is not specified, there is no limit on the wait.

func: the function to execute asynchronously
*iterables: iterable objects, such as lists. Each call to func takes its arguments from the iterables.
timeout: the maximum time to wait for results, counted from the call to map()

Example code:

# -*- coding:utf-8 -*-
import time
from concurrent import futures


def test(num):
    return time.ctime(), num


data = [1, 2, 3]

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    # map() yields the return values directly, not Future objects
    for result in executor.map(test, data):
        print(result)
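When several iterables are passed, each call takes one argument from each of them, just like the built-in map(). A minimal sketch, with power as a function made up for the example:

```python
from concurrent.futures import ThreadPoolExecutor


def power(base, exponent):
    return base ** exponent


with ThreadPoolExecutor(max_workers=2) as executor:
    # One argument is taken from each iterable per call:
    # power(2, 3), power(3, 2), power(4, 1)
    powers = list(executor.map(power, [2, 3, 4], [3, 2, 1]))

print(powers)
```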

 

1.3 Executor.shutdown(wait=True)

Releases the resources the executor is using. Call it once you have finished submitting work with Executor.submit() or Executor.map(). Using the with statement avoids having to call this method explicitly.
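A small sketch of an explicit shutdown, using the built-in pow as a placeholder task. Once shutdown() has been called, the executor refuses further submissions with a RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(pow, 2, 10)
executor.shutdown(wait=True)  # blocks until pending work has finished

print(future.done(), future.result())

try:
    executor.submit(pow, 2, 3)  # the pool no longer accepts work
    rejected = False
except RuntimeError:
    rejected = True
print(rejected)
```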

 

2. ThreadPoolExecutor object

The ThreadPoolExecutor class is a subclass of Executor that uses a pool of threads to execute calls asynchronously.

class concurrent.futures.ThreadPoolExecutor(max_workers) executes calls asynchronously using a pool of at most max_workers threads.

 

3. ProcessPoolExecutor object

The ProcessPoolExecutor class is a subclass of Executor that uses a pool of processes to execute calls asynchronously.

class concurrent.futures.ProcessPoolExecutor(max_workers=None) executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None, the number of processors on the machine is used (for example, on a 4-core machine with max_workers set to None, four processes are used).

Example code:

# -*- coding:utf-8 -*-
import time
from concurrent import futures


def test(num):
    return time.ctime(), num


def multi_exec(m, n):
    # m: number of concurrent workers
    # n: number of runs per worker
    with futures.ProcessPoolExecutor(max_workers=m) as executor:  # multiple processes
    # with futures.ThreadPoolExecutor(max_workers=m) as executor:  # multiple threads
        # Map each Future back to the argument it was submitted with
        executor_dict = {executor.submit(test, times): times for times in range(m * n)}

        for future in futures.as_completed(executor_dict):
            times = executor_dict[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (times, future.exception()))
            else:
                print('RunTimes:%d,Res:%s' % (times, future.result()))


if __name__ == '__main__':
    multi_exec(5, 1)

 

 

Scheduling a single task

 

To schedule a single task with an executor, call submit() and then use the returned Future instance to wait for the task's result.

Executor is an abstract class in Python's concurrent.futures module. It cannot be used directly; use one of the following concrete subclasses:

  • ThreadPoolExecutor: thread pool
  • ProcessPoolExecutor: process pool

 

Example code:

from concurrent import futures
import time
import random


def task(n):
    time.sleep(random.randint(1, 10))
    return n


executor = futures.ThreadPoolExecutor(max_workers=3)
future = executor.submit(task, 5)
print('future: {}'.format(future))
result = future.result()  # blocks until the task has finished
print('result: {}'.format(result))

 

 

Thread pool and process pool

 

  1. ThreadPoolExecutor is one of the concrete subclasses of the Executor class. It uses multithreading and gives us a pool of threads to submit tasks to. The pool assigns tasks to available threads and schedules them to run.
  2. ProcessPoolExecutor is one of the concrete subclasses of the Executor class. It uses multiprocessing and gives us a pool of processes to submit tasks to. The pool assigns tasks to available processes and schedules them to run.

 

How to create a ThreadPoolExecutor or ProcessPoolExecutor?

With the concurrent.futures module and its concrete Executor subclasses, creating a thread pool or process pool is easy: construct a ThreadPoolExecutor or ProcessPoolExecutor with the number of workers you want in the pool. If max_workers is omitted, the default is derived from the machine's CPU count and varies between Python versions (ThreadPoolExecutor used five threads per processor in Python 3.5 to 3.7), so it is not a fixed 5. You can then submit tasks to the pool. submit() returns a Future object. Its done() method tells you whether the future has been resolved, that is, whether a value has been set for that particular Future. When the task completes, the executor stores the result on the Future object.

Thread pool example code:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task(message):
    sleep(2)
    return message


def main():
    executor = ThreadPoolExecutor(5)
    future = executor.submit(task, "Completed")
    print(future.done())
    sleep(2)
    print(future.done())
    print(future.result())


if __name__ == '__main__':
    main()

 

In the example above, a ThreadPoolExecutor is constructed with five threads. A task that waits 2 seconds before returning its message is then submitted to the executor. The task needs 2 seconds to complete, so the first call to done() returns False. After the main thread has slept for 2 seconds the task has normally finished, and the result() method returns the future's result.

Process pool example code:

from concurrent.futures import ProcessPoolExecutor
from time import sleep


def task(message):
    sleep(2)
    return message


def main():
    executor = ProcessPoolExecutor(5)
    future = executor.submit(task, "Completed")
    print(future.done())
    sleep(2)
    print(future.done())
    print(future.result())


if __name__ == '__main__':
    main()

 

 

Instantiating ThreadPoolExecutor or ProcessPoolExecutor with a context manager
Another way to instantiate an executor is with a context manager. It works like the approach used in the previous examples. The main advantage is cleaner syntax, and the pool is shut down automatically when the block exits. Instantiation looks like this:

with ThreadPoolExecutor(max_workers=5) as executor:
    ...  # submit tasks here

or

with ProcessPoolExecutor(max_workers=5) as executor:
    ...  # submit tasks here

 

Examples

The following example is borrowed from the Python documentation. It first imports the concurrent.futures module, then defines a function called load_url() that loads the requested URL. It then creates a ThreadPoolExecutor with five threads in the pool, used as a context manager. Calling result() on each future returns the page contents or re-raises the exception from the download.

import concurrent.futures
import urllib.request

URLS = [
    'http://www.foxnews.com/',
    'https://www.yiibai.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/'
]


# Retrieve a single page and return its contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and map each future to its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

 

The Python script above produces output like the following:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.yiibai.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Process pool:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and return its contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # Start the load operations and map each future to its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))


if __name__ == '__main__':
    main()

 

 

 

Use map() to schedule multiple tasks and return in order

 

With map(), multiple workers take data from the input iterable concurrently and process it, and the results are returned in the order of the inputs.

Example code:

from concurrent import futures
import time
import random


def task(n):
    time.sleep(random.randint(1, 10))
    return n


executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(task, range(1, 10))
print('unprocessed results: {}'.format(results))  # a generator, not yet the values
real_results = list(results)  # consuming the generator waits for every task
print('real results: {}'.format(real_results))

 

Using the Executor.map() function

Python's built-in map() function is widely used; one typical job is applying a function to every element of an iterable. Executor.map() works similarly: each element of the iterable is passed to the function, and each call is submitted to the ThreadPoolExecutor as an independent job. The following Python script shows how it works.

Examples
In the following example, map() is used to apply the square() function to each value in the values list.

from concurrent.futures import ThreadPoolExecutor

values = [2, 3, 4, 5]


def square(n):
    return n * n


def main():
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(square, values)
        for result in results:
            print(result)


if __name__ == '__main__':
    main()

 

The script above prints 4, 9, 16 and 25, one per line, in the order of the input values.

Process pool:

from concurrent.futures import ProcessPoolExecutor

values = [2, 3, 4, 5]


def square(n):
    return n * n


def main():
    with ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(square, values)
        for result in results:
            print(result)


if __name__ == '__main__':
    main()

 

 

Multitask scheduling, unordered return

 

Submit tasks to the executor one after another, collect the returned futures in a list, and use as_completed() to yield each task's result as it finishes, in no particular order.

Example code:

from concurrent import futures
import time
import random


def task(n):
    time.sleep(random.randint(1, 10))
    return n


executor = futures.ThreadPoolExecutor(max_workers=3)
future_list = [executor.submit(task, i) for i in range(1, 10)]
for f in futures.as_completed(future_list):
    print(f.result())
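To see the two ordering behaviours side by side, the sketch below runs the same inputs through map() and through submit() plus as_completed(). delayed_echo and the delay values are made up for the illustration; the completion order depends on thread scheduling, so only the map() order is guaranteed.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def delayed_echo(delay):
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): results follow the order of the inputs.
    ordered = list(executor.map(delayed_echo, delays))

    # as_completed(): results follow the order in which the tasks finish.
    futures = [executor.submit(delayed_echo, d) for d in delays]
    by_completion = [f.result() for f in as_completed(futures)]

print(ordered)
print(by_completion)
```

With one thread per task, by_completion will typically list the shortest delay first, while ordered always matches delays.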

 

When to use ProcessPoolExecutor and ThreadPoolExecutor?

Now that we have covered both Executor classes, ThreadPoolExecutor and ProcessPoolExecutor, we need to know when to use which. Choose ProcessPoolExecutor for CPU-bound workloads and ThreadPoolExecutor for I/O-bound workloads.

With ProcessPoolExecutor you do not need to worry about the GIL, because it uses multiprocessing. For CPU-bound work its execution time is therefore usually shorter than with ThreadPoolExecutor, although starting processes and passing data between them adds overhead.
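One way to check this on your own machine is to time the same CPU-bound function under both executors. The sketch below is an illustration only: count_primes and timed_run are names invented here, the workload size is arbitrary, and the timings will vary with the hardware and the Python build.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """Naive CPU-bound work: count the primes below limit by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_run(executor_cls, limits, workers=4):
    """Return (results, elapsed seconds) for running count_primes in the given pool."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(count_primes, limits))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    limits = [50_000] * 4
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, elapsed = timed_run(cls, limits)
        print(cls.__name__, results[0], "%.2fs" % elapsed)
```

For very small workloads the process pool can come out slower, because process start-up and pickling dominate the run time.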

 

Reprint: https://blog.csdn.net/freeking101/article/details/97395745

 

 

Tags: Python

Posted by show8bbs on Sat, 07 May 2022 10:33:26 +0300