贝博官方bb艾弗森

企业文化

汇聚最新资讯 / 产品信息

用最专业的眼光看待互联网

立即咨询

首页 > 新闻
2 Flink 快速入门
发布时间:2025-03-28


工程的创建

创建一个普通的 Maven 工程。

引入 Flink 的核心依赖,主要是 flink-streaming-java、flink-clients。

... 1.17.0 ... org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version} ...

以批的形式处理有界数据

创建 input 文件夹,在其中创建一个 word.txt 文件,在其中输入:

Hello World Hello Java Hello Flink Flink is Better than Spark

源代码如下:

src/main/java/com/wuji1626/wc/Flink01_Bound_Batch.java · wuji1626/bigdata-sample - Gitee.com

  1. 环境准备

创建 ExecutionEnvironment 执行环境的实例。ExecutionEnvironment 是 org.apache.flink.api.java 下的类,也就是说是 DataSet API。

  1. 利用环境变量读取有界数据文件

执行环境有许多读取文件的方法,本实例是读取有界文本文件,因此选择 readTextFile() 方法,文件路径选择绝对路径。通过 readTextFile() 逐行读取文件内容,转换成一个类似字符串数组的数据源 DataSource 的模版实例。

DataSource ds = env.readTextFile("..\\Java\\bigdata-sample\\input\\word.txt");

  1. 对读取的数据进行扁平化处理

将 DataSource 实例进行打平操作,将单行的文本数据转换成二元组 Tuple 一个单词+出现次数的二元组。

FlatMapOperator<String, Tuple2> flatMapDS = ds.flatMap( new FlatMapFunction<String, Tuple2> () { @Override public void flatMap(String lineString, Collector<Tuple2> out) throws Exception { String[] wordArr = lineString.split(" "); for (String word : wordArr) { // 将封装好的二元组对象发送到下游 out.collect(Tuple2.of(word, 1L)); } } } );

打平操作需要使用到 DataSource 的 flatMap() 方法,该方法需要定义一个 FlatMapFunction() 对象,需要提供一个 FlatMapFunction 模版对象,这个对象包括两个参数,一个是原始数据类型,一个打平后的数据类型。这里原始数据时一行字符串,而转换后的时候一个二元组,前面元素是单词(字符串类型),后面元素是长整型的单词出现次数。
FlatMapFunctionMapFunction 需要实现一个flatMap() 方法。最终输出一个二元组的集合。

  1. 按照单词进行分组

利用打平后的 FlatMapOperator 对象(该对象是 DataSet API 中的对象,由其属于
org.apache.flink.api.java.xxx 包可知,本质上是一个 DataSet 对象)的 groupBy() 对数据进行分组。由于分组对象是一个 DataSet>,因此需要指定分组的对象,这里是以字符串类型进行分组,因此只需指定元素的位置,即 0。

UnsortedGrouping<Tuple2>groupByDS = flatMapDS.groupBy(0);

  1. 聚合计算

UnsortedGrouping 本质上还是 DataSet 对象,有如 sum() 这类聚合函数。此处需要对元组的 Long 类型要素进行分组,因此位置选择 1。

AggregateOperator<Tuple2> sumDS = groupByDS.sum(1);

  1. 打印聚合结果

AggregateOperator 类型的对象本质上也是 DataSet 对象,使用 print() 方法打印时,就是一个遍历循环。由于该操作会抛出一个 RuntimeException 需要进行处理。

try { sumDS.print(); } catch (Exception e) { throw new RuntimeException(e); }

  1. 运行结果


以流的形式处理有界数据

src/main/java/com/wuji1626/wc/Flink02_Bound_Stream.java · wuji1626/bigdata-sample - Gitee.com

复用之前创建的 input 文件夹。

以流的方式处理数据,使用的 Flink Streaming API(
org.apache.flink.streaming.api.datastream.xxx),与批处理使用的 DataSet API 不同。

  1. 环境准备

与批处理不同,使用
StreamExecutionEnvironment 运行环境实例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  1. 从指定的文件读取数据


StreamExecutionEnvironment 实例同样具有 readTextFile() 不过该方法已经标识废止,为与批处理代码一致,暂时继续使用。

DataStreamSource ds = env.readTextFile("...\\Java\\bigdata-sample\\input\\word.txt");

  1. 对读取的数据进行扁平化处理

DataStreamSource 对象也有同样的打平处理的方法 flatMap() 处理方式也与批处理几乎相同。

bb贝博官网

SingleOutputStreamOperator<Tuple2> flatMapDS = ds.flatMap( new FlatMapFunction<String, Tuple2>() { @Override public void flatMap(String lineString, Collector<Tuple2> out) throws Exception { String[] wordArr = lineString.split(" "); for (String word : wordArr) { out.collect(Tuple2.of(word, 1L)); } } } );

  1. 按照单词进行分组


SingleOutputStreamOperator 本质上是 DataStream 对象,也有分组方法不过与 DataSet 的方法不同,应使用 keyBy() ,也是使用元组第 1 个元素为关键字进行分组。

KeyedStream<Tuple2, Tuple> keyedDS = flatMapDS.keyBy(0);

  1. 聚合计算

KeyedStream 类型仍然是 DataStream 对象,该对象仍然有聚合函数,使用 sum() 进行聚合计算。

SingleOutputStreamOperator<Tuple2> sumDS = keyedDS.sum(1);

  1. 打印聚合结果


SingleOutputStreamOperator 仍然是 DataStream 对象,有 print() 方法,相当于遍历打印 DataStream 对象。

sumDS.print();

  1. 提交作业

DataStream API,需要通过 env.execute() 方法显示提交作业,方可运行。

try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); }

  1. 结果

以流的形式处理无界数据

为获取无界数据,可以利用 Socket 向端口发送信息,可以使用类 Linux 的 nc -lk 的方法。Windows 环境下可以使用 ncat。 ncat 是 Nmap 项目的一部分,是 Netcat 的增强版本,并且可以在 Windows 系统上运行。

可以通过如下地址进行下载。

https://nmap.org/download.html

在下载界面选择下载:nmap-7.95-setup.exe、npcap-1.81.exe 两个 exe 文件。

安装软件后,需要重启电脑。重启后,进入 powershell 控制台,输入:

ncat -l -p 8888

即可进入交互界面。

  1. 环境准备

与处理有界数据一样,使用 Stream API 中的环境实例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  1. 从指定的网络端口读取数据

这里使用 socketTextStream() 方法,通过 Socket 读取发送到特定端口的数据,方法需要指定主机名+端口。返回的仍然是数据源。

DataStreamSource socksetDS = env.socketTextStream("localhost", 8888);

  1. 对读取的数据进行扁平化处理

打平处理与【以流形式处理有界数据】中完全想通。

  1. 按照单词进行分组

分组处理与【以流形式处理有界数据】完全相同。

  1. 聚合计算

求和也与【以流形式处理有界数据】完全相同。

  1. 结果打印

同【以流形式处理有界数据】。

  1. 提交作业

同 【以流形式处理有界数据】。

  1. 结果

链式调用改造

2.1~2.3 的代码编写时,存在一个小问题。每一步骤都需要定义一个变量接受上一步的输出,如:socksetDS、flatMapDS、keyedDS、sumDS 等。在编程过程中如果指定不当,就会产生指定错 DS 的情况。因此,代码可以进行链式调用的改造。

以 【流的形式处理无界数据】为例,代码可以修改改为如下形式:

这样修改代码变得比较清爽,不过由于没有了变量接收可能会使 Debug 变得相对复杂,不过 IDE 已经在每一步骤后对输出结果进行了提示。

对内部类代码编写方式进行修改

对于 Flink flatMap() 方法所需的 DataStream 对象,之前采用内部类的方式,进行定义。IDE 对内部类的写法,提示可以修改成 Lambda 表达式的方式,降低代码复杂度。

变成 Lambda 表达式后的效果如下:

修改过后的程序,在代码编写阶段没有报错,而在编译过程中报错。大意是 Collector 泛型中缺少数据类型。

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information. at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371) at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:560) at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:177) at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:611) at com.wuji1626.wc.Flink04_Unbound_Stream_ChainCall.main(Flink04_Unbound_Stream_ChainCall.java:31)

