与Python,Kafka和无管道实时处理计算机视觉事件
#python #tooling #machinelearning #computervision

将计算机视觉和实时数据处理结合起来创造了许多有趣的机会。想象一个世界,在这个世界中,相机,传感器和智能算法共同努力地解释和响应视觉世界,随着它即时展开。这是实时的计算机视觉,该领域的能力以闪电速度捕获和处理视觉数据的能力在整个行业之间开辟了许多可能性。

本文解释了如何使用PipelessKafka实时生成和处理计算机视觉事件。无管道是一个开源计算机视觉框架,可在几分钟内构建和部署应用程序。 Kafka是一个流行的OSS分布式事件流平台。

如果您更喜欢使用代码而不是阅读,则可以直接转到即将运行的示例in this doc section。它包含您需要入门的所有内容,包括docker-compose.yaml在本地创建KAFKA群集并逐步指令。

语境

以前的文章解释了如何创建一个无管道项目,加载模型以在视频上识别猫并在视频上绘制边界框。这很酷,但并不是真的有用。今天,您将学习如何与Kafka无管道连接,因此,您将不再通过输入视频绘制边界框,而是能够实时对事件做出反应,在这种情况下,该事件将是视频中出现的猫。

如果您错过了以前的文章,则可以在下面找到它们:

对于此演示,您将处理一个本地视频文件,这对于现实世界应用程序不是很实际。未来的教程将涵盖如何使用无管来处理远程URL和RTMP/RTSP流量的视频流。

建筑学

以下架构代表您将部署的架构:

Pipeless + Kafka

简而言之,您将进行输入视频流,用无管道分析它,以确定视频中显示的内容并将事件导出到Kafka主题。这些事件可以根据应用程序采取任何必需的操作。

作为一个现实世界中的一个例子,请考虑一个马铃薯加工厂,相机连续监视传入的土豆。如果出现有颜色不正确的马铃薯,该系统将触发空气流动机构,从生产线中弹出有问题的马铃薯。或者,您还可以设想一家餐厅跟踪顾客在桌子上花费的时间。在这里,事件将是到达或离开桌子的人。

无管kafka插件

从高级别的角度来看,您可以将Kafka视为分布式系统,该系统被分为topics。您可以将消息发布到主题(农产品),并且可以从主题中消耗消息。

无管道提供了一个插件,可以轻松地将消息发送到Kafka主题。它需要配置KAFKA客户端,因此您只需要提供一些环境变量即可。 Kafka插件包含在pipeless-ai-plugins软件包中,您可以使用pip安装。确实,连接到Kafka并不是一项复杂的任务,实际上,使用Kafka插件不是强制性的,它只是Kafka生产者客户端周围的包装器,可以使事情变得更容易,但是没有什么可以阻止您直接使用Kafka客户端。

您可以找到有关kafka插件in this doc section的整个文档。

通过无管检测事件

让我们重新创建猫的应用程序,而不是绘制边界框,而是检测事件并将其发送到kafka。
您可以找到整个应用程序现成的运行,包括逐步说明和所有必需的资源,例如CATS检测模型和带有KAFKA群集in this doc sectiondocker-compose.yaml

请克隆该仓库,然后移动到examples/kafka目录以轻松遵循下一个部分。

找到无管安装说明和要求here

禁用视频输出

这种特殊情况不需要视频输出。最初的示例是随时修改视频帧,并制作一个新视频作为我们的预期输出。但是,现在我们对视频本身中的事件感兴趣,因此我们将修改项目配置以禁用输出视频。

以下是配置文件(config.yaml)的全部内容:

input:
  address:
    host: localhost
    port: 1234
  video:
    enable: true
    uri: file:///home/path/pipeless/examples/kafka/cats.mp4
log_level: INFO
output:
  video:
    enable: false
worker:
  n_workers: 1

很重要:请记住编辑输入视频的uri,以将绝对路径设置为本地目录

供参考,以下是原始猫的区别:

