生产者 - 消费者并发设计模式是并发编程中广泛使用的方法,生产者生成数据并消费者消耗数据。这些组件并行运行,并且使用缓冲区或队列之间传递数据。
在本文中,我们演示了如何使用asyncio coroutines实现生产者 - 消费者并发设计模式,并提供了一个实际示例,可以在SSH连接上监视远程计算机上的日志文件。我们首先使用yield
语句实现生产者 - 消费者模式,然后修改代码以使用队列进行互通信。
注意:例如,我们要求无密码的sudo
在监视nginx
日志文件时在远程主机(服务器)上工作,这是不可公开访问的。
使用yield
语句
from fabric import Connection
import asyncio
import datetime
import time
host = '1337-demo-host.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5
buffer_lines = 5
async def producer():
conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
with conn.cd('/tmp'):
tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
old_data = ()
while True:
channel = conn.run(command=tail_cmd, hide='both')
data = channel.stdout.splitlines()
if old_data != data:
old_d=set(old_data)
delta_data = [x for x in data if x not in old_d]
for d in delta_data:
yield str(d)
old_data = data.copy()
data.clear()
time.sleep(delay)
conn.close()
async def consumer():
p=producer()
async for line in p:
dt = datetime.datetime.fromtimestamp(time.time())
print (f"TS: {dt}\n{line}", end = '\n', flush = True)
async def main():
await asyncio.gather(consumer())
if __name__ == '__main__':
asyncio.run(main())
为了使脚本异步,我们使用asyncio库。 Asyncio模块提供了基于Coroutines的高级并发框架,这使我们能够编写看起来同步的异步代码。使用async
和await
关键字实现coroutines。
在我们的脚本中,生产者Coroutine通过通过SSH连接在远程计算机上执行tail
命令来生成数据。消费者Coroutine通过打印新日志线和时间戳来消耗数据。 yield
语句在生产者Coroutine中用于生成新的日志线,这些日志线被消费者Coroutine消费。
asyncio.gather()
方法用于同时运行两种Coroutines,使消费者Coroutine可以以非阻滞方式消耗生产者Coroutine生成的数据。 await
关键字用于等待Coroutines完成。
关于织物
我们使用织物建立了与远程计算机的SSH连接,执行远程命令并检索其输出。面料是一个Python库,简化了多个机器上的SSH连接和远程执行,为常见任务提供了更高级别的API。
有关织物的更多信息可以在project's website中找到。
使用asyncio.queue()
在此版本中,我们用asyncio.Queue()
对象替换了用于互通信的asyncio.Queue()
对象。
和以前一样,生产者Coroutine通过通过SSH连接在远程计算机上执行尾巴命令来生成数据,但是这次,它将新的日志线条放入队列中。消费者Coroutine从队列中读取数据,并与时间戳一起打印新的日志线。
await queue.put()
方法用于将新的日志线放入队列中,而await queue.get()
方法用于读取队列中的新日志线。 queue.task_done()
方法用于指示消费者已经完成了日志线。
通过使用队列进行通用通信,该脚本可以处理大量日志线,而无需复制数据。使用asyncio.Queue()
对象进行通信是异步编程中的最佳实践,可以使Coroutines并行运行。
from fabric import Connection
import asyncio
import datetime
import time
from asyncio.queues import Queue
host = '1337-demo-host2.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5
buffer_lines = 5
async def producer(queue: Queue):
conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
with conn.cd('/tmp'):
tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
old_data = ()
while True:
channel = conn.run(command=tail_cmd, hide='both')
data = channel.stdout.splitlines()
if old_data != data:
old_d=set(old_data)
delta_data = [x for x in data if x not in old_d]
for d in delta_data:
await queue.put(d)
old_data = data.copy()
data.clear()
await asyncio.sleep(delay)
conn.close()
async def consumer(queue: Queue):
while True:
line = await queue.get()
dt = datetime.datetime.fromtimestamp(time.time())
print (f"TS: {dt}\n{line}", end = '\n', flush = True)
queue.task_done()
async def main():
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
if __name__ == '__main__':
asyncio.run(main())
总的来说,第二个版本演示了如何使用队列在生产者 - 消费者的并发设计模式中使用asyncio
Coroutines在远程计算机上监视日志文件。这里的用例是微不足道的,但是该技术可以应用于广泛的并发编程问题,使其成为开发可扩展和高性能系统的有用工具。
示例:
# python3 producer-consumer-tail-logfile-over-ssh-with-queue.py
TS: 2023-02-25 01:12:23.705987
XXX.XX.XX.XX - - [24/Feb/2023:23:50:35 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706113
XXX.XX.XX.XX - - [24/Feb/2023:23:50:36 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706146
XXX.XX.XX.XX - - [24/Feb/2023:23:50:58 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706171
XXX.XX.XX.XX - - [25/Feb/2023:00:03:59 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706193
XXX.XX.XX.XX - - [25/Feb/2023:00:04:02 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:33.965717
XXX.XX.XX.XX - - [25/Feb/2023:00:12:29 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:18:53.170400
XXX.XX.XX.XX - - [25/Feb/2023:00:18:52 +0000] "RAW / HTTP/1.1" 404 146 "-" "Mozilla/5.0 zgrab/0.x" "-"
TS: 2023-02-25 01:30:35.124575
XXX.XX.XX.XX - - [25/Feb/2023:00:30:31 +0000] "GET /.env HTTP/1.1" 404 146 "-" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50" "-"
TS: 2023-02-25 01:36:13.271236
XXX.XX.XX.XX - - [25/Feb/2023:00:36:12 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:106.0) Gecko/20100101 Firefox/106.0" "-"
TS: 2023-02-25 01:37:40.304185
XXX.XX.XX.XX - - [25/Feb/2023:00:37:38 +0000] "GET /0bef HTTP/1.0" 400 248 "-" "-" "-"