如何使用FastAPI和RabbitMQ构建应用程序监视系统| Python
#python #fastapi #monitoring #rabbitmq

在本文中,我们将使用FastAPI和RabbitMQ构建服务监视系统。该系统将监视各种Web服务和微服务的Web流量。这仅是出于教育目的,并不是替代更健壮,可靠和行业的软件,例如Opentelemetry,Prometheus等。

当调用服务时,它将向RabbitMQ队列发布一条消息。消费者将聆听该队列并将服务的跟踪信息存储在数据库中。我们将构建一个FastAPI来收集监视数据。该系统将显示诸如IP地址,请求URL,端口,路径,方法,浏览器,OS,时间等的信息

要求

  • python安装

  • 基本Python知识

  • PIP已安装

  • Postgres已安装

  • 安装了RabbitMQ(从here下载)

构建访客跟踪器

首先,我们为此应用程序创建一个目录。

mkdir visitor_tracker
cd
#Windows users
py -m venv venv
cd venv/Scripts
./activate

#Linux
python3 -m venv venv
source venv/bin/activate

我们安装了我们需要的所有依赖项。

pip install fastapi psycopg2-binary python-dotenv aio_pika

visitor_tracker目录中,我们创建了一个新文件koude1.

init_db.py

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

def get_db_connection():
    conn = psycopg2.connect(
        dbname = "logs_db",
        user = "postgres",
        password = PASSWORD
    )
    return conn

conn = get_db_connection()
cur = conn.cursor()

cur.execute('DROP TABLE IF EXISTS logs;')
cur.execute('CREATE TABLE logs (id serial PRIMARY KEY,'
                                 'ip_address varchar (150) NOT NULL,'
                                 'request_url varchar (50) NOT NULL,'
                                 'request_port integer NOT NULL,'
                                 'request_path varchar (50) NOT NULL,'
                                 'request_method varchar (50) NOT NULL,'
                                 'browser_type varchar (150) NOT NULL,'
                                 'request_time timestamp (50) NOT NULL,'
                                 'service_name varchar (150) NOT NULL,'

                                 'date_added date DEFAULT CURRENT_TIMESTAMP);'
                                 )

cur.execute('INSERT INTO logs (ip_address,'
                                 'request_url,'
                                 'request_port,'
                                 'request_path,'
                                 'request_method,'
                                 'browser_type,'
                                 'request_time,'
                                 'service_name)'
                                 'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)',
            ('127.0.0.1',
             'http://localhost:8000',
             8000,
             "/",
             "GET",
             "Chrome",
             "2023-06-25T16:03:24.722256",
             "Test_data_service"
             )
            )

conn.commit()

cur.close()
conn.close()

在这里,我们设置一个数据库连接以存储日志数据。首先,我们使用dotenv加载.env文件以获取数据库用户名和密码变量。然后,我们定义一个get_db_connection()函数,该功能建立了与名为logs_db的PostgreSQL数据库的连接。然后,代码调用该功能以获取数据库连接和光标。

在此文件中,如果它存在并使用给定的模式重新创建日志表,则该代码将删除日志表 - 使用列存储IP地址,请求URL,端口,路径,方法,浏览器,OS,时间,时间等。它将示例日志数据插入具有其值的示例日志数据。它提交了数据库的更改并关闭光标和连接。

helpers.py

import collections

def to_dict(psycopg_tuple:tuple):
    tracker = collections.OrderedDict()
    tracker['id'] = psycopg_tuple[0]

    tracker["ip_address"] = psycopg_tuple[1]
    tracker["request_url"] = psycopg_tuple[2]
    tracker["request_port"] = psycopg_tuple[3]
    tracker["request_path"] = psycopg_tuple[4]
    tracker["request_method"] = psycopg_tuple[5]
    tracker["browser_type"] = psycopg_tuple[6]
    tracker["request_time"] = psycopg_tuple[7].strftime("%d-%m-%Y, %H:%M:%S")
    tracker["service_name"] = psycopg_tuple[8]
    return tracker

def list_dict(rows:list):

    row_list = []
    for row in rows:
        book_dict = to_dict(row)
        row_list.append(book_dict)

    return row_list

此文件具有两个函数:to_dict()list_dict()to_dict()函数将postgresql元组转换为词典。 list_dict()函数将postgresql元组的列表转换为词典列表。

to_dict()函数将postgresql元组作为输入并返回字典。字典包含与元组相同顺序的元组值。 list_dict()函数将PostgreSQL元组列表作为输入,并返回字典列表。词典是使用to_dict()函数创建的。

控制器.py

from init_db import get_db_connection
from helpers import to_dict,list_dict
import json

def all_logs():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM logs;')
    logs = list_dict(cur.fetchall())
    cur.close()
    conn.close()

    return logs

