并发可以被认为是同时发生的多件事。
想象您正在举办晚宴,您需要同时烹饪多盘以服务于所有客人。一种做到这一点的方法是彼此烹饪每道菜,但这需要大量时间,您的客人必须等待很长时间才能吃饭。
解决此问题的更好方法是在炉子上使用多个燃烧器并同时烹饪多种菜肴。每个燃烧器代表一个不同的任务,您可以使用它们同时烹饪不同的菜肴。这样,您可以在较短的时间内完成所有烹饪所有菜肴,您的客人可以一起用餐。
燃烧器表示并发任务,菜肴代表同时执行的不同过程或线程。这使您可以同时完成多个任务,提高效率并减少完成所有任务所需的总时间。
在编程中,并发是同时独立于彼此运行的多个线程或过程的概念。这允许有效地利用资源,例如CPU和内存,并可能导致更快,动态的应用程序。
有两种主要类型:
- 并行性:指使用多个处理器或内核同时执行多个任务。并行性使系统能够通过将任务分为可以同时执行的较小的独立子任务来在较短的时间内完成一项大型任务。
- 异步:指系统以非阻滞方式处理多个任务的能力。这意味着系统可以在等待特定任务完成时继续处理其他任务。
可以通过多种方式实现并发,例如使用线程,进程和事件驱动的编程。但是,如果无法正确处理,它还可以引入复杂性和种族条件和其他同步问题的潜力。
自从语言早期以来,Python的并发概念就存在。但是,随着时间的推移,实现并发的特定机制已经发展。
在Python的早期版本中,引入了线程模块,该模块为创建和管理线程提供了基本的线程API。但是,Python的Cpython实施中的全局解释器锁(GIL)是最广泛使用的实现,它通过防止多个线程同时执行Python字节码来限制线程的性能好处。
。。在python 2.4中,引入了同步模块,该模块提供了用于使用线程和过程的高级API。该模块使编写并发代码变得更容易,但是它仍然受到GIL的局限性。
在python 3.2中,添加了模块,并添加了同步。futures.futures.processpoolexecutor类,这允许通过使用多个过程而不是线程。
允许进行真实的并行性。Python 3.4引入了Asyncio库,该库允许您使用ASYNC/等待语法编写并发代码。该库提供了一种编写异步代码的方法,该代码可以同时运行而无需线程或进程。
在最新版本的python(3.8+)中,conturrent.futures.threadpoolexecutor类更新以支持使用本机线程启动的使用和引入线程。Barrier类,可用于同步线程的执行。
总的来说,Python中的并发历史已经逐步发展了可用的并发性机制,重点是提供更高级别的抽象,并使其更容易编写并发代码。
线程
python中的线程是一种通过在单个过程中创建和管理多个线程来实现并发的方法。每个线程都有自己的执行上下文,其中包括其自己的调用堆栈和程序计数器,但在过程中共享与其他线程相同的内存空间。这允许有效利用资源(例如CPU和内存),并可能导致更快,动态的应用程序。
Python中的螺纹模块提供了用于创建和管理线程的基本线程API。该模块中的主要类是:
- 线程:此类代表执行的单个线程。您可以通过实例化此类并将可可的对象(例如函数或方法)作为目标创建新线程。线程启动后,目标函数将在新线程中执行。
- 锁定:此类提供了一种同步线程之间对共享资源的访问的方法。锁在两个状态之一,“锁定”或“解锁”。一次只有一个线程可以容纳一个锁。
- rlock:此类提供了一种同步线程之间对共享资源的访问的方法。 rlock在两个状态之一,“锁定”或“解锁”。可以通过同一线程多次获取rlock,并且只有在同一线程发布了多次时,它才能解锁。
- 信号:此类提供了一种同步线程之间对共享资源的访问的方法。信号量是一个非负值和线程之间共享的值。每次线程获取信号量时,该值会减小一个,并每次线程释放信号量时增加一个。
- 障碍:此类提供了一种同步线程执行的方法。屏障是一个同步对象,允许多个线程互相等待在继续之前达到执行中的某个点。
除了螺纹模块外,Python还提供了并发。FUTURES模块,该模块提供了用于使用线程和过程的高级API。该模块包括以下类:
- threadpoolexecutor:此类用于在线程池中执行可可的对象。线程池是使用固定数量的工作线程创建的,每个工作线程都负责一次执行单个任务。
- ProcessPoolExecutor:此类用于在进程池中执行可可的对象。该过程池是使用固定数量的工作流程创建的,每个工作过程都负责一次执行单个任务。
让我看一些例子
import threading
import time
def worker():
"""thread worker function"""
print('Worker')
time.sleep(5)
print('Worker finished')
# create a new thread
t = threading.Thread(target=worker)
# start the thread
t.start()
# main thread continues execution
print('Main thread')
# wait for the worker thread to complete
t.join()
# main thread continues execution after worker thread completes
print('Main thread finished')
在上面的示例中,我们定义了一个工作函数,该功能通过睡觉5秒来模拟长期运行的任务。然后,我们创建一个新线程,并将工作函数分配为其目标。然后在线程上调用start()方法以启动其执行。主线程继续执行并打印“主线程”。 join()方法在线程上调用,以等待工作线程完成。工作线程完成后,主线程将继续执行并打印“主线程完成”。
请注意,JOIN()方法用于等待工作线程在主线程继续执行之前完成。同样重要的是要注意,sleep(5)用于模拟长期运行的任务并使线程示例更加清晰。
除了线程类外,线程模块还提供了其他类,例如锁定,rlock,信号量和障碍物,可用于同步线程的执行并确保以安全和控制的方式访问共享资源。
例如:
import threading
counter = 0
def increment():
global counter
lock.acquire()
counter += 1
lock.release()
lock = threading.Lock()
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(counter)
此示例使用锁定对象来同步对共享变量计数器的访问。 Acceire()方法用于获取锁定,并使用Release()方法来释放它。这样可以确保只有一个线程一次可以一次访问共享变量,从而防止种族条件和其他同步问题。
重要的是要注意,线程可能很棘手,因此熟悉Python中的线程模块和全局解释器锁(GIL)很重要,这可能会影响多线程程序的性能。
。更多提前的例子
import threading
import queue
import time
class DataFetcherThread(threading.Thread):
"""Thread class for fetching data from a remote API"""
def __init__ (self, queue, api_url):
threading.Thread. __init__ (self)
self.queue = queue
self.api_url = api_url
def run(self):
while True:
# fetch data from the API
data = fetch_data_from_api(self.api_url)
# put the data in the queue
self.queue.put(data)
# sleep for a short period of time
time.sleep(5)
class DataProcessorThread(threading.Thread):
"""Thread class for processing data"""
def __init__ (self, queue):
threading.Thread. __init__ (self)
self.queue = queue
def run(self):
while True:
# get data from the queue
data = self.queue.get()
# process the data
processed_data = process_data(data)
# do something with the processed data
save_to_database(processed_data)
# signal that the data has been processed
self.queue.task_done()
def fetch_data_from_api(api_url):
"""Function to fetch data from a remote API"""
# code to fetch data from the API
pass
def process_data(data):
"""Function to process data"""
# code to process data
pass
def save_to_database(processed_data):
"""Function to save data to a database"""
# code to save data to the database
pass
# create a queue to hold the data
data_queue = queue.Queue()
# create the fetcher and processor threads
fetcher = DataFetcherThread(data_queue, 'https://example.com/api')
processor = DataProcessorThread(data_queue)
# start the threads
fetcher.start()
processor.start()
# wait for the threads to complete
data_queue.join()
在上面的示例中,我们有两个线程类:datafetcherthread和dataprocessorthread。 DataFetcherThread类负责从远程API获取数据并将其放在队列中,而DataProcessorthRead类负责从队列获取数据,处理并将其保存到数据库中。
> 。dataFetcherThread类连续从API获取数据并将其放在队列中,而DataProcessorthRead类则连续从队列中获取数据,对其进行处理并将其保存到数据库中。
>我们使用队列对象保存数据,以确保以线程安全的方式访问数据,以防止种族条件和其他同步问题。
fetch_data_from_api,process_data和save_to_database是模拟现实世界情景的函数,其中从API获取数据,处理并保存到数据库中。
>让我们使用rlock,信号量和障碍来实现Python的并发:
import threading
import time
counter = 0
def increment():
global counter
lock.acquire()
counter += 1
lock.release()
# Create a reentrant lock
lock = threading.RLock()
# Create a semaphore with a maximum capacity of 3
semaphore = threading.Semaphore(3)
# Create a barrier with a capacity of 4
barrier = threading.Barrier(4)
def worker1():
"""Thread worker function 1"""
global counter
semaphore.acquire()
increment()
print('Worker 1:', counter)
time.sleep(1)
semaphore.release()
barrier.wait()
def worker2():
"""Thread worker function 2"""
global counter
semaphore.acquire()
increment()
print('Worker 2:', counter)
time.sleep(1)
semaphore.release()
barrier.wait()
def worker3():
"""Thread worker function 3"""
global counter
semaphore.acquire()
increment()
print('Worker 3:', counter)
time.sleep(1)
semaphore.release()
barrier.wait()
def worker4():
"""Thread worker function 4"""
global counter
semaphore.acquire()
increment()
print('Worker 4:', counter)
time.sleep(1)
semaphore.release()
barrier.wait()
# create 4 threads
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t3 = threading.Thread(target=worker3)
t4 = threading.Thread(target=worker4)
# start the threads
t1.start()
t2.start()
t3.start()
t4.start()
# wait for the threads to complete
t1.join()
t2.join()
t3.join()
t4.join()
print('Counter:', counter)
在上面的示例中,我们有四个线程Worker函数Worker1,Worker2,Worker3和Worker4可以增加全局变量计数器并打印其值。对全局变量的访问受到RLOCK类的一个实例。
的实例保护。此示例说明了Rlock,Semaphore和屏障如何在更复杂的情况下使用并同步线程的执行。 Rlock类用于保护全局变量计数器免受种族条件和其他同步问题的影响,而信号量类则用于控制可以同时访问全局变量的最大线程数。障碍类用于同步线程的执行,并确保在继续执行之前,它们都达到执行中的一定点。
重要的是要注意,在此示例中,使用RLOCK确保线程可以多次获取锁定,并且只有在相同的线程将其释放到与获取的次数多次时,它将被解锁。在线程需要多次获取锁定的情况下,这很有用,例如,如果需要多次访问受保护的资源。
信号量的使用使您可以控制可以同时访问受保护资源的线程数,确保受保护资源不会被太多线程超载。
屏障的使用使您可以同步多个线程的执行,确保线程正在一起工作,而不会彼此干扰。
总体而言,使用RLOCK,Semaphore和Compination中的障碍物,您可以通过在Python程序中同时执行线程以及对共享资源的访问获得更复杂和细粒度的控制。
并发
conturrent.Futures是Python标准库中的一个模块,它为使用线程和过程提供了更高级别的API。它是在Python 2.4中引入的,并在以后的版本中进行了改进。该模块提供了两个主要类,以实现并发:threadpoolexecutor和processPoolExecutor。
threadPoolExecutor用于在线程池中执行可可的对象,而ProcessPoolExecutor用于在进程池中执行可呼叫对象。这两个类的工作方式都相似,但是ProcessPoolExecutor允许通过使用多个进程而不是线程进行真正的并行性,在运行CPU结合任务时,这可能更有效。
这是使用threadpoolexecutor在python中实现并发的一个示例:
import concurrent.futures
def long_running_task(n):
return n*n
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(long_running_task, i) for i in range(10)]
for f in concurrent.futures.as_completed(results):
print(f.result())
在上面的示例中,我们定义一个函数long_running_task,该功能通过执行简单的计算(平方数字)来模拟长期运行的任务。然后,我们使用ThreadPoolExecutor将此功能作为任务提交,以同时执行一系列输入。提交方法返回一个未来的对象,该对象可用于跟踪任务的进度并检索结果。 AS_COMPLETED函数用于完成并打印结果时迭代未来对象。
值得注意的是,ThreadPoolExecutor创建了一个工作线程池,并将任务分配给池中的可用线程。可以通过将最大线程数作为参数传递给构造函数来定义池大小。如果任务数量超过池大小,则将任务排队直到线程可用为止。
同样,这里是使用ProcessPoolExecutor在Python中实现并发的一个例子:
import concurrent.futures
def long_running_task(n):
return n*n
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(long_running_task, i) for i in range(10)]
for f in concurrent.futures.as_completed(results):
print(f.result())
在这里,我们使用ProcessPoolExecutor而不是ThreadPoolExecutor来提交long_running_task函数以同时执行。其余代码保持不变。通过使用ProcessPoolExecutor,我们可以利用多个过程的真实并行性来运行CPU结合的任务,这比使用线程更有效。
值得注意的是,在使用ProcessPoolExecutor时,任务是在单独的Python进程中执行的,这使他们可以并行运行并充分利用多个CPU内核。但是,这也意味着任务不能共享内存,并且必须使用过程间通信(IPC)机制共享数据,这可能比线程之间共享内存更复杂,效率更低。
要注意的另一件事是,使用ProcessPoolExecutor,任务函数和传递给它的数据应该是可挑选的,这意味着该函数和数据应能够序列化和估算化。
总体而言,同意。Futures模块通过使用线程池和过程池提供了一种简单而有力的方法来实现Python的并发。它抽象了许多使用线程和流程的低级细节,并为同时执行可呼叫对象提供了更方便的API。
让我看更多预先的例子
在下面的示例中,我们有一个函数long_running_task,该功能通过睡觉1秒来模拟长期运行的任务。我们还有一个回调函数callback_function,该函数在完成任务时被调用。
我们使用ProcessPoolExecutor提交long_running_task函数要在单独的进程中同时执行,并使用add_done_callback方法在完成任务时登记呼叫callback_function。
> >然后,我们以类似的方式使用threadpoolexecutor,但是这次任务是在线程池中同时执行的,而不是单独的进程。
在这两种情况下,我们都可以看到主过程的过程ID(PID),任务和结果回调函数。这使我们能够查看哪个过程正在运行哪个任务,以及哪个过程正在生成结果。
值得注意的是,我们还可以将进程数或线程传递给执行程序构造函数,例如processPoolExecutor(max_workers = 4)或threadpoolexecutor(max_workers = 4),它将相应地限制池大小。
使用回调函数的另一个优点是,它允许我们对结果执行其他处理,例如将结果保存到数据库或将其发送到另一个服务而不阻止主过程。
import concurrent.futures
import os
import time
def long_running_task(n):
print(f"Task {n} is running on process {os.getpid()}")
time.sleep(1)
return n * n
def callback_function(future):
result = future.result()
print(f"Result {result} was generated by process {os.getpid()}")
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(long_running_task, i) for i in range(10)]
# Add a callback to be called when a future is done
for result in results:
result.add_done_callback(callback_function)
print(f"Main process {os.getpid()}")
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(long_running_task, i) for i in range(10)]
# Add a callback to be called when a future is done
for result in results:
result.add_done_callback(callback_function)
print(f"Main process {os.getpid()}")
异步
asyncio是Python标准库中的一个模块,它提供了使用异步编程范式编写并发代码的框架。它是在Python 3.4中引入的,并在以后的版本中进行了改进。
Asyncio使用Coroutines的概念和事件循环来实现并发。 Coroutines是可以暂停和恢复的特殊功能类型,可以同时执行多个任务,而无需线程或进程。事件循环是安排和运行Coroutines的中心机制。
这是使用Asyncio在Python中实现并发的一个例子:
import asyncio
async def long_running_task(n):
print(f"Task {n} started")
await asyncio.sleep(1)
print(f"Task {n} finished")
return n * n
async def main():
tasks = [long_running_task(i) for i in range(10)]
completed, pending = await asyncio.wait(tasks)
for task in completed:
print(task.result())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上面的示例中,我们有一个异步函数long_running_task,该函数通过使用等待asyncio.sleep(1)来模拟长期运行的任务。此功能以异步关键字装饰,可以暂停并恢复。
主函数也是一个异步函数,它创建了long_running_task coroutines的列表,并使用asyncio.Erewait函数来安排它们进行执行。 Wait功能返回两组Coroutines,一组用于完成任务,另一组用于待处理任务。然后,我们迭代完成的任务,并使用任务对象的结果()方法打印结果。
使用asyncio.get_event_loop()函数创建事件循环,并使用loop.run_until_complete()方法运行主函数。此方法运行事件循环,直到所有计划的任务完成为止。
值得注意的是,Asyncio允许我们通过使用异步和等待关键字来编写类似于同步代码的并发代码。这使其比使用线程或进程的代码更加直观,更易于阅读和理解。
这是另一个示例,这次使用asyncio.gather:
import asyncio
async def long_running_task(n):
print(f"Task {n} started")
await asyncio.sleep(1)
print(f"Task {n} finished")
return n * n
async def main():
results = await asyncio.gather(*(long_running_task(i) for i in range(10)))
for result in results:
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上面的示例中,我们使用asyncio.gather函数来安排和同时运行任务,并将结果返回作为已完成任务的列表。
在这两个示例中,Asyncio提供了一种简单而有力的方法,通过使用coroutines和一个事件循环来实现Python的并发,并且与使用线程或过程相比,它允许使用更优雅,更可读的代码。
更多的预示示例
import asyncio
async def long_running_task(n):
print(f"Task {n} started")
await asyncio.sleep(1)
print(f"Task {n} finished")
return n * n
async def main():
semaphore = asyncio.Semaphore(3)
tasks = [long_running_task_with_semaphore(semaphore, i) for i in range(10)]
completed, pending = await asyncio.wait(tasks)
for task in completed:
print(task.result())
async def long_running_task_with_semaphore(semaphore, n):
async with semaphore:
return await long_running_task(n)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在这里,我们有一个信号量信号量,该信号量可以控制可以同时执行的最大任务数。我们使用asyncio.semaphore类来创建信号量并传递最大容量3。
然后,我们创建一个新的async函数long_running_task_with_semaphore,该函数将信号量和任务输入作为参数。此函数使用带有语句的异步来在运行long_running_task函数之前获取信号量,并且在完成任务后会自动发布信号量。
在主函数中,我们创建一个long_running_task_with_semaphore coroutines的列表,并使用asyncio.Ethewait函数安排它们进行执行。 Wait功能返回两组Coroutines,一组用于完成任务,另一组用于待处理任务。然后,我们迭代完成的任务,并使用任务对象的结果()方法打印结果。
使用asyncio.get_event_loop()函数创建事件循环,并使用loop.run_until_complete()方法运行主函数。此方法运行事件循环,直到所有计划的任务完成为止。
在上面的示例中,我们可以看到如何使用Asyncio的信号量类别来控制并发任务的数量,通过限制信号量令牌的数量,在这种情况下,只允许3个任务同时运行。这是控制并发并避免超载系统的更复杂的方法。
总的来说,所有这些方法都提供了不同的方法来实现Python的并发性,每种方法都有自己的优势和缺点。