上述问题主要是由于,Flink 具有一个类型提取系统,可以分析函数输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器,但由于 JDK8 有泛型擦除特性。在实际开发中,泛型擦除又经常会出现在 Lambda 表达式中。导致 Lambda 表达式输出的Tuple 对象的类似被擦除,而 Flink 将无法自动获取到类型信息而报错。

为了避免上述问题,需要在 flatMap() 方法后显示的指定 Tuple 对象的类型。具体方法如下:

.returns(Types.TUPLE(Types.STRING, Types.LONG))

需要增加
SingleOutputStreamOperator 的 returns() 方法即可。Flink 对各种 Java 的类型都进行了定义。

最后总结一下,关于 flatMap() 方法的 Lambda 表达式,主要关注四个部分:

  1. 参数 FlatMapFunction flatMapper ,输入:

  1. 输出结果 (lineStr, out),实际就是 Tuple2 类型,也就是被泛型擦除的部分。

2 Flink 快速入门

  1. 方法体,-> 后的部分。就是之前的逻辑。

  1. 显示的指定 flatMap() 处理结果的输出类型。

补充说明:

Lamba 表达式是 JDK8 引入的概念,Lambda 表达式需要与函数接口一起使用。当接口中只有一个方法,且有 @FunctionalInterface 注解就可以使用 Lambda 表达式进行定义。当接口中有多个方法, Lambda 表达式将无法区分要实现哪个方法因此不能使用。

售前咨询热线
在线咨询
公司地址
  • 地址:漳平市涝黎河217号

CopyRight © bellbet贝博.(艾佛森)官方网站 2007-2024 https://www.jmskms.com All Rights Reserved BB贝博艾弗森官方网站