Flink的简单学习和macOS系统上的本地环境搭建


=Start=

缘由:

上周有时间的时候又重新简单学习了一下Flink的相关知识,并根据网上的文章尝试在macOS系统上搭建了一套本地的Flink环境,方便本地做一些测试和验证性的工作,在此简单记录一下方便后面有需要的时候参考。

正文:

参考解答:

Flink 如此流行的原因很简单,有2个指标非常好:(高)吞吐(低)延迟

Flink 能做什么

  1. 流式数据处理(数据提取、转换、存储)
    1. dataSource-从指定数据源中读取数据
    2. dataOperate-对数据进行各类操作,比如:flatmap/filter/map/reduce/join等
    3. resultSink-将结果输出至特定地方
  2. 窗口聚合(Flink支持滚动窗口、滑动窗口和会话窗口,使用起来也非常方便)
  3. 多数据关联(在多个数据源中根据特定的key进行join关联)

Flink 有什么特殊的优势?

  1. 事件时间(而非数据到达时间,应对一些特殊场景/原因下数据乱序到达的情况)
  2. 状态一致性保障(利用checkpoint机制实现exactly-once,状态只持久化一次到最终的存储介质内)
  3. Flink生态(既支持流式处理-DataStream,也支持批处理-DataSet,上层还有抽象的API方便使用)

Flink 应用场景

  • 实时数据分析
  • 实时数仓
  • 实时推荐/搜索
  • 复杂事件处理CEP

==

流式数据处理

dataSource.flatMap(new SelfFlatMapper()).setParallelism(1)
    .filter(new SelfFilter()).setParallelism(2)
    .map(new SelfMapper()).setParallelism(2)
    .addSink(new SelfSink()).setParallelism(2)

窗口(window)聚合

dataSource.flatMap(new SelfFlatMapper()).setParallelism(1)
    .filter(new SelfFilter()).setParallelism(2)
    .map(new SelfMapper()).setParallelism(2)
    .keyBy(new ServiceKeySelector()) // 按照服务做keyBy
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) // 5min的滚动窗口
    .process(new AlarmSum()) // 聚合计数
    .addSink(new AlarmNotify()); // 根据聚合结果,确定是否需要通知告警

多数据关联

logASource.keyBy(item -> item.getUserName())
    .intervalJoin(logBSource.keyBy(item -> item.getUserName())) // 采用interval join
    .between(Time.minutes(-5), Time.minutes(5)) // 设置join时间
    .process(new ProcessJoinFunction()); // 对关联上的内容进行处理

====

Flink是什么?

Apache Flink – 数据流上的有状态计算

Apache Flink 是一个框架和分布式处理引擎,用于在无边界有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度任意规模进行计算。

数据可以被作为 无界 或者 有界 流来处理。

  • 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  • 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性

Flink能做什么?
  • 事件驱动型应用
    • 反欺诈
    • 异常检测
    • 基于规则的报警
    • 业务流程监控
    • (社交网络)Web 应用
  • 数据分析应用
    • 电信网络质量监控
    • 移动应用中的产品更新及实验评估分析
    • 消费者技术中的实时数据即席分析
    • 大规模图分析
  • 数据管道应用
    • 电商中的实时查询索引构建
    • 电商中的持续 ETL
Flink的优劣势在哪?
性能卓越

低延迟
高吞吐
内存计算

正确性保证

Exactly-once 状态一致性
事件时间处理
成熟的迟到数据处理

分层 API

SQL on Stream & Batch Data
DataStream API & DataSet API
ProcessFunction (Time & State)

大规模计算

水平扩展架构
支持超大状态
增量检查点机制

聚焦运维

灵活部署
高可用
检查点机制

Flink在macOS系统上的环境搭建

在 Mac OS X 上安装 Flink 是非常方便的。推荐通过 homebrew 来安装。

brew install apache-flink

查看版本以及安装位置等信息

flink --version
which flink
ls -lt $(which flink)


$ vim ~/.zshrc
flinkstart() {/usr/local/Cellar/apache-flink/1.18.0/libexec/bin/start-cluster.sh }
flinkstop() {/usr/local/Cellar/apache-flink/1.18.0/libexec/bin/stop-cluster.sh }
$ source ~/.zshrc

flinkstart

ls -lt /usr/local/Cellar/apache-flink/1.18.0/libexec
ls -lt /usr/local/Cellar/apache-flink/1.18.0/libexec/log
Flink 程序剖析

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment); # getExecutionEnvironment()
  2. 加载/创建初始数据; # socketTextStream/readFile/readTextFile/fromCollection/fromElements/addSource/…
  3. 指定数据相关的转换; # flatMap/map/keyBy/window/sum/…
  4. 指定计算结果的存储位置; # print/writeAsText/writeAsCsv/writeToSocket/addSink/…
  5. 触发程序执行。 # env.execute

一段示例代码

package com.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

相关Maven配置

-- 打包fat jar时会用到 Maven shade 插件
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Replace this with the main class of your job -->
                                <mainClass>com.example.flink.SocketTextStreamWordCount</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

-- 一些依赖

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-quickstart-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-quickstart-java</artifactId>
    <version>1.18.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
    <scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.18.0</version>
    <scope>provided</scope>
</dependency>

提交作业&停止集群

$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out

$ flink run -c com.example.flink.SocketTextStreamWordCount ~/IdeaProjects/javatest/target/javatest-1.0-SNAPSHOT.jar 127.0.0.1 9000
$ flinkstop
参考链接:

Apache Flink 是什么?
https://flink.apache.org/zh/what-is-flink/flink-architecture/

Flink 提供的应用
https://flink.apache.org/zh/what-is-flink/flink-applications/

Apache Flink 的常见应用场景
https://flink.apache.org/zh/what-is-flink/use-cases/

Flink 本地模式安装
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/try-flink/local_installation/

Flink 的简单实践
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/overview/

Apache Flink
https://flink.apache.org/zh/

Flink DataStream 基于Interval Join实时Join过去一段时间内的数据
https://blog.csdn.net/wangpei1949/article/details/103108474

数据管道 & ETL
https://nightlies.apache.org/flink/flink-docs-stable/zh/docs/learn-flink/etl/

流式分析
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

如何使用Maven配置 Flink 作业项目 # 打包fat jar时会用到
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/configuration/maven/

Flink DataStream API 编程指南
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/overview/

算子-operator
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/overview/

窗口
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/windows/

关联-join
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/joining/

Flink 用于访问外部数据存储的异步 I/O
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/operators/asyncio/

DataStream Connectors
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/

基于 DataStream API 实现欺诈检测
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/try-flink/datastream/

本地模式安装Flink
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/try-flink/local_installation/

Flink应用场景
https://flink.apache.org/zh/what-is-flink/use-cases/

什么是 Apache Flink?
https://aws.amazon.com/cn/what-is/apache-flink/

一文初识大数据Flink框架
https://blog.csdn.net/asd1358355022/article/details/128376270
https://developer.aliyun.com/article/1152563

Mac flink安装与初体验
https://blog.csdn.net/jiuweiC/article/details/117341869

Flink 从 0 到 1 学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
https://xie.infoq.cn/article/56d2d87d1082977687042034d
http://www.54tianzhisheng.cn/2018/09/18/flink-install/

Flink本地安装教程
https://nightlies.apache.org/flink/flink-docs-release-1.10/zh/getting-started/tutorials/local_setup.html

flink-examples
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples

=END=


发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注