output:
-  address:
-    host: localhost
-    port: 1237
-  video:
-    enable: true
-    uri: file:///home/example/path/pipeless/examples/cats/cats-output.mp4
+    enable: false

您可以看到,我们还删除了“输出address”部分,因为我们将没有输出组件,而是将事件直接从我们的process挂钩发送到kafka。

加载kafka插件和生产活动

要加载kafka插件,我们只需要从插件包中导入它,然后在无管的before钩中包含以下行:

from pipeless_ai_plugins.kafka import KafkaProducer

...
def before(self):
    self.producer = KafkaProducer()
...

加载了插件后,我们将在process挂钩上向我们的Kafka主题发送消息:

producer.produce('pipeless', 'There is a cat!')

以下是新应用程序的整个代码(app.py):

from pipeless_ai.lib.app.app import PipelessApp
from pipeless_ai_plugins.kafka import KafkaProducer
import cv2

class App(PipelessApp):
    def before(self):
        self.producer = KafkaProducer()
        self.xml_data = cv2.CascadeClassifier('cats.xml')

    def process(self, frame):
        model = self.xml_data

        # Create reduced frame for faster detection
        original_height, original_width, _ = frame.shape
        aspect_ratio = original_width / original_height
        reduced_width = 600
        reduced_height = int(reduced_width / aspect_ratio)
        reduced_frame = cv2.resize(frame, (reduced_width, reduced_height))
        bounding_boxes = model.detectMultiScale(reduced_frame, minSize = (30, 30))

        # Notify that there is a cat
        if len(bounding_boxes) > 0:
            self.producer.produce('pipeless', 'There is a cat!')

请注意,原始猫的唯一区别是:

def process(self, frame):
...
-# Draw the bounding boxes over the original frame
-for box in bounding_boxes:
-    a, b, width, height = box
-    # Recalculate bounding box for the original image
-    a = int(a * (original_width / reduced_width))
-    b = int(b * (original_height / reduced_height))
-    width = int(width * (original_width / reduced_width))
-    height = int(height * (original_height / reduced_height))
-    cv2.rectangle(frame, (a, b), (a + width, b + height), (255, 0, 255), 2)
+# Notify that there is a cat
+if len(bounding_boxes) > 0:
+    this.producer.produce('pipeless', 'There is a cat!')

最后,让我们使用我们的群集地址配置Kafka插件。它就像导出环境变量一样简单:

export KAFKA_BOOTSTRAP_SERVERS="localhost:9094"

这就是您所需要的!

运行应用程序

现在,我们使用examples/kafka目录上提供的docker-compose.yaml文件启动本地KAFKA。我们不会在此处详细介绍,因为它不超出本文的范围,只需从示例目录中运行以下命令:

docker compose up

,让我们无管道运行以开始处理我们的视频并发送事件:

pipeless run

验证卡夫卡上的事件

本节上的命令必须在Kafka容器中执行。通过运行:
执行到容器中

docker compose exec kafka bash

docker-compose.yaml包括配置Kafka以自动创建主题。您可以通过运行第一次编写pipeless主题来验证pipeless主题:

kafka-topics.sh --list --bootstrap-server localhost:9094

该示例的代码仅将信息发送给Kafka,它不会从主题中消耗,因此该主题仍然包含我们发送给它的所有信息。您的任务是在Kafka主题上聆听消息并根据这些消息采取操作。由于每个应用程序对如何处理这些事件都有自己的要求,因此这是无管范围的。让我们运行一个消费者以验证信息到达该主题:

kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic pipeless --from-beginning

使用Ctrl + C停止消费者。

现在,是时候通过消费Kafka主题中的消息,处理信息并采取任何必需的操作来完成应用程序的时候了。一个简单的示例可能是在视频上有猫及其出现的时间时向您发送通知。

参与其中

If you like Pipeless you can help us by sharing and starring our GitHub repository.

我们也感谢反馈。您可以通过GitHub问题分享您的想法,也可以在我们的GitHub discussions forum上打开线程。您也可以在Twitter (now X)上关注无管道

最后,如果您有信心,请随时通过github拖拉请求为代码库做出贡献!