Quick tip: Using Apache Beam with SingleStoreDB
#java #singlestoredb #apachebeam

Abstract

An Apache Beam SingleStoreDB I/O connector is now available. In this short article, we'll see how to connect to SingleStoreDB, read from a table and write to a table, using Java.

Introduction

Previous articles have shown examples of SingleStoreDB use cases and integrations with various connectors. A new addition to the list of connectors is the Apache Beam SingleStoreDB I/O connector. This short article will test the connector with basic read and write operations, using Java.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use beam group as our Workspace Group Name and beam-demo as our Workspace Name.

Once we have created the database in the following steps, we'll make a note of our password and host name.

Create a database and tables

In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:

CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

We'll also create two tables, derived from a previous AdTech example, as follows:

CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

We'll populate the campaigns_read table, as follows:

INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

The complete SQL code is listed in Appendix A.

Create a Maven project

For quick testing, we'll use Maven, and we'll build and run our code from the command line.
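
If you don't already have a project skeleton, one way to scaffold it is with the Maven quickstart archetype (this is only a convenience; any empty Maven project will do). The groupId and artifactId below match the pom.xml in Appendix B:

mvn archetype:generate \
  -DgroupId=com.s2 \
  -DartifactId=s2-app \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false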

All of the project code files are listed in Appendix B.

pom.xml

The pom.xml file is fairly straightforward. It contains details of the Java version and the three main dependencies, and it configures the build to produce a single JAR file containing all dependencies.

S2ReadTable class

Our Java code will provide the connection details and read the data from the campaigns_read table as key-value pairs. To make things more interesting, we'll pass a parameter into our SQL query.

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer,String>> data = pipeline.apply(SingleStoreIO.<KV<Integer,String>> read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
            .withStatementPreparator(new StatementPreparator() {
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 7);
                }
            })
            .withRowMapper(new RowMapper<KV<Integer,String>> () {
                public KV<Integer,String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            })
        );

        data
            // Format each key-value pair as a CSV line
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer,String> kv) -> kv.getKey() + "," + kv.getValue()))
            // Write a single CSV file (withNumShards(1) forces one shard)
            .apply(
             TextIO.write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
            );

Once the data is in our PCollection, we'll convert the key-value pairs to string format and write the data to a file. We'll replace /path/to/ with the actual path where we want the file to be written. Because we set withNumShards(1), TextIO writes a single file named s2-00000-of-00001.csv, which we'll read back in the next section.

S2WriteTable class

We can also write data to a SingleStoreDB table. Our Java code will provide the connection details, read the data as key-value pairs and write it to the campaigns_write table.

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer,String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via((String line) -> {
                // Split on the first comma only, in case a campaign name contains commas
                String[] fields = line.split(",", 2);
                return KV.of(Integer.parseInt(fields[0]), fields[1]);
            })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer,String>> write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withTable("campaigns_write")
            .withUserDataMapper(new UserDataMapper<KV<Integer,String>> () {
                public List<String> mapRow(KV<Integer,String> element) {
                    List<String> result = new ArrayList<> ();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            })
        );

We'll replace /path/to/ with the actual path to read the file from. We'll use the same file created by S2ReadTable.
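
If the shard count or file naming ever changes, TextIO.read().from() also accepts a glob pattern, so we don't have to hard-code the exact shard name. A minimal sketch:

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-*.csv"));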

Build and run the code

For testing purposes only, we'll declare two environment variables:

export S2_HOST="<host>"
export S2_PASSWORD="<password>"

We'll replace <host> and <password> with the values from our SingleStoreDB Cloud account.
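
A missing environment variable would otherwise surface later as a confusing connection error, so a small guard at the top of main can help. A minimal sketch, using the same variable names as the code in Appendix B:

        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        if (s2_host == null || s2_password == null) {
            System.err.println("Please set S2_HOST and S2_PASSWORD");
            System.exit(1);
        }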

Next, we'll build the code, as follows:

mvn clean compile assembly:single

First, we'll run the S2ReadTable Java code, as follows:

java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2ReadTable

A CSV file should be written to the location we specified. If we look at the contents, we should see results similar to the following:

9,warmth
11,virtual city
8,ultra light
13,dream burger
10,run healthy
12,online lifestyle
14,super bowl tweet

Second, we'll run the S2WriteTable Java code, as follows:

java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2WriteTable

If we switch to the SQL Editor in SingleStoreDB Cloud, we can check the contents of the campaigns_write table:

SELECT * FROM campaigns_write;

The output should be similar to the following:

+-------------+------------------+
| campaign_id | campaign_name    |
+-------------+------------------+
|          12 | online lifestyle |
|          10 | run healthy      |
|          13 | dream burger     |
|           9 | warmth           |
|           8 | ultra light      |
|          14 | super bowl tweet |
|          11 | virtual city     |
+-------------+------------------+

Summary

This short article has shown examples of how to read from and write to SingleStoreDB using Apache Beam. Further information is available in the documentation.

Appendix A: SQL Code

CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

SELECT * FROM campaigns_write;

Appendix B: Java Project Code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.s2</groupId>
  <artifactId>s2-app</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>2.44.0</version>
      <scope>runtime</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
    <dependency>
      <groupId>com.singlestore</groupId>
      <artifactId>singlestore-jdbc-client</artifactId>
      <version>1.1.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-singlestore</artifactId>
      <version>2.44.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <!-- Placeholder main class; we run each class explicitly with java -cp -->
              <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

S2ReadTable.java

package com.s2.beam;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;

public class S2ReadTable {
    public static void main(String[] args) {

        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer,String>> data = pipeline.apply(SingleStoreIO.<KV<Integer,String>> read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
            .withStatementPreparator(new StatementPreparator() {
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 7);
                }
            })
            .withRowMapper(new RowMapper<KV<Integer,String>> () {
                public KV<Integer,String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            })
        );

        data
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer,String> kv) -> kv.getKey() + "," + kv.getValue()))
            .apply(
             TextIO.write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
            );

        pipeline.run().waitUntilFinish();

    }
}

S2WriteTable.java

package com.s2.beam;

import java.util.List;
import java.util.ArrayList;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.UserDataMapper;

public class S2WriteTable {
    public static void main(String[] args) {

        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer,String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via((String line) -> {
                String[] fields = line.split(",", 2);
                return KV.of(Integer.parseInt(fields[0]), fields[1]);
            })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer,String>> write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withTable("campaigns_write")
            .withUserDataMapper(new UserDataMapper<KV<Integer,String>> () {
                public List<String> mapRow(KV<Integer,String> element) {
                    List<String> result = new ArrayList<> ();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            })
        );

        pipeline.run().waitUntilFinish();

    }
}