Abstract
The Apache Beam SingleStoreDB I/O connector is now available. In this short article, we'll look at how to connect to SingleStoreDB, read data from a table and write data to a table using Java.
Introduction
Previous articles have shown examples of SingleStoreDB use cases and integrations with various connectors. A new addition to the list of connectors is the Apache Beam SingleStoreDB I/O connector. This short article will test the connector with basic read and write operations using Java.
Create a SingleStoreDB Cloud Account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use beam group as our Workspace Group Name and beam-demo as our Workspace Name.
Once we've created the database in the following steps, we'll make a note of our password and host name.
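In the Java code that follows, these values will be available as s2_host and s2_password, read from environment variables (see the full listings in Appendix B):
String s2_host = System.getenv("S2_HOST");
String s2_password = System.getenv("S2_PASSWORD");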
Create the Database and Tables
In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
We'll also create two tables, derived from a previous AdTech example, as follows:
CREATE TABLE campaigns_read (
campaign_id SMALLINT(6),
campaign_name VARCHAR(255)
);
CREATE TABLE campaigns_write (
campaign_id SMALLINT(6),
campaign_name VARCHAR(255)
);
We'll populate the campaigns_read table, as follows:
INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
The complete SQL code is listed in Appendix A.
Create a Maven Project
For quick testing, we'll use Maven, and build and run our code from the command line.
All the project code files are listed in Appendix B.
pom.xml
The pom.xml file is fairly straightforward, containing details of the Java version, the three main dependencies we need, and the fact that we want to build a single JAR file that bundles all the dependencies.
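Since beam-runners-direct-java is declared with runtime scope, Pipeline.create() will execute on the Direct Runner by default. If we wanted to pass runner options explicitly from the command line, a minimal sketch (not part of the classes below) might look like this:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Parse any --option=value flags from the command line into PipelineOptions
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

// Create the pipeline with these options instead of the defaults
Pipeline pipeline = Pipeline.create(options);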
S2ReadTable Class
Our Java code will provide the connection details and read the data from the campaigns_read table as key-value pairs. We'll pass a parameter into our SQL query to make it more interesting.
Pipeline pipeline = Pipeline.create();

// Read rows from campaigns_read as key-value pairs
PCollection<KV<Integer,String>> data = pipeline.apply(SingleStoreIO.<KV<Integer,String>> read()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create(s2_host)
        .withUsername("admin")
        .withPassword(s2_password)
        .withDatabase("adtech"))
    // Parameterised query; the ? placeholder is bound below
    .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
    .withStatementPreparator(new StatementPreparator() {
        public void setParameters(PreparedStatement preparedStatement) throws Exception {
            preparedStatement.setInt(1, 7);
        }
    })
    // Map each result row to a KV of campaign_id and campaign_name
    .withRowMapper(new RowMapper<KV<Integer,String>> () {
        public KV<Integer,String> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);

// Format each key-value pair as "id,name" and write a single CSV file
data
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((KV<Integer,String> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply(
        TextIO.write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
    );
Once the data is in our PCollection, we'll convert the key-value pairs to string format and write the data out to a file. We'll replace /path/to/ with the actual path where we want the file to be written.
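If we didn't need the query parameter, we could instead read the whole table; a minimal sketch, assuming the connector's withTable option and reusing the same connection configuration and RowMapper as above:
// Read every row of campaigns_read, with no parameterised query
PCollection<KV<Integer,String>> allRows = pipeline.apply(SingleStoreIO.<KV<Integer,String>> read()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create(s2_host)
        .withUsername("admin")
        .withPassword(s2_password)
        .withDatabase("adtech"))
    .withTable("campaigns_read")
    .withRowMapper(new RowMapper<KV<Integer,String>> () {
        public KV<Integer,String> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);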
S2WriteTable Class
We can also write data to a SingleStoreDB table. Our Java code will provide the connection details, read the data as key-value pairs and write the data to the campaigns_write table.
Pipeline pipeline = Pipeline.create();

// Read the CSV file produced by S2ReadTable
PCollection<String> lines = pipeline.apply(
    TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

// Parse each "id,name" line back into a key-value pair
PCollection<KV<Integer,String>> keyValues = lines.apply(
    MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via((String line) -> {
            String[] fields = line.split(",");
            return KV.of(Integer.parseInt(fields[0]), fields[1]);
        })
);

// Write the key-value pairs to the campaigns_write table
keyValues.apply(SingleStoreIO.<KV<Integer,String>> write()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create(s2_host)
        .withUsername("admin")
        .withPassword(s2_password)
        .withDatabase("adtech"))
    .withTable("campaigns_write")
    // Convert each element to a list of column values for the insert
    .withUserDataMapper(new UserDataMapper<KV<Integer,String>> () {
        public List<String> mapRow(KV<Integer,String> element) {
            List<String> result = new ArrayList<> ();
            result.add(element.getKey().toString());
            result.add(element.getValue());
            return result;
        }
    })
);
We'll replace /path/to/ with the actual path where the file should be read from. We'll use the same file created by S2ReadTable. Note that the simple split on commas works here because none of our campaign names contain a comma.
Build and Run the Code
For testing purposes only, we'll declare two environment variables:
export S2_HOST="<host>"
export S2_PASSWORD="<password>"
We'll replace <host> and <password> with the values from our SingleStoreDB Cloud account.
Next, we'll build the code, as follows:
mvn clean compile assembly:single
First, we'll run the S2ReadTable Java code, as follows:
java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2ReadTable
A CSV file should be written to the location we specified. If we view its contents, we should see results similar to the following. Only rows with campaign_id greater than 7 appear, matching our query parameter; the row order may vary:
9,warmth
11,virtual city
8,ultra light
13,dream burger
10,run healthy
12,online lifestyle
14,super bowl tweet
Second, we'll run the S2WriteTable Java code, as follows:
java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2WriteTable
If we switch to the SQL Editor in SingleStoreDB Cloud, we can check the contents of the campaigns_write table:
SELECT * FROM campaigns_write;
The output should be similar to the following:
+-------------+------------------+
| campaign_id | campaign_name |
+-------------+------------------+
| 12 | online lifestyle |
| 10 | run healthy |
| 13 | dream burger |
| 9 | warmth |
| 8 | ultra light |
| 14 | super bowl tweet |
| 11 | virtual city |
+-------------+------------------+
Summary
This short article has shown an example of how to read from and write to SingleStoreDB using Apache Beam. Further information is available in the documentation.
Appendix A: SQL Code
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
CREATE TABLE campaigns_read (
campaign_id SMALLINT(6),
campaign_name VARCHAR(255)
);
CREATE TABLE campaigns_write (
campaign_id SMALLINT(6),
campaign_name VARCHAR(255)
);
INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
SELECT * FROM campaigns_write;
Appendix B: Java Project Code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.s2</groupId>
    <artifactId>s2-app</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.44.0</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
        <dependency>
            <groupId>com.singlestore</groupId>
            <artifactId>singlestore-jdbc-client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-singlestore</artifactId>
            <version>2.44.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Placeholder; we run specific classes with java -cp instead -->
                            <mainClass>fully.qualified.MainClass</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
S2ReadTable.java
package com.s2.beam;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;

public class S2ReadTable {
    public static void main(String[] args) {
        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer,String>> data = pipeline.apply(SingleStoreIO.<KV<Integer,String>> read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
            .withStatementPreparator(new StatementPreparator() {
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 7);
                }
            })
            .withRowMapper(new RowMapper<KV<Integer,String>> () {
                public KV<Integer,String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            })
        );

        data
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer,String> kv) -> kv.getKey() + "," + kv.getValue()))
            .apply(
                TextIO.write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
            );

        pipeline.run().waitUntilFinish();
    }
}
S2WriteTable.java
package com.s2.beam;

import java.util.List;
import java.util.ArrayList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.UserDataMapper;

public class S2WriteTable {
    public static void main(String[] args) {
        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer,String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via((String line) -> {
                    String[] fields = line.split(",");
                    return KV.of(Integer.parseInt(fields[0]), fields[1]);
                })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer,String>> write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withTable("campaigns_write")
            .withUserDataMapper(new UserDataMapper<KV<Integer,String>> () {
                public List<String> mapRow(KV<Integer,String> element) {
                    List<String> result = new ArrayList<> ();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            })
        );

        pipeline.run().waitUntilFinish();
    }
}