介绍
在文章"How to Implement the Producer-Consumer Concurrency Design Pattern with asyncio Coroutines"的延续中,我们将展示如何启用远程计算机上的多个日志文件的监视。
我们将解释《法规》中引入的更改,以实现目标并加深思想的背景并讨论所使用的API。
数据
- host: testhost.almalinux.org
user: someuser
key_filename: /home/pawel/.ssh/id_ed25519
log_file: /var/log/nginx/access.log
delay: 5
buffer_lines: 10
- host: refurby
user: pablo
key_filename: /home/pawel/.ssh/id_rsa
log_file: /var/log/syslog
delay: 60
buffer_lines: 5
代码
#!/usr/bin/env python3
import asyncio
import datetime
import time
from fabric import Connection
from asyncio.queues import Queue
import yaml
async def producer(queue: Queue, host: dict):
conn = Connection(host['host'], user=host['user'], connect_kwargs={'key_filename': host['key_filename']})
with conn.cd('/tmp'):
tail_cmd = f"sudo tail -n {host['buffer_lines']} {host['log_file']}"
old_data = ()
while True:
channel = conn.run(command=tail_cmd, hide='both')
data = channel.stdout.splitlines()
if old_data != data:
old_d = set(old_data)
delta_data = [x for x in data if x not in old_d]
for d in delta_data:
await queue.put((host['host'], host['log_file'], str(d)))
old_data = data.copy()
data.clear()
await asyncio.sleep(host['delay'])
conn.close()
async def consumer(queue: Queue):
while True:
line = await queue.get()
host, log_file, data = line
dt = datetime.datetime.fromtimestamp(time.time())
print(f"@{dt} {host}:{log_file}:\n{data}\n-----", end='\n', flush=True)
queue.task_done()
async def main():
with open('hosts.yml') as f:
hosts = yaml.safe_load(f)
queue = asyncio.Queue()
producers = [asyncio.create_task(producer(queue, host)) for host in hosts]
consumer_task = asyncio.create_task(consumer(queue))
await asyncio.gather(*producers)
await queue.join()
consumer_task.cancel()
if __name__ == '__main__':
asyncio.run(main())
说明:支持多个主机(生产者)
代码中引入的更改的主要思想是,使用相同的生产者 - 消费者并发设计模式在远程计算机上启用多个日志文件。为了实现这一目标,我们介绍了一个YAML配置文件hosts.yml
,该文件指定了要监视的主机及其属性,例如日志文件路径,检查之间的延迟以及要读取的行数。
在main()
函数中,我们使用yaml
模块从yaml文件加载配置,并创建一个asyncio.Queue()
对象,以用于互通信。然后,我们使用list comprehension为每个主机创建一个单独的生产商Coroutine ,然后将它们添加到生产者列表中。 asyncio.gather()
方法用于运行所有生产商和消费者coroutine。
说明:食用多个生产商的产出
consumer()
coroutine与上一个版本中保持不变,除了修改以处理来自多个主机的数据。现在,每个日志行都是包含主机名,日志文件路径和日志线文本的元组。
在producer()
Coroutine中,我们现在将主机字典传递给Coroutine,而不是使用全局变量。字典包含要监视的主机的属性,例如主机名,ssh键文件名,日志文件路径,检查之间的延迟以及要读取的行数。与原始版本一样,我们使用这些属性来建立与远程计算机的SSH连接,并在日志文件上执行tail
命令。所得数据被放入队列中,作为包含主机名,日志文件路径和日志线文本的元组。
说明:睡眠如何允许生产者减少负载
await asyncio.sleep(host['delay'])
行用于在producer()
Coroutine中的日志文件检查之间引入延迟。 delay
属性是在主机配置字典中指定的每个主机的YAML文件中的词典,而asyncio.sleep()
函数用于在指定的时间内暂停Coroutine的执行。这使生产者可以等待新的日志条目出现,然后再检查,减少不必要的网络和系统加载。
概括
总的来说,对previous version of the code进行的更改使脚本可以以可扩展且可维护的方式监视远程计算机上的多个日志文件。通过使用YAML配置文件,添加新主机或修改现有主机的属性很容易,并且不需要更改代码。
关于API
至于代码中使用的API,我们使用yaml
模块从YAML文件和[asyncio.queues.Queue()](https://docs.python.org/3/library/asyncio-queue.html)
类中加载配置,以创建用于互通信的队列。 create_task()
方法用于创建生产者任务,而gathing()方法用于同时运行所有任务。最后,使用队列和消费者任务的join()
和cancel()
方法优雅地退出程序。
其他
我们希望这种延续能够帮助您了解消费者生产者如何易于扩展,以扩展以可扩展的方式为更多的生产商提供支持。与往常一样,请随时尝试代码并提出任何问题。
P.S。请继续关注“生产者 - 消费者并发设计模式”系列中的更多文章。