Flink DataStream API编程

流处理基本概念

对于什么是流处理,从不同的角度有不同的定义。其实流处理与批处理这两个概念是对立统一的,它们的关系有点类似于对于 Java 中的 ArrayList 中的元素,是直接看作一个有限数据集并用下标去访问,还是用迭代器去访问。

流处理系统本身有很多自己的特点。一般来说,由于需要支持无限数据集的处理,流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子,然后等到数据到达后对数据进行处理。为了表达复杂的计算逻辑,包括 Flink 在内的分布式流处理引擎一般采用 DAG 图来表示整个计算逻辑,其中 DAG 图中的每一个点就代表一个基本的逻辑单元,也就是前面说的算子。由于计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理,最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中

图2. 一个 DAG 计算逻辑图与实际的物理时模型。
逻辑图中的每个算子在物理图中可能有多个并发。

对于实际的分布式流处理引擎,它们的实际运行时物理模型要更复杂一些,这是由于每个算子都可能有多个实例。如图 2 所示,作为 Source 的 A 算子有两个实例,中间算子 C 也有两个实例。在逻辑模型中,A 和 B 是 C 的上游节点,而在对应的物理逻辑中,C 的所有实例和 A、B 的所有实例之间可能都存在数据交换。在物理模型中,我们会根据计算逻辑,采用系统自动优化或人为指定的方式将计算工作分布到不同的实例中。只有当算子实例分布到不同进程上时,才需要通过网络进行数据传输,而同一进程中的多个实例之间的数据传输通常是不需要通过网络的。

DataStream编程概述

Flink中的DataStream程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* @Author:落墨
* @CreateTime:2020/6/16
* @Description:JavaDataStreamSourceApp
*/
public class JavaDataStreamSourceApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//socketFunction(env);
DataStreamSource<String>data = env.socketTextStream("local",9999);
DataStream<word> result= data.flatMap(new FlatMapFunction<String, word>() {
public void flatMap(String s, Collector<word> collector) throws Exception {
for(String value:s.split(",")) {
collector.collect(new word(value,1L));
}
}
}).keyBy("name").timeWindow(Time.seconds(5),Time.seconds(1)).reduce(new ReduceFunction<word>() {
public word reduce(word word, word t1) throws Exception {
return new word(word.name,word.count+t1.count);
}
});
result.print();
env.execute("JavaDataStreamSourceApp");
}
public static class word{
private String name;
private Long count;
public String getName() {
return name;
}
public word(){} //需要不包含参数的空参构造器
public word(String name, Long count) {
this.name = name;
this.count = count;
}
public void setName(String name) {
this.name = name;
}
public Long getCount() {
return count;
}
@Override
public String toString() {
return "word{" +
"name='" + name + '\'' +
", count=" + count +
'}';
}
public void setCount(Long count) {
this.count = count;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @Author:落墨
* @CreateTime:2020/6/16
* @Description:DataStreamSourceApp
*/
object Wordcount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.socketTextStream("localhost",9999)
import org.apache.flink.api.scala._
data.flatMap(_.split(",")).map(x =>(x,1)).keyBy(0)
.timeWindow(Time.seconds(5),Time.seconds(1))
.sum(1)
.print()
env.execute("WordCount")
}
}

为了实现流式 Word Count,我们首先要先获得一个 StreamExecutionEnvironment 对象。它是我们构建图过程中的上下文对象。基于这个对象,我们可以添加一些算子。对于流处理程度,我们一般需要首先创建一个数据源去接入数据。在这个例子中,我们使用了 Environment 对象中内置的读取文件的数据源。这一步之后,我们拿到的是一个 DataStream 对象,它可以看作一个无限的数据集,可以在该集合上进行一序列的操作。例如,在 Word Count 例子中,我们首先将每一条记录(即文件中的一行)分隔为单词,这是通过 FlatMap 操作来实现的。调用 FlatMap 将会在底层的 DAG 图中添加一个 FlatMap 算子。然后,我们得到了一个记录是单词的流。我们将流中的单词进行分组(keyBy),然后累积计算每一个单词的数据(sum(1))。计算出的单词的数据组成了一个新的流,我们将它写入到输出文件中。

最后,我们需要调用 env.execute 方法来开始程序的执行。需要强调的是,前面我们调用的所有方法,都不是在实际处理数据,而是在构通表达计算逻辑的 DAG 图。只有当我们将整个图构建完成并显式的调用 Execute 方法后,框架才会把计算图提供到集群中,接入数据并执行实际的逻辑。

基于流式 Word Count 的例子可以看出,基于 Flink 的 DataStream API 来编写流处理程序一般需要三步:通过 Source 接入数据、进行一系统列的处理以及将数据写出。最后,不要忘记显式调用 Execute 方式,否则前面编写的逻辑并不会真正执行。