def new_log(ip_address: str,
         request_url: str,
         request_port: int,
         request_path: str,
         request_method: str,
         browser_type: str,
         request_time: str,
         service_name:str,):

    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('INSERT INTO logs (ip_address, request_url, request_port, request_path, request_method, browser_type, request_time, service_name)'
                    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING *;',(ip_address,
                                                    request_url,
                                                    request_port,
                                                    request_path,
                                                    request_method,
                                                    browser_type,
                                                    request_time,
                                                    service_name))

    log = cur.fetchone()[:]
    log_dict = to_dict(log)
    conn.commit()
    cur.close()
    conn.close()

    return json.dumps(log_dict)

all_logs()函数从数据库中获取所有日志,并返回字典列表。每个字典都包含有关单个日志的信息。

new_log()函数将新的日志插入数据库。

consumer.py


from controllers import new_log
import json    

import aio_pika  
import ast

async def on_message(message: aio_pika.IncomingMessage):
    tracker = ast.literal_eval(message.body.decode("utf-8"))

    new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                        tracker["request_path"], tracker["request_method"],
                        tracker["browser_type"],tracker["request_time"], tracker["service_name"])

consumer.py文件中,我们编写了负责从RabbitMQ队列接收消息的代码。收到消息时,将调用on_message()函数。它从消息主体中解析JSON数据并调用new_log()函数以将数据添加到数据库中。

app.py

from fastapi import FastAPI
from controllers import all_logs, new_log
import json    
import asyncio
import aio_pika  

app = FastAPI()

@app.on_event('startup')
async def startup():
    loop = asyncio.get_event_loop()
    connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
    channel = await connection.channel()
    queue = await channel.declare_queue("logs")
    await queue.consume(on_message)

@app.get("/logs")
async def receiver():
    logs = all_logs()
    return logs

@app.get("/")
async def hello():

    return "hello"

在启动时,我们创建了与AMQP代理的连接,并声明了队列的名称,在这种情况下为“ logs”。并使用on_message()函数消耗该队列的消息。

然后,我们使用路径“/logs”创建一个端点,以显示数据库中存储的所有日志。

要测试此服务,我们创建了一个新文件,sender.py。

import pika
import json

tracker = {
        "ip_address": '127.0.0.1',
        "request_url": 'http://localhost:8000',
        "request_port": 8000,
        "request_path": "/",
        "request_method": "GET",
        "request_time": "2023-06-25T16:03:24.722256",
        "browser_type": "Firefox",
        "operating_system": "Windows 11",
        "service_name": "Fastapi_service",
    }

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='logs')

channel.basic_publish(exchange='', routing_key='logs', body=json.dumps(tracker))
print(" [x] Sent 'Logs'")
connection.close()

我们运行此文件,python3 sender.py。它将在我们的命令行中显示[x] Sent 'Logs'。在显示访问者跟踪器日志的命令行中,它将出现从sender.py接收到数据。

出版商

发布者将是我们要跟踪其活动的服务。

sender.py

import pika

def sender(body: dict):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='logs')

    channel.basic_publish(exchange='', routing_key='logs', body=body)
    print(" [x] Sent 'Logs'")
    connection.close()

sender.py文件中,我们定义了sender()函数,它将收到一个字典,这将是跟踪信息。它创建了与RabbitMQ的连接,声明队列,并将其发布到队列中(带有跟踪信息的字典)。

middleware.py

from fastapi import Request
from datetime import datetime

class Tracker:
    def __init__ (self, service_name: str):
        self.service_name = service_name

    def visitor_tracker(self, request: Request):
        ip_address = request.client.host
        request_url = request.url._url
        request_port = request.url.port
        request_path = request.url.path
        request_method = request.method
        browser_type = request.headers["User-Agent"]
        request_time = str(datetime.now())
        service_name = self.service_name

        return {
            "ip_address": ip_address,
            "request_url": request_url,
            "request_port": request_port,
            "request_path": request_path,
            "request_method": request_method,
            "request_time": request_time,
            "browser_type": browser_type,
            "service_name": service_name,
        }

middleware.py文件中,我们创建了将service_name作为构造函数中的参数的跟踪类别。它具有visitor_tracker()方法,该方法将FastAPI请求对象作为参数。它从请求中提取各种信息,例如IP地址,URL,端口,请求路径,方法(GET,POST等),从用户代理标头中的浏览器类型,请求时间以及在构造函数中传递的服务名称。此方法返回一个字典,其中包含所有这些信息,可以记录以跟踪访客请求。

main.py

from fastapi import FastAPI, Request
from datetime import datetime
from middleware import Tracker
from sender import sender
import json
app = FastAPI()

