上周,我们介绍了how to build a PyFlink experiment environment,今天我们将使用该实验环境来探索Pyflink的可能性。
pyflink是一个通用流框架,摘要将流处理分为四个级别。
- sql
- Table API
- datastream
- 状态流处理
越接近底部的灵活性越多,也需要编写更多代码。我希望能够用Pyflink完成几乎所有的事情,所以让我们从数据流的角度开始使用Pyflink开发的基本概念。
本文将通过简单的描述和示例介绍PYFLINK开发的一些关键点,但不会提及Flink的实现细节。
datastream概念
DataStream的开发将遵循以下过程。
基本上,我们从源中获取流数据,对其进行处理并将其输出到某个地方。
这在pyflink中表示如下。
ds = env.add_source(kafka_consumer)
ds = ds.map(transform, output_type=output_type_info)
ds.add_sink(kafka_producer)
源和水槽很容易理解,但是关键是可以使用哪些处理?
在官方文件中,有所有可用操作的列表。
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/
在上面的示例中,我们使用了map
。
也有一个flink伪像的操作的示例,该代码位于./examples/python/datastream/basic_operations.py
ã
map
和flat_map
有什么区别?
操作列表中有两个类似的操作,map
和flat_map
,这两个操作之间有什么区别?
区别在于生成的输出数量。
在map
的情况下,输入事件会生成一个且仅次于一个输出事件;另一方面,flat_map
可以生成零对许多输出事件。
让我们以实际代码为例。
def map_transform(i: int):
return i * i
def flat_map_transform(i: int):
for idx in range(i):
yield idx
ds.map(map_transform, output_type=Types.INT())
ds.flat_map(flat_map_transform, output_type=Types.INT())
在此示例中,map
正式将所有输入整数并传递给它们,一个输入对应于一个输出。但是,flat_map
输出一系列事件,输出事件的数量由输入事件确定。
如果输入为0
,则不会触发flat_map
的yield
,也不会生成任何内容。
状态
状态是Flink的最佳功能。
尽管我们有各种可用的操作,但其中许多实际上是根据先前事件产生的结果。我们如何保留以前的事件?这是State
进来的地方。
状态可以视为内部存储以持续数据,状态的大小是每个节点内存的摘要。
尽管如此,状态可以持续在像RocksDB
这样的耐用存储中以获得更多的可扩展性。
from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend
env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(EmbeddedRocksDBStateBackend())
要在Flink框架中使用状态,有两个值得注意的关键点。
- 状态只能在“键入数据流”中使用。
- 状态基于操作,无法与他人分享。
所有可用状态和参考都在下面列出。
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/state/
实际上,./examples/python/datastream/state_access.py
上的一个例子也提供了一个很好的演示。
连接(共享状态)
如前一节所述,状态是基于操作的,不能共享,但是有时我们确实需要结合两个不同的流态,所以我们该怎么办?
幸运的是,Flink提供connect
,使我们能够在同一工作中共享不同流的状态。
通过使用connect
,我们可以组合不同的流并使用相同的操作,以便我们可以共享相同的操作状态。
要变得更具体,让我提供一个实用的例子。有两个流。
- 流1提供项目标识符和项目名称之间的映射。当项目名称更改时,事件
(item_id, item_name)
被发送到流中,因此我们只需要保存最新的状态。 - 流2是交易历史记录,包括出售了哪些商品和订购的物品数量。
我们要做的是,当输入任何购买时,我们必须总结并附加最新的项目名称。
这是经典的流浓缩模式,我在my previous article中详细解释了富集设计模式。
这是完整的程序示例。
import logging, sys
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, MapFunction, CoMapFunction, CoFlatMapFunction
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor
from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
class SellTotal(CoFlatMapFunction):
def open(self, runtime_context: RuntimeContext):
state_desc = MapStateDescriptor('map', Types.LONG(), Types.STRING())
self.state = runtime_context.get_map_state(state_desc)
cnt_desc = ValueStateDescriptor('cnt', Types.LONG())
self.cnt_state = runtime_context.get_state(cnt_desc)
# (id, name)
def flat_map1(self, value):
self.state.put(value[0], value[1])
#return Row(value[0], f"update {value[1]}", 0)
# (id, cnt)
def flat_map2(self, value):
cnt = self.cnt_state.value() or 0
total = cnt + value[1]
self.cnt_state.update(total)
if not self.state.contains(value[0]):
name = "NONAME"
else:
name = self.state.get(value[0])
#return Row(value[0], name, total)
yield Row(value[0], name, total)
def sell_total_demo(env):
type_info1 = Types.ROW([Types.LONG(), Types.STRING()])
ds1 = env.from_collection(
[(1, 'apple'), (2, 'banana'), (3, 'cherry'), (4, 'durian'), (6, 'fig'), (7, 'grape')],
type_info=type_info1)
type_info2 = Types.ROW([Types.LONG(), Types.LONG()])
ds2 = env.from_collection(
[(1, 1), (2, 3), (3, 5), (1, 5), (5, 100), (6, 66), (1, 10)],
type_info=type_info2)
output_type_info = Types.ROW([Types.LONG(), Types.STRING(), Types.LONG()])
serialization_schema = JsonRowSerializationSchema.Builder() \
.with_type_info(output_type_info) \
.build()
kafka_producer = FlinkKafkaProducer(
topic='TempResults',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'kafka:9092', 'group.id': 'test_group'}
)
connected_ds = ds1.connect(ds2)
connected_ds.key_by(lambda a: a[0], lambda a: a[0]).flat_map(SellTotal(), output_type_info).add_sink(kafka_producer)
env.execute()
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:////home/ec2-user/flink-1.15.2/opt/flink-sql-connector-kafka-1.15.2.jar")
print("connected demo gogo")
sell_total_demo(env)
在flat_map1
中,流1处理了,也就是说,要维护项目编号和项目名称的映射,因此此流不需要生成输出事件。
整个应用程序的核心位于flat_map2
中。我们从self.cnt_state
获取累积数量,不仅添加新数量,而且还将其更新回该州。然后,在输出过程中,我们从self.state
中获取相应的名称,最后输出丰富的事件。
结论
在最后一个示例中,我们演示了流的操作,状态和合并。
从这个示例中,我们可以轻松理解Flink只要正确编写程序就可以做任何我们想要的事情。
我们将继续在流处理上进行一些实验,如果有任何内容,将继续发布任何进一步的更新。