参考资料 – 第二章 Apache Flink快速入门 枫叶云笔记 (fynote.com)

Flink 批数据处理案例

Java 版本WordCount

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1.读取文件
DataSource<String> linesDS = env.readTextFile("./data/words.txt");

//2.切分单词
FlatMapOperator<String, String> wordsDS =
        linesDS.flatMap((String lines, Collector<String> collector) -> {
    String[] arr = lines.split(" ");
    for (String word : arr) {
        collector.collect(word);
    }
}).returns(Types.STRING);

//3.将单词转换成Tuple2 KV 类型
MapOperator<String, Tuple2<String, Long>> kvWordsDS =
        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));

//4.按照key 进行分组处理得到最后结果并打印
kvWordsDS.groupBy(0).sum(1).print();

Scala 版本WordCount

//1.准备环境,注意是Scala中对应的Flink环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.api.scala._

//3.读取数据文件
val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")

//4.进行 WordCount 统计并打印
linesDS.flatMap(line => {
  line.split(" ")
})
  .map((_, 1))
  .groupBy(0)
  .sum(1)
  .print()

Flink流式数据处理案例

Java 版本WordCount

//1.创建流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.读取文件数据
DataStreamSource<String> lines = env.readTextFile("./data/words.txt");

//3.切分单词,设置KV格式数据
SingleOutputStreamOperator<Tuple2<String, Long>> kvWordsDS =
        lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
    String[] words = line.split(" ");
    for (String word : words) {
        collector.collect(Tuple2.of(word, 1L));
    }
}).returns(Types.TUPLE(Types.STRING, Types.LONG));

//4.分组统计获取 WordCount 结果
kvWordsDS.keyBy(tp->tp.f0).sum(1).print();

//5.流式计算中需要最后执行execute方法
env.execute();

Scala 版本WordCount

//1.创建环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

//2.导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.streaming.api.scala._

//3.读取文件
val ds: DataStream[String] = env.readTextFile("./data/words.txt")

//4.进行wordCount统计
ds.flatMap(line=>{line.split(" ")})
  .map((_,1))
  .keyBy(_._1)
  .sum(1)
  .print()

//5.最后使用execute 方法触发执行
env.execute()

Flink批和流案例总结

1.Flink程序编写流程总结

a. 获取flink的执行环境,批和流不同,Execution Environment。

b. 加载数据数据-- soure。

c. 对加载的数据进行转换-- transformation。

d. 对结果进行保存或者打印-- sink。

e. 触发flink程序的执行 --env.execute()

2.关于Flink的批处理和流处理上下文环境

创建Flink批和流上下文环境有以下三种方式,批处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(10);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

流处理上下文创建环境如下:

//设置Flink运行环境,如果在本地启动则创建本地环境,如果是在集群中启动,则创建集群环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//指定并行度创建本地环境
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(5);

//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost", 6021, 5, "application.jar");

同样在Scala api 中批和流创建Flink 上下文环境也有以上三种方式,在实际开发中建议批处理使用”ExecutionEnvironment.getExecutionEnvironment()”方式创建。流处理使用”StreamExecutionEnvironment.getExecution-Environment()”方式创建

3.Flink批和流 Java 和 Scala导入包不同

//Flink Java api 引入的包
import org.apache.flink.api.java.ExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.api.scala.ExecutionEnvironment

流处理不同API引入StreamExecutionEnvironment如下:

//Flink Java api 引入的包
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//Flink Scala api 引入的包
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

Flink Scala api需要导入隐式转换

在Flink Scala api中批处理和流处理代码编写过程中需要导入对应的隐式转换来推断函数操作后的类型,在批和流中导入隐式转换不同,具体如下:

//Scala 批处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.api.scala._
//Scala 流处理导入隐式转换,使用Scala API 时需要隐式转换来推断函数操作后的类型
import org.apache.flink.streaming.api.scala._

DataStream BATCH模式

package com.mashibing.flinkscala.code.chapter2

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamBatchModeTest {
  def main(args: Array[String]): Unit = {
    //1.准备实时处理环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    //2.导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //3.读取文件数据
    val linesDS: DataStream[String] = env.readTextFile("./data/words.txt")

    //4.切分单词
    val wordsDS: DataStream[String] = linesDS.flatMap(line => {
      line.split(" ")
    })

    //5.计数
    wordsDS.map(word=>{(word,1)})
      .keyBy(0)
      .sum(1)
      .print()

    //6.触发执行
    env.execute()
  }

}

Stream API 中除了可以设置Batch批处理模式之外,还可以设置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式会根据数据是有界流/无界流自动决定采用BATCH/STREAMING模式来读取数据,设置方式如下:

//BATCH 设置批处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//AUTOMATIC 会根据有界流/无界流自动决定采用BATCH/STREAMING模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//STREAMING 设置流处理模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代码中设置处理模式外,还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式,也可以在集群中提交Flink任务时指定execution.runtime-mode来指定,Flink官方建议在提交Flink任务时指定执行模式,这样减少了代码配置给Flink Application提供了更大的灵活性,提交任务指定参数如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注