@app.middleware("tracker")
async def tracker(request: Request, call_next):
    service_tracker = Tracker("service_one")
    tracker = str(service_tracker.visitor_tracker(request))

    sender(tracker)
    response = await call_next(request)

    return response

@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }

if __name__ == " __main__":
    app.run(debug=True)

在此文件中,我们使用@app.middleware Decorator定义了名为“ Tracker”的中间件功能。此中间件将针对每个请求运行。中间件函数创建一个名为service_tracker的跟踪类别的实例,将“ service_one”作为服务名称,调用跟踪器实例的visitor_tracker()方法,并通过fastapapi请求对象。这为我们提供了跟踪信息,并调用sender()功能传递跟踪信息并将其发送到RabbitMQ队列。这用于记录或发送跟踪数据的某个地方,并使用call_next调用下一个route函数并返回其响应。

我们定义了两条路线。如果我们向这些路线提出请求,将通过中间件收集其信息并发送到RabbitMQ队列。

现在,我们与访客跟踪器同时运行此服务。

我们使用浏览器或使用HTTP客户端请求浏览端点之一。

我们将看到访问者跟踪器的响应:

构建UI

我们将创建一个React应用程序,以显示所有跟踪信息的表格。

安装Vite并进行反应

在我们的命令行中,我们使用反应型模板安装Vite。此命令行将为项目创建一个文件夹。

#npm
npm create vite@latest table -- --template react-ts

#yarn
yarn create vite@latest table --template react-ts
#pnpm
pnpm create vite@latest table --template react-ts

安装了所有软件包后,我们将使用命令运行vite:

npm run dev

我们去koude32,应该看到Vite并反应主页。

app.ts

import React, { useState, useEffect} from "react";

const url = "http://localhost:8000/logs";

interface Table {
  id: number,
  ip_address: string,
  request_url: string,
  request_port: string,
  request_path: string,
  request_method: string,
  request_time: string,
  browser_type: string,
  service_name: string,
}

const Table: React.FC = () => {
    const [data, setData] = useState<Table[]>([]);

    useEffect(() => {
      fetch(url)  
        .then(res => res.json())
        .then(data => setData(data));
        console.log(data);
    }, []);

    return (
        <div>
          <h1>Logs</h1>
          <table>
            <thead>
                <tr>
                  <th>Id</th>
                  <th>IP Address</th>
                  <th>Request URL</th>
                  <th>Request Port</th>
                  <th>Request Path</th>
                  <th>Request Method</th>
                  <th>Request Time</th>
                  <th>Browser Type</th>
                  <th>Service Name</th>
                </tr>
              </thead>  

            <tbody>
            {data.map((item, index) => (
              <tr key={index}> 
                <td>{item.id}</td>  
                <td>{item.ip_address}</td>     
                <td>{item.request_url}</td>        
                <td>{item.request_port}</td>  
                <td>{item.request_path}</td>
                <td>{item.request_method}</td>
                <td>{item.request_time}</td>
                <td>{item.browser_type}</td>  
                <td>{item.service_name}</td>   
              </tr>
            ))}
          </tbody>  

          </table>

        </div>

      );

};

export default Table;

让我们添加一个.css文件以添加样式并轻松查看数据。

index.css

:root {
  font-family: Inter, system-ui, Avenir, Helvetica, Arial, sans-serif;
  line-height: 1.5;
  font-weight: 400;

  color-scheme: light dark;
  color: rgba(255, 255, 255, 0.87);
  background-color: #242424;

  font-synthesis: none;
  text-rendering: optimizeLegibility;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  -webkit-text-size-adjust: 100%;
}

table {
  border-collapse: collapse;
  margin-bottom: 1rem;
}

th,
td {
  padding: 0.5rem;
  border: 1px solid #ccc;
}

th {
  text-align: left;
}

td {
  text-align: left;
}

.column-gap-10 {
  column-gap: 10px;
}

结论

在本文中,我们使用FastAPI和RabbitMQ构建了一个简单的服务监视系统。我们看到了如何创建FastApi项目,创建交流和队列,在FastApi中从RabbitMQ队列中发布和消费消息,以及在Postgres数据库中存储监视数据。

注意:我没有尝试此应用程序来跟踪与FastApi不同框架的其他服务的活动。但是,如果需要,您可以尝试使用相同的步骤:创建一个收集跟踪信息的中间件,然后将它们推入您声明的管理此数据的RabbitMQ队列。

感谢您抽出宝贵的时间阅读本文。

如果您对其他软件包,架构,如何改善我的代码,英语或其他任何建议有任何建议;请发表评论或通过TwitterLinkedIn与我联系。

访客跟踪器的代码是here

发布者的源代码是here

资源