如何使用Asyncio Coroutines实施生产者 - 消费者并发设计模式
#python #安全 #并发性 #designpatterns

生产者 - 消费者并发设计模式是并发编程中广泛使用的方法,生产者生成数据并消费者消耗数据。这些组件并行运行,并且使用缓冲区或队列之间传递数据。

在本文中,我们演示了如何使用asyncio coroutines实现生产者 - 消费者并发设计模式,并提供了一个实际示例,可以在SSH连接上监视远程计算机上的日志文件。我们首先使用yield语句实现生产者 - 消费者模式,然后修改代码以使用队列进行互通信。

注意:例如,我们要求无密码的sudo在监视nginx日志文件时在远程主机(服务器)上工作,这是不可公开访问的。

使用yield语句

from fabric import Connection
import asyncio
import datetime
import time

host = '1337-demo-host.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5
buffer_lines = 5

async def producer():
    conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
    with conn.cd('/tmp'):
        tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
        old_data = ()
        while True:
            channel = conn.run(command=tail_cmd, hide='both')
            data = channel.stdout.splitlines()
            if old_data != data:
                old_d=set(old_data)
                delta_data = [x for x in data if x not in old_d]
                for d in delta_data:
                    yield str(d)
            old_data = data.copy()
            data.clear()
            time.sleep(delay)
    conn.close()

async def consumer():
    p=producer()
    async for line in p:
        dt = datetime.datetime.fromtimestamp(time.time())
        print (f"TS: {dt}\n{line}", end = '\n', flush = True)

async def main():
    await asyncio.gather(consumer())

if __name__ == '__main__':
    asyncio.run(main())

为了使脚本异步,我们使用asyncio库。 Asyncio模块提供了基于Coroutines的高级并发框架,这使我们能够编写看起来同步的异步代码。使用asyncawait关键字实现coroutines。

在我们的脚本中,生产者Coroutine通过通过SSH连接在远程计算机上执行tail命令来生成数据。消费者Coroutine通过打印新日志线和时间戳来消耗数据。 yield语句在生产者Coroutine中用于生成新的日志线,这些日志线被消费者Coroutine消费。

asyncio.gather()方法用于同时运行两种Coroutines,使消费者Coroutine可以以非阻滞方式消耗生产者Coroutine生成的数据。 await关键字用于等待Coroutines完成。

关于织物

我们使用织物建立了与远程计算机的SSH连接,执行远程命令并检索其输出。面料是一个Python库,简化了多个机器上的SSH连接和远程执行,为常见任务提供了更高级别的API。
有关织物的更多信息可以在project's website中找到。

使用asyncio.queue()

在此版本中,我们用asyncio.Queue()对象替换了用于互通信的asyncio.Queue()对象。

和以前一样,生产者Coroutine通过通过SSH连接在远程计算机上执行尾巴命令来生成数据,但是这次,它将新的日志线条放入队列中。消费者Coroutine从队列中读取数据,并与时间戳一起打印新的日志线。

await queue.put()方法用于将新的日志线放入队列中,而await queue.get()方法用于读取队列中的新日志线。 queue.task_done()方法用于指示消费者已经完成了日志线。

通过使用队列进行通用通信,该脚本可以处理大量日志线,而无需复制数据。使用asyncio.Queue()对象进行通信是异步编程中的最佳实践,可以使Coroutines并行运行。

from fabric import Connection
import asyncio
import datetime
import time
from asyncio.queues import Queue

host = '1337-demo-host2.org'
user = 'almalinux'
key_filename = '/Users/user/.ssh/id_rsa'
log_file = '/var/log/nginx/access.log'
delay = 5
buffer_lines = 5

async def producer(queue: Queue):
    conn = Connection(host, user=user, connect_kwargs={'key_filename': key_filename})
    with conn.cd('/tmp'):
        tail_cmd = f'sudo tail -n {buffer_lines} {log_file}'
        old_data = ()
        while True:
            channel = conn.run(command=tail_cmd, hide='both')
            data = channel.stdout.splitlines()
            if old_data != data:
                old_d=set(old_data)
                delta_data = [x for x in data if x not in old_d]
                for d in delta_data:
                    await queue.put(d)
            old_data = data.copy()
            data.clear()
            await asyncio.sleep(delay)
    conn.close()

async def consumer(queue: Queue):
    while True:
        line = await queue.get()
        dt = datetime.datetime.fromtimestamp(time.time())
        print (f"TS: {dt}\n{line}", end = '\n', flush = True)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

if __name__ == '__main__':
    asyncio.run(main())

总的来说,第二个版本演示了如何使用队列在生产者 - 消费者的并发设计模式中使用asyncio Coroutines在远程计算机上监视日志文件。这里的用例是微不足道的,但是该技术可以应用于广泛的并发编程问题,使其成为开发可扩展和高性能系统的有用工具。

示例:

# python3 producer-consumer-tail-logfile-over-ssh-with-queue.py
TS: 2023-02-25 01:12:23.705987
XXX.XX.XX.XX - - [24/Feb/2023:23:50:35 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706113
XXX.XX.XX.XX - - [24/Feb/2023:23:50:36 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706146
XXX.XX.XX.XX - - [24/Feb/2023:23:50:58 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706171
XXX.XX.XX.XX - - [25/Feb/2023:00:03:59 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:23.706193
XXX.XX.XX.XX - - [25/Feb/2023:00:04:02 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:12:33.965717
XXX.XX.XX.XX - - [25/Feb/2023:00:12:29 +0000] "GET //asdf HTTP/1.1" 404 3797 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15" "-"
TS: 2023-02-25 01:18:53.170400
XXX.XX.XX.XX - - [25/Feb/2023:00:18:52 +0000] "RAW / HTTP/1.1" 404 146 "-" "Mozilla/5.0 zgrab/0.x" "-"
TS: 2023-02-25 01:30:35.124575
XXX.XX.XX.XX - - [25/Feb/2023:00:30:31 +0000] "GET /.env HTTP/1.1" 404 146 "-" "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50" "-"
TS: 2023-02-25 01:36:13.271236
XXX.XX.XX.XX - - [25/Feb/2023:00:36:12 +0000] "GET / HTTP/1.1" 200 4727 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:106.0) Gecko/20100101 Firefox/106.0" "-"
TS: 2023-02-25 01:37:40.304185
XXX.XX.XX.XX - - [25/Feb/2023:00:37:38 +0000] "GET /0bef HTTP/1.0" 400 248 "-" "-" "-"