在本文中,我们将使用FastAPI和RabbitMQ构建服务监视系统。该系统将监视各种Web服务和微服务的Web流量。这仅是出于教育目的,并不是替代更健壮,可靠和行业的软件,例如Opentelemetry,Prometheus等。
当调用服务时,它将向RabbitMQ队列发布一条消息。消费者将聆听该队列并将服务的跟踪信息存储在数据库中。我们将构建一个FastAPI来收集监视数据。该系统将显示诸如IP地址,请求URL,端口,路径,方法,浏览器,OS,时间等的信息
要求
-
python安装
-
基本Python知识
-
PIP已安装
-
Postgres已安装
-
安装了RabbitMQ(从here下载)
构建访客跟踪器
首先,我们为此应用程序创建一个目录。
mkdir visitor_tracker
cd
#Windows users
py -m venv venv
cd venv/Scripts
./activate
#Linux
python3 -m venv venv
source venv/bin/activate
我们安装了我们需要的所有依赖项。
pip install fastapi psycopg2-binary python-dotenv aio_pika
在visitor_tracker
目录中,我们创建了一个新文件koude1.
init_db.py
import os
import psycopg2
from dotenv import load_dotenv
load_dotenv()
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')
def get_db_connection():
conn = psycopg2.connect(
dbname = "logs_db",
user = "postgres",
password = PASSWORD
)
return conn
conn = get_db_connection()
cur = conn.cursor()
cur.execute('DROP TABLE IF EXISTS logs;')
cur.execute('CREATE TABLE logs (id serial PRIMARY KEY,'
'ip_address varchar (150) NOT NULL,'
'request_url varchar (50) NOT NULL,'
'request_port integer NOT NULL,'
'request_path varchar (50) NOT NULL,'
'request_method varchar (50) NOT NULL,'
'browser_type varchar (150) NOT NULL,'
'request_time timestamp (50) NOT NULL,'
'service_name varchar (150) NOT NULL,'
'date_added date DEFAULT CURRENT_TIMESTAMP);'
)
cur.execute('INSERT INTO logs (ip_address,'
'request_url,'
'request_port,'
'request_path,'
'request_method,'
'browser_type,'
'request_time,'
'service_name)'
'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)',
('127.0.0.1',
'http://localhost:8000',
8000,
"/",
"GET",
"Chrome",
"2023-06-25T16:03:24.722256",
"Test_data_service"
)
)
conn.commit()
cur.close()
conn.close()
在这里,我们设置一个数据库连接以存储日志数据。首先,我们使用dotenv
加载.env
文件以获取数据库用户名和密码变量。然后,我们定义一个get_db_connection()
函数,该功能建立了与名为logs_db的PostgreSQL数据库的连接。然后,代码调用该功能以获取数据库连接和光标。
在此文件中,如果它存在并使用给定的模式重新创建日志表,则该代码将删除日志表 - 使用列存储IP地址,请求URL,端口,路径,方法,浏览器,OS,时间,时间等。它将示例日志数据插入具有其值的示例日志数据。它提交了数据库的更改并关闭光标和连接。
helpers.py
import collections
def to_dict(psycopg_tuple:tuple):
tracker = collections.OrderedDict()
tracker['id'] = psycopg_tuple[0]
tracker["ip_address"] = psycopg_tuple[1]
tracker["request_url"] = psycopg_tuple[2]
tracker["request_port"] = psycopg_tuple[3]
tracker["request_path"] = psycopg_tuple[4]
tracker["request_method"] = psycopg_tuple[5]
tracker["browser_type"] = psycopg_tuple[6]
tracker["request_time"] = psycopg_tuple[7].strftime("%d-%m-%Y, %H:%M:%S")
tracker["service_name"] = psycopg_tuple[8]
return tracker
def list_dict(rows:list):
row_list = []
for row in rows:
book_dict = to_dict(row)
row_list.append(book_dict)
return row_list
此文件具有两个函数:to_dict()
和list_dict()
。 to_dict()
函数将postgresql元组转换为词典。 list_dict()
函数将postgresql元组的列表转换为词典列表。
to_dict()
函数将postgresql元组作为输入并返回字典。字典包含与元组相同顺序的元组值。 list_dict()
函数将PostgreSQL元组列表作为输入,并返回字典列表。词典是使用to_dict()
函数创建的。
控制器.py
from init_db import get_db_connection
from helpers import to_dict,list_dict
import json
def all_logs():
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM logs;')
logs = list_dict(cur.fetchall())
cur.close()
conn.close()
return logs
def new_log(ip_address: str,
request_url: str,
request_port: int,
request_path: str,
request_method: str,
browser_type: str,
request_time: str,
service_name:str,):
conn = get_db_connection()
cur = conn.cursor()
cur.execute('INSERT INTO logs (ip_address, request_url, request_port, request_path, request_method, browser_type, request_time, service_name)'
'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING *;',(ip_address,
request_url,
request_port,
request_path,
request_method,
browser_type,
request_time,
service_name))
log = cur.fetchone()[:]
log_dict = to_dict(log)
conn.commit()
cur.close()
conn.close()
return json.dumps(log_dict)
all_logs()
函数从数据库中获取所有日志,并返回字典列表。每个字典都包含有关单个日志的信息。
new_log()
函数将新的日志插入数据库。
consumer.py
from controllers import new_log
import json
import aio_pika
import ast
async def on_message(message: aio_pika.IncomingMessage):
tracker = ast.literal_eval(message.body.decode("utf-8"))
new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
tracker["request_path"], tracker["request_method"],
tracker["browser_type"],tracker["request_time"], tracker["service_name"])
在consumer.py
文件中,我们编写了负责从RabbitMQ队列接收消息的代码。收到消息时,将调用on_message()
函数。它从消息主体中解析JSON数据并调用new_log()
函数以将数据添加到数据库中。
app.py
from fastapi import FastAPI
from controllers import all_logs, new_log
import json
import asyncio
import aio_pika
app = FastAPI()
@app.on_event('startup')
async def startup():
loop = asyncio.get_event_loop()
connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
channel = await connection.channel()
queue = await channel.declare_queue("logs")
await queue.consume(on_message)
@app.get("/logs")
async def receiver():
logs = all_logs()
return logs
@app.get("/")
async def hello():
return "hello"
在启动时,我们创建了与AMQP代理的连接,并声明了队列的名称,在这种情况下为“ logs”。并使用on_message()
函数消耗该队列的消息。
然后,我们使用路径“/logs”创建一个端点,以显示数据库中存储的所有日志。
要测试此服务,我们创建了一个新文件,sender.py。
import pika
import json
tracker = {
"ip_address": '127.0.0.1',
"request_url": 'http://localhost:8000',
"request_port": 8000,
"request_path": "/",
"request_method": "GET",
"request_time": "2023-06-25T16:03:24.722256",
"browser_type": "Firefox",
"operating_system": "Windows 11",
"service_name": "Fastapi_service",
}
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='logs')
channel.basic_publish(exchange='', routing_key='logs', body=json.dumps(tracker))
print(" [x] Sent 'Logs'")
connection.close()
我们运行此文件,python3 sender.py
。它将在我们的命令行中显示[x] Sent 'Logs'
。在显示访问者跟踪器日志的命令行中,它将出现从sender.py
接收到数据。
出版商
发布者将是我们要跟踪其活动的服务。
sender.py
import pika
def sender(body: dict):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='logs')
channel.basic_publish(exchange='', routing_key='logs', body=body)
print(" [x] Sent 'Logs'")
connection.close()
在sender.py
文件中,我们定义了sender()函数,它将收到一个字典,这将是跟踪信息。它创建了与RabbitMQ的连接,声明队列,并将其发布到队列中(带有跟踪信息的字典)。
middleware.py
from fastapi import Request
from datetime import datetime
class Tracker:
def __init__ (self, service_name: str):
self.service_name = service_name
def visitor_tracker(self, request: Request):
ip_address = request.client.host
request_url = request.url._url
request_port = request.url.port
request_path = request.url.path
request_method = request.method
browser_type = request.headers["User-Agent"]
request_time = str(datetime.now())
service_name = self.service_name
return {
"ip_address": ip_address,
"request_url": request_url,
"request_port": request_port,
"request_path": request_path,
"request_method": request_method,
"request_time": request_time,
"browser_type": browser_type,
"service_name": service_name,
}
在middleware.py
文件中,我们创建了将service_name
作为构造函数中的参数的跟踪类别。它具有visitor_tracker()
方法,该方法将FastAPI请求对象作为参数。它从请求中提取各种信息,例如IP地址,URL,端口,请求路径,方法(GET,POST等),从用户代理标头中的浏览器类型,请求时间以及在构造函数中传递的服务名称。此方法返回一个字典,其中包含所有这些信息,可以记录以跟踪访客请求。
main.py
from fastapi import FastAPI, Request
from datetime import datetime
from middleware import Tracker
from sender import sender
import json
app = FastAPI()
@app.middleware("tracker")
async def tracker(request: Request, call_next):
service_tracker = Tracker("service_one")
tracker = str(service_tracker.visitor_tracker(request))
sender(tracker)
response = await call_next(request)
return response
@app.get("/")
def index():
return "Hello, world"
@app.get("/json")
def some_func():
return {
"some_json": "Some Json"
}
if __name__ == " __main__":
app.run(debug=True)
在此文件中,我们使用@app.middleware
Decorator定义了名为“ Tracker”的中间件功能。此中间件将针对每个请求运行。中间件函数创建一个名为service_tracker
的跟踪类别的实例,将“ service_one”作为服务名称,调用跟踪器实例的visitor_tracker()
方法,并通过fastapapi请求对象。这为我们提供了跟踪信息,并调用sender()
功能传递跟踪信息并将其发送到RabbitMQ队列。这用于记录或发送跟踪数据的某个地方,并使用call_next
调用下一个route
函数并返回其响应。
我们定义了两条路线。如果我们向这些路线提出请求,将通过中间件收集其信息并发送到RabbitMQ队列。
现在,我们与访客跟踪器同时运行此服务。
我们使用浏览器或使用HTTP客户端请求浏览端点之一。
我们将看到访问者跟踪器的响应:
构建UI
我们将创建一个React应用程序,以显示所有跟踪信息的表格。
安装Vite并进行反应
在我们的命令行中,我们使用反应型模板安装Vite。此命令行将为项目创建一个文件夹。
#npm
npm create vite@latest table -- --template react-ts
#yarn
yarn create vite@latest table --template react-ts
#pnpm
pnpm create vite@latest table --template react-ts
安装了所有软件包后,我们将使用命令运行vite:
npm run dev
我们去koude32,应该看到Vite并反应主页。
app.ts
import React, { useState, useEffect} from "react";
const url = "http://localhost:8000/logs";
interface Table {
id: number,
ip_address: string,
request_url: string,
request_port: string,
request_path: string,
request_method: string,
request_time: string,
browser_type: string,
service_name: string,
}
const Table: React.FC = () => {
const [data, setData] = useState<Table[]>([]);
useEffect(() => {
fetch(url)
.then(res => res.json())
.then(data => setData(data));
console.log(data);
}, []);
return (
<div>
<h1>Logs</h1>
<table>
<thead>
<tr>
<th>Id</th>
<th>IP Address</th>
<th>Request URL</th>
<th>Request Port</th>
<th>Request Path</th>
<th>Request Method</th>
<th>Request Time</th>
<th>Browser Type</th>
<th>Service Name</th>
</tr>
</thead>
<tbody>
{data.map((item, index) => (
<tr key={index}>
<td>{item.id}</td>
<td>{item.ip_address}</td>
<td>{item.request_url}</td>
<td>{item.request_port}</td>
<td>{item.request_path}</td>
<td>{item.request_method}</td>
<td>{item.request_time}</td>
<td>{item.browser_type}</td>
<td>{item.service_name}</td>
</tr>
))}
</tbody>
</table>
</div>
);
};
export default Table;
让我们添加一个.css
文件以添加样式并轻松查看数据。
index.css
:root {
font-family: Inter, system-ui, Avenir, Helvetica, Arial, sans-serif;
line-height: 1.5;
font-weight: 400;
color-scheme: light dark;
color: rgba(255, 255, 255, 0.87);
background-color: #242424;
font-synthesis: none;
text-rendering: optimizeLegibility;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
-webkit-text-size-adjust: 100%;
}
table {
border-collapse: collapse;
margin-bottom: 1rem;
}
th,
td {
padding: 0.5rem;
border: 1px solid #ccc;
}
th {
text-align: left;
}
td {
text-align: left;
}
.column-gap-10 {
column-gap: 10px;
}
结论
在本文中,我们使用FastAPI和RabbitMQ构建了一个简单的服务监视系统。我们看到了如何创建FastApi项目,创建交流和队列,在FastApi中从RabbitMQ队列中发布和消费消息,以及在Postgres数据库中存储监视数据。
注意:我没有尝试此应用程序来跟踪与FastApi不同框架的其他服务的活动。但是,如果需要,您可以尝试使用相同的步骤:创建一个收集跟踪信息的中间件,然后将它们推入您声明的管理此数据的RabbitMQ队列。
感谢您抽出宝贵的时间阅读本文。
如果您对其他软件包,架构,如何改善我的代码,英语或其他任何建议有任何建议;请发表评论或通过Twitter或LinkedIn与我联系。
访客跟踪器的代码是here。
发布者的源代码是here。