Flink中使用数据源

source是程序从中读取其输入的位置。我们可以使用StreamExecutionEnvironment.addSource(sourceFunction)将一个source附加到程序中。Flink提供了许多预先实现的sourceFunction,但是我们可以通过实现SourceFunction 非并行源,实现ParallelSourceFunction接口或扩展RichParallelSourceFunction 并行源来编写自己的自定义源。

基于文件:

  • readTextFile(path)- TextInputFormat逐行读取文本文件,即符合规范的文件,并将其作为字符串返回。

  • readFile(fileInputFormat, path) -根据指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)-这是前两个内部调用的方法。它path根据给定的读取文件fileInputFormat。根据提供的内容watchType,此源可以定期(每intervalms)监视路径中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步从文件中排除文件。

基于套接字:

  • socketTextStream-从套接字读取。元素可以由定界符分隔。

基于集合:

  • fromCollection(Collection)-从Java Java.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。

  • fromCollection(Iterator, Class)-从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。

  • fromElements(T …)-从给定的对象序列创建数据流。所有对象必须具有相同的类型。

  • fromParallelCollection(SplittableIterator, Class)-从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。

  • generateSequence(from, to) -并行生成给定间隔中的数字序列。

基于连接器:

  • addSource-附加新的源功能。例如,要阅读Apache Kafka,可以使用 addSource(new FlinkKafkaConsumer08<>(…))。
1
2
3
implementing the SourceFunction for non-parallel sources
implementing the ParallelSourceFunction interface
extending the RichParallelSourceFunction for parallel sources.

自定义 SourceFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package DataStream

import org.apache.flink.streaming.api.functions.source.SourceFunction

/**
* @Author:落墨
* @CreateTime:2020/6/24
* @Description:SourceFunction
*/
class CustomNonParallelSourceFunction extends SourceFunction[Long]{
var count = 1L
var isRunning = true
override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning){
sourceContext.collect(count)
count += 1
Thread.sleep(1000)

}
}
override def cancel(): Unit = {
isRunning = false
}
}

自定义ParallelSourceFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package DataStream

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

/**
* @Author:落墨
* @CreateTime:2020/6/24
* @Description:ParallelSourceFunction
*/
class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{
var isRunning = true
var count = 1L
override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning){
sourceContext.collect(count)
count += 1

Thread.sleep(10000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}

自定义 RichParallelSourceFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package DataStream

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

/**
* @Author:落墨
* @CreateTime:2020/6/24
* @Description:RichParallelSourceFunction
*/
class CustomRichParallelSourceFunction extends
RichParallelSourceFunction[Long] {
var isRunning = true
var count = 1L
override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
while(isRunning){
sourceContext.collect(count)
count += 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}

使用自定义数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package DataStream

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
/**
* @Author:落墨
* @CreateTime:2020/6/24
* @Description:使用自定义数据源
*/
object DataStreamSourceApp {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//socketFunction(env)
//nonParallelSourceFunction(env)
//ParallelSourceFunction(env)
RichParallelSourceFunction(env)
env.execute("DataStreamSourceApp")
}
def socketFunction(environment: StreamExecutionEnvironment): Unit ={
val data = environment.socketTextStream("localhost",9999)
data.print()
}
def nonParallelSourceFunction(environment: StreamExecutionEnvironment): Unit ={
val data = environment.addSource(new CustomNonParallelSourceFunction).setParallelism(1)//因为实现的不是并行的SourceFunction,所以参数不能设置为2
data.print()
}
def ParallelSourceFunction(environment: StreamExecutionEnvironment): Unit ={
val data = environment.addSource(new CustomParallelSourceFunction).setParallelism(1)//因为实现的是并行的sourceFunction所以可以设置大于1的并行参数
data.print()
}
def RichParallelSourceFunction(environment: StreamExecutionEnvironment): Unit ={
val data = environment.addSource(new CustomRichParallelSourceFunction).setParallelism(1)//因为实现的是并行的sourceFunction所以可以设置大于1的并行参数
data.print()
}
}
文章目录
  1. 1. 流处理基本概念
  2. 2. DataStream编程概述
    1. 2.1. 示例
    2. 2.2. Flink中使用数据源
      1. 2.2.1. 基于文件:
      2. 2.2.2. 基于套接字:
      3. 2.2.3. 基于集合:
      4. 2.2.4. 基于连接器:
      5. 2.2.5. 自定义 SourceFunction
      6. 2.2.6. 自定义ParallelSourceFunction
      7. 2.2.7. 自定义 RichParallelSourceFunction
      8. 2.2.8. 使用自定义数据源
,