Build real-time analytics into your next project
#database #analytics #streaming #realtime

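This post is the last in a series on real-time analytics. It is an excerpt from Real-time analytics, a definitive guide, which can be read in full here.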

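So, how do you begin to build real-time analytics into your next development project? As this guide has shown, there are 3 core steps to building real-time analytics: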

  1. Ingest data at streaming scale
  2. Query the data to build analytics metrics
  3. Publish the metrics to integrate into your applications

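Tinybird is a real-time analytics platform that makes all of this possible. Below you'll find practical steps for ingesting data from streaming platforms (and other sources), querying that data with SQL, and publishing low-latency, high-concurrency APIs for consumption within your applications.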

If you're new to Tinybird, you can try it out by signing up for the free Build plan, with no credit card required, no time limit, and generous free limits.

Ingesting real-time data into Tinybird

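Tinybird supports ingestion from multiple sources, including streaming platforms, files, databases, and data warehouses. Here's how to ingest data from a variety of sources using Tinybird.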

Ingest real-time data from Kafka

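Tinybird enables real-time ingestion from Kafka using its native Kafka connector. You can use the Tinybird UI to set up your Kafka connection, choose your topics, and define your ingestion schema in a few clicks. Or, you can use the Tinybird CLI to develop Kafka ingestion pipelines from your terminal.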
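
As a rough illustration of what defining an ingestion schema involves, the sketch below infers a column type for each top-level field of a sample JSON message. The function name `infer_schema` and the type names are hypothetical, chosen only for illustration; this is not a description of how the Tinybird Kafka connector works internally.

```python
import json

# Hypothetical mapping from Python types to column type names (illustrative only).
TYPE_NAMES = {bool: "Boolean", int: "Int64", float: "Float64", str: "String"}


def infer_schema(message: str) -> dict:
    """Map each top-level field of a JSON message to an illustrative column type."""
    record = json.loads(message)
    # The exact type is looked up, so True/False map to Boolean rather than Int64.
    # Anything unrecognized (nested objects, arrays, null) falls back to String.
    return {key: TYPE_NAMES.get(type(value), "String") for key, value in record.items()}


sample = '{"flight_from": "Barcelona", "extra_bags": 1, "priority_boarding": false}'
schema = infer_schema(sample)
print(schema)
```

In practice you would review and adjust the inferred types before ingesting, since a single sample message may not be representative of the whole topic.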

To learn more about building real-time analytics on top of Kafka data, check out these resources:

Note that this applies to any Kafka-compatible platform, such as Confluent, Redpanda, Upstash, Aiven, or Amazon MSK.

Ingest real-time data from data warehouses

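Tinybird works as a real-time publication layer for data stored in data warehouses. With Tinybird, you can synchronize tables from your data warehouse (such as BigQuery, Redshift, or Snowflake), develop metrics in SQL, and publish those metrics as low-latency, high-concurrency APIs.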

Tinybird's Connector Development Kit makes it possible to quickly ingest data from many data warehouses.

Check out the resources below to learn how to build real-time analytics on top of your data warehouse:

Ingest data from CSV, NDJSON, and Parquet files

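Tinybird enables data ingestion from CSV, NDJSON, and Parquet files, stored either locally on your machine or remotely in cloud storage such as GCP or S3 buckets. While data stored in files is often not generated in real time, it can be valuable as dimensional data to join with data ingested through streaming platforms. Tinybird's broad support for SQL joins makes this possible.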
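
To make the idea of joining dimensional data with streamed events concrete, here is a minimal sketch in plain Python. The CSV contents, the `country` column, and the helper names `load_dimension` and `enrich` are all hypothetical; in Tinybird the equivalent logic would be expressed as a SQL join.

```python
import csv
import io

# Hypothetical dimension data, as it might appear in a CSV file.
AIRLINES_CSV = """airline,country
Red Balloon,Spain
Green Bird,Portugal
"""

# Hypothetical streamed events (for example, records ingested from a stream).
events = [
    {"transaction_id": "t1", "airline": "Red Balloon"},
    {"transaction_id": "t2", "airline": "Green Bird"},
    {"transaction_id": "t3", "airline": "Unknown Air"},
]


def load_dimension(csv_text: str, key: str) -> dict:
    """Index the rows of a CSV by the given key column."""
    return {row[key]: row for row in csv.DictReader(io.StringIO(csv_text))}


def enrich(events: list, dimension: dict, key: str) -> list:
    """Left-join each event with its dimension row; unmatched events get country=None."""
    enriched = []
    for event in events:
        match = dimension.get(event[key], {})
        enriched.append({**event, "country": match.get("country")})
    return enriched


airlines = load_dimension(AIRLINES_CSV, "airline")
enriched_events = enrich(events, airlines, "airline")
for row in enriched_events:
    print(row)
```

The lookup table is built once and reused for every event, which mirrors why a small, slowly changing dimension table pairs well with a high-volume event stream.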

You can ingest data from files using the Tinybird UI, the CLI, or the Data Sources API.

Here are some resources for learning how to ingest data from local or remote files:

Ingest from your applications via HTTP

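Perhaps the simplest way to capture real-time data into Tinybird is the Events API, a simple HTTP endpoint that enables high-frequency ingestion of JSON records into Tinybird.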

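Because it's just an HTTP endpoint, you can call the API from any application code. The Events API can handle ingestion at up to 1,000 requests per second and more than 20 MB per second, which makes it highly scalable for most streaming use cases.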
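
One way to stay comfortably within a requests-per-second limit is to send several records per request as newline-delimited JSON (NDJSON). The sketch below only builds the request bodies and does not call the API; the helper name `to_ndjson_batches` and the batch size of 500 are assumptions chosen for illustration, not recommended values.

```python
import json


def to_ndjson_batches(records, max_records=500):
    """Yield NDJSON request bodies, each holding at most max_records records."""
    batch = []
    for record in records:
        batch.append(json.dumps(record))
        if len(batch) == max_records:
            yield "\n".join(batch)
            batch = []
    # Emit whatever is left over after the last full batch.
    if batch:
        yield "\n".join(batch)


# Hypothetical records standing in for application events.
records = [{"transaction_id": str(i), "extra_bags": i % 3} for i in range(1200)]
bodies = list(to_ndjson_batches(records))
print(len(bodies))  # 3 request bodies: 500 + 500 + 200 records
```

Each body can then be sent as the payload of a single POST request, in the same way as the snippets in this section send one record.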

Check out the code snippets below for example usage in your favorite language.

cURL

curl \
      -X POST 'https://api.tinybird.co/v0/events?name=events_example' \
      -H "Authorization: Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI" \
      -d $'{"timestamp":"2022-10-27T11:43:02.099Z","transaction_id":"8d1e1533-6071-4b10-9cda-b8429c1c7a67","name":"Bobby Drake","email":"bobby.drake@pressure.io","age":42,"passport_number":3847665,"flight_from":"Barcelona","flight_to":"London","extra_bags":1,"flight_class":"economy","priority_boarding":false,"meal_choice":"vegetarian","seat_number":"15D","airline":"Red Balloon"}
{"timestamp":"2022-05-11T20:58:45.112Z","transaction_id":"710d06d7-72c8-49bd-86ef-a510c331c3c4","name":"Kaylie Corkery","email":"kaylie.corkery@elevation.xyz","age":34,"passport_number":3510502,"flight_from":"Madrid","flight_to":"New York","extra_bags":2,"flight_class":"business","priority_boarding":true,"meal_choice":"gluten_free","seat_number":"4B","airline":"Green Bird"}'

Python

import requests
import json

data = json.dumps({
    'timestamp': '2022-10-27T11:43:02.099Z',
    'transaction_id': '8d1e1533-6071-4b10-9cda-b8429c1c7a67',
    'name': 'Bobby Drake',
    'email': 'bobby.drake@pressure.io',
    'age': 42,
    'passport_number': 3847665,
    'flight_from': 'Barcelona',
    'flight_to': 'London',
    'extra_bags': 1,
    'flight_class': 'economy',
    'priority_boarding': False,
    'meal_choice': 'vegetarian',
    'seat_number': '15D',
    'airline': 'Red Balloon'
})

r = requests.post('https://api.tinybird.co/v0/events', 
params = {
    'name': 'events_example',
    'token': 'p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI',
}, 
data=data)

print(r.status_code)
print(r.text)

JavaScript

fetch(
'https://api.tinybird.co/v0/events?name=events_example',
{
  method: 'POST',
  body: JSON.stringify({
  "timestamp": "2022-10-27T11:43:02.099Z",
  "transaction_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
  "name": "Bobby Drake",
  "email": "bobby.drake@pressure.io",
  "age": 42,
  "passport_number": 3847665,
  "flight_from": "Barcelona",
  "flight_to": "London",
  "extra_bags": 1,
  "flight_class": "economy",
  "priority_boarding": false,
  "meal_choice": "vegetarian",
  "seat_number": "15D",
  "airline": "Red Balloon"
}),
  headers: { Authorization: 'Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI' }
}
)
.then(res => res.json())
.then(data => console.log(data))

Go

package main

import (
    "bytes"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

func main() {
    url := "https://api.tinybird.co/v0/events?name=events_example"
    fmt.Println("URL:>", url)

    var jsonStr = []byte(`{"timestamp":"2022-10-27T11:43:02.099Z","transaction_id":"8d1e1533-6071-4b10-9cda-b8429c1c7a67","name":"Bobby Drake","email":"bobby.drake@pressure.io","age":42,"passport_number":3847665,"flight_from":"Barcelona","flight_to":"London","extra_bags":1,"flight_class":"economy","priority_boarding":false,"meal_choice":"vegetarian","seat_number":"15D","airline":"Red Balloon"}`)
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
    if err != nil {
        // Stop early if the request could not be constructed.
        panic(err)
    }
    req.Header.Set("Authorization", "Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI")
    req.Header.Set("Content-Type", "application/json")

    client := &http.Client{Timeout: time.Second * 10}
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(string(body))
}

Rust

use reqwest::blocking::Client;
use serde_json::json;

fn main() {
    let client = Client::new();
    let res = client.post("https://api.tinybird.co/v0/events?name=events_example")
        .header("Authorization", "Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI")
        .json(&json!({
  "timestamp": "2022-10-27T11:43:02.099Z",
  "transaction_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
  "name": "Bobby Drake",
  "email": "bobby.drake@pressure.io",
  "age": 42,
  "passport_number": 3847665,
  "flight_from": "Barcelona",
  "flight_to": "London",
  "extra_bags": 1,
  "flight_class": "economy",
  "priority_boarding": false,
  "meal_choice": "vegetarian",
  "seat_number": "15D",
  "airline": "Red Balloon"
}))
        .send()
        .unwrap();

    let body = res.text().unwrap();
    println!("{}", body);
}

PHP

<?php
$curl = curl_init();

curl_setopt_array($curl, array(
  CURLOPT_URL => "https://api.tinybird.co/v0/events?name=events_example",
  CURLOPT_RETURNTRANSFER => true,
  CURLOPT_ENCODING => "",
  CURLOPT_MAXREDIRS => 10,
  CURLOPT_TIMEOUT => 30,
  CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
  CURLOPT_CUSTOMREQUEST => "POST",
  CURLOPT_POSTFIELDS => "{\"timestamp\":\"2022-10-27T11:43:02.099Z\",\"transaction_id\":\"8d1e1533-6071-4b10-9cda-b8429c1c7a67\",\"name\":\"Bobby Drake\",\"email\":\"bobby.drake@pressure.io\",\"age\":42,\"passport_number\":3847665,\"flight_from\":\"Barcelona\",\"flight_to\":\"London\",\"extra_bags\":1,\"flight_class\":\"economy\",\"priority_boarding\":false,\"meal_choice\":\"vegetarian\",\"seat_number\":\"15D\",\"airline\":\"Red Balloon\"}",
  CURLOPT_HTTPHEADER => array(
    "Authorization: Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI",
    "Content-Type: application/json"
  ),
));

$response = curl_exec($curl);
$err = curl_error($curl);

curl_close($curl);

if ($err) {
  echo "cURL Error #:" . $err;
} else {
  echo $response;
}
?>

Ruby

require 'net/http'
require 'json'

uri = URI('https://api.tinybird.co/v0/events?name=events_example')
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true

request = Net::HTTP::Post.new(uri.request_uri)
request['Authorization'] = 'Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI'
request.body = JSON.dump({
  "timestamp": "2022-10-27T11:43:02.099Z",
  "transaction_id": "8d1e1533-6071-4b10-9cda-b8429c1c7a67",
  "name": "Bobby Drake",
  "email": "bobby.drake@pressure.io",
  "age": 42,
  "passport_number": 3847665,
  "flight_from": "Barcelona",
  "flight_to": "London",
  "extra_bags": 1,
  "flight_class": "economy",
  "priority_boarding": false,
  "meal_choice": "vegetarian",
  "seat_number": "15D",
  "airline": "Red Balloon"
})

response = http.request(request)
puts response.body

Java

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Main {
  public static void main(String[] args) throws IOException {
    String url = "https://api.tinybird.co/v0/events?name=events_example";
    String auth = "Bearer p.eyJ1IjogImE5Yzk1YTA4LTkwZmQtNDRiMi05NDFkLWJlNWQwZTViODVkOCIsICJpZCI6ICJlNjZkZjA0Yi1hZmI1LTRlMDctOWE0ZC01NzNkMDc2NzZmZmQifQ.IZlFNiZSgJgNTGs7wJSIgly2PHNDvn6Hohg1oZUDOiI";
    String data = "{\"timestamp\":\"2022-10-27T11:43:02.099Z\",\"transaction_id\":\"8d1e1533-6071-4b10-9cda-b8429c1c7a67\",\"name\":\"Bobby Drake\",\"email\":\"bobby.drake@pressure.io\",\"age\":42,\"passport_number\":3847665,\"flight_from\":\"Barcelona\",\"flight_to\":\"London\",\"extra_bags\":1,\"flight_class\":\"economy\",\"priority_boarding\":false,\"meal_choice\":\"vegetarian\",\"seat_number\":\"15D\",\"airline\":\"Red Balloon\"}";

    HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
    con.setRequestMethod("POST");
    con.setRequestProperty("Authorization", auth);
    con.setRequestProperty("Content-Type", "application/json");
    con.setDoOutput(true);
    con.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));

    System.out.println(new String(con.getInputStream().readAllBytes(), StandardCharsets.UTF_8));

    con.disconnect();
  }
}

For more information on building real-time analytics on top of application data using the Events API, check out these resources:

Query and shape real-time data with SQL

Tinybird offers a delightful interface for building real-time analytics metrics using the SQL you know and love.

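With Tinybird Pipes, you can break more complex queries into chained, composable nodes of SQL. This simplifies your development flow and makes it easy to identify queries that hamper performance or increase latency.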
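
The idea of chaining small SQL steps can be illustrated outside Tinybird as well. The sketch below uses Python's built-in `sqlite3` module and common table expressions as a stand-in for chained nodes; it is not Tinybird Pipes syntax, and the `events` table and its rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (session_id TEXT, action TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("s1", "view"), ("s1", "click"), ("s2", "view"), ("s3", "view"), ("s3", "view")],
)

# Each CTE plays the role of one node: it reads from the previous step
# and can be inspected or tuned on its own.
query = """
WITH
actions_per_session AS (
    -- Node 1: count actions per session
    SELECT session_id, COUNT(*) AS actions
    FROM events
    GROUP BY session_id
),
bounces AS (
    -- Node 2: flag single-action sessions as bounces
    SELECT session_id, CASE WHEN actions > 1 THEN 0 ELSE 1 END AS bounce
    FROM actions_per_session
)
-- Final node: bounce rate across all sessions
SELECT AVG(bounce) FROM bounces
"""
bounce_rate = conn.execute(query).fetchone()[0]
print(bounce_rate)
```

Because each step has a name, a slow or incorrect result can be traced to a single step by running that step in isolation.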

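Tinybird Pipes also include a robust templating language to extend your query logic beyond SQL and publish dynamic, parameterized endpoints from your queries.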

Below are some example snippets of SQL queries:

Web Analytics

SELECT
  session_id,
  location,
  device,
  browser,
  min(timestamp) AS session_start,
  max(timestamp) AS session_end,
  domain(argMin(referrer, timestamp)) AS session_referrer,
  argMin(href, timestamp) AS entry_page,
  argMax(href, timestamp) AS exit_page,
  count(path(pathname)) AS total_pageviews,
  uniq(path(pathname)) AS unique_pageviews,
  groupArray(path(pathname)) AS urls,
  groupArray(action) AS actions,
  count(action) > 1 ? 0 : 1 AS bounce
FROM analytics_events
GROUP BY session_id, location, device, browser
HAVING session_start BETWEEN {{Date(start_date)}} AND {{Date(end_date)}}
ORDER BY session_start desc

Software log analytics

SELECT
  function_name, 
  count(total) AS total
FROM errors_per_function_per_hour
WHERE 1
{% if defined(date_from) and defined(date_to) %}
  AND hour BETWEEN {{Date(date_from)}} AND {{Date(date_to)}}
{% else %}
  {{ error("Must supply date_from & date_to params") }}
{% end %}
GROUP BY function_name

Usage-based pricing

SELECT
   sum(usage) AS total_usage,
   sum(usage * usage_price) AS total_bill
FROM usage_events
JOIN pricing USING(event_type_id)
WHERE user = {{String(uuid)}}
AND toStartOfMonth(timestamp) = {{Date(billing_month)}}
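
The billing arithmetic behind a query like this can be checked by hand: each event's usage is multiplied by the price for its event type, and the products are summed. The sketch below works through that calculation in Python; the event types, prices, and usage figures are hypothetical.

```python
# Hypothetical per-unit prices keyed by event type.
pricing = {"api_call": 0.002, "gb_stored": 0.10}

# Hypothetical usage events for one user in one billing month.
usage_events = [
    {"event_type_id": "api_call", "usage": 1000},
    {"event_type_id": "api_call", "usage": 500},
    {"event_type_id": "gb_stored", "usage": 20},
]

# Total usage is a plain sum; the bill weights each event by its price.
total_usage = sum(e["usage"] for e in usage_events)
total_bill = sum(e["usage"] * pricing[e["event_type_id"]] for e in usage_events)

print(total_usage)           # 1520
print(round(total_bill, 2))  # 5.0
```

Here the bill is 1000 × 0.002 + 500 × 0.002 + 20 × 0.10 = 2 + 1 + 2 = 5.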

Personalized e-commerce sites

WITH product_views AS (
  SELECT
    product_id,
    count(views) AS views
  FROM sessions
  WHERE visitor_id = {{String(visitor_uuid)}}
  GROUP BY product_id
)
SELECT
  offer_expiration,
  offer_code
FROM offers
INNER JOIN product_views USING(product_id)
ORDER BY offer_expiration DESC
LIMIT 1

For more information on building real-time analytics metrics with Tinybird Pipes, check out these resources:

Publish real-time analytics APIs

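Tinybird shines in its publication layer. Whereas other real-time analytics platforms or technologies may still require you to build a custom backend to support user-facing applications, Tinybird greatly simplifies application development by instantly publishing REST APIs from SQL queries.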

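Every API published from a Tinybird Pipe includes automatically generated, OpenAPI-compatible documentation, security through auth token management, and built-in observability dashboards and APIs for monitoring endpoint performance and usage.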

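In addition, Tinybird APIs can be parameterized using a simple templating language. By using the templating language in your SQL queries, you can build robust logic for dynamic API endpoints.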
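
From the consumer's side, a parameterized endpoint is simply a URL whose query string carries the parameter values. The sketch below builds such a URL without sending a request. It assumes the endpoint follows the `/v0/pipes/<pipe_name>.json` pattern on the same host used by the ingestion snippets; the pipe name `errors_per_function`, the helper `endpoint_url`, and the token placeholder are hypothetical.

```python
from urllib.parse import urlencode


def endpoint_url(pipe_name: str, token: str, **params) -> str:
    """Build the URL for a published endpoint, with its query parameters encoded."""
    query = urlencode({**params, "token": token})
    return f"https://api.tinybird.co/v0/pipes/{pipe_name}.json?{query}"


url = endpoint_url(
    "errors_per_function",  # hypothetical pipe name
    "<YOUR_READ_TOKEN>",    # placeholder, not a real token
    date_from="2022-10-01",
    date_to="2022-10-31",
)
print(url)
```

Passing values through `urlencode` rather than string concatenation keeps special characters in parameter values from corrupting the query string.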

To learn more about how to build real-time analytics APIs with Tinybird, check out these resources:

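Ready to experience the industry-leading real-time analytics platform? Try Tinybird today, for free. Start with the Build plan, which is more than enough for most simple projects and has no time limit, and upgrade as you scale.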