Flink

Flink

流式计算

大数据四大:Hadoop,Hive HBase,Spark,Flink

Jiacheng-LPT018641.CN1.global.ctrip.com:5390
10.32.177.15:5390

初识

流处理:来一条处理一条,实时。

批处理:是一个吞吐量,和计算的高效。

公认最好的流设式计算

无界流:qq消息,发不完。

有界流:一周内的消息,也叫批处理。

适用场景

对于流处理:低延迟,Excactly-once。 Storm

对于批处理:高吞吐、高效处理。 MapReduce典型批量数据处理。

Flink在实现流和批的时候,将二者统一,流批统一。将批处理视为一种特殊的流处理,只是他的输入数据流被定义为有界的。

基于同一个Flink运行环境,分别提供了流处理和批处理API。

适用场景:实时监控系统、推荐系统、日志分析系统等等。

流式计算

两种流式计算,到时候与Flink对比。

部署方式

多种,Local、Standalone、Yarn、Mesos、Docker、Kubernetes、AWS等等。

后面的几个相比于Standalone就是其他的进行资源管理,更高效。但是Standalone的外部依赖最小。

Flink运行架构

JobManager: Master,协调分布式任务执行,调度task进行具体的任务。

TaskManager:Worker,执行实际任务。

JobManager和TaskManager都是单独的JVM进程。

Screenshot2023-07-29at13.36.46

JobManager接收到任务后,整体的流程(yarn模式,其中的resourcemanager可以是yarn):

Screenshot2023-07-29at13.39.07

并发度与Slots

并行度:是任务的(比如java程序)。setParallelsm(2)。然后配置文件也有,然后网页版本的控制也有。

应用程序的优先,然后网页提交任务时候的第二,最后是配置文件中的。

Slots:是TaskManage的。r每一个TaskManager是一个独立的JVM进程,可以独立的在线程上执行一个或者多个任务task,为了控制一个taskmanager能接受多少个task,划分出来多个slot。默认slot是1。

flink会对slots进行灵活的调度,如果parallelsm的slots比可用的slots稍多,也会灵活调度运行成功的。

如果parallelsm比可用slots多,那么需要的并行度比可用的slots多,可能有问题了。会等有空的slots。注意这是在standalone下面,因为slots固定的。这种情况要么调整每一个taskmanage人的slot,要么调整程序的parallelsm。

而yarn模式下可以协调资源,可以生成新的slots可供使用。所以yarn比standalone的要好。

DataStream重的数据在创建了之后就不能在增删改了,流批统一。

1、获取一个执行环境Environment。

2、通过Source,定义数据的来源。(file文件、Socket、kafka、rabbitMQ、ES、JDBC等等很多都支持)

3、对数据定义一系列的操作,Transformations

4、通过Sink,定义程序处理的结果要输出到哪里。

5、最后,提交并启动任务。(execute。)

setRuntimeMode()

STREAMING,流式模式,所有的task在应用执行时就完成部署,后续所有任务会连续不断运行。数据来一个处理一个。

BATCH,task周期性部署。

AUTOMATIC,自动。

从socket读的并行度只能是1。

Source,输入

来自文件、Socket等等。

自定义Source

Sink,输出

输出到文件:writeAs,不过deprecated,可以使用StreamFileSink,不过要有依赖

Socket。

kafka,输出到kafka是需要一个生产者。获取数据是需要一个消费者。

JDBC,也有JDBCSink包。

自定义Sink。

Transformation、数据变换操作

可以查阅相关文档。

map,takes one element and produces one element。

flatmap, 拉平。就是take的element里面可能是嵌套的map。

filter,过滤。

keyBy,(DataStream -> KeyedStream),将key相同的value聚合成相同的value。一般会接一个reduce或者aggregations。

reduce,A “rolling” reduce on a keyed data stream

aggregations, 统计。最大值最小值,sum等等

connect,处理其实分开的两个流。

union,数据和在一起了。

这两个算子用的也挺多的,就是帮助改进数据倾斜问题,把数据均匀一下,防止一个节点很繁忙。

shuffle(),(DataStream -> DataStream),

rebalance(),(DataStream -> DataStream),

Window开窗户

拆成多个Bucket

一类是keyed windows(要先有keyedBy)

另一类是non-keyed windows(不能并行貌似)

CountWindow(固定消息个数)

TumblingWindow(拼的可能不对,窗口大小,还有偏移)

SlideWindow(窗口大小和滑动大小)

SessionWindow(session gap,这个gap以后再开窗)

globalWindow(所有数据在一个窗口,相当于没有窗口么?)trigger函数告诉怎么分,evictor函数剔除器可以告诉那些保留那些不要。

上面的几个窗口是globalWindow的子集。

开窗聚合算子

Window Apply (WindowedStream -> DataStream)

Window Reduce

Aggregations on windows

1、首先引入maven依赖

2、获取原始事件流

3、定义匹配器pattern

4、获取匹配流

5、将匹配流中的数据处理形成结果数据流,process

其中,最关键的是定义匹配器,其次处理process。

就是匹配到的,放到这个流里面处理。

匹配器

next,times,严格连续。

followeby,松散连续,两个之间可以有别的数据。

followedbyany

next.witnin(10),在10s之内

where,or,until,greedy贪心等等

Table API和SQL

![Screenshot2023-07-30at15.33.13](https://gitee.com/JiaChengCC/u-pic-chart-bed/raw/master/uPic/Screenshot 2023-07-30 at 15.33.13.png)

需要引入maven依赖。还有planner依赖。还有flink-table-common。

下面的四个过程就是跟之前的Transformations的那一个(TableAPI简化Transformations的过程?):

1、创建一个TableEnvironment。(两个参数,一个是流式的环境,还有一个是environmentSetting就是针对 table环境的配置信息)

2、将流数据转换成动态表Table

3、在动态表上计算一个连续查询,生成一个新的动态表。

4、生成的动态表再次转换回流数据。

或者,使用SQL查询。

mysql中的是静态表。

临时表和永久表

1
2
3
4
5
6
// 创建临时视图,只在当前任务访问中有效。这个视图只能查询。
tableEnv.createTemporatyView("", orders);

// 创建永久表,另外的任务也可以访问。
Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");

如果同名都存在,则临时表优先,当前任务没法访问到永久表。

内置函数和自定义函数

count、max

自定义函数:

实现eval函数,目前没有override。

1
tableEnv.createTemporarySystemFunction(name, function

临时函数只在当前Catalog和Database生效,临时系统函数在整个生效。

标量函数,将0个或者多个标量,转成一个标量。

表函数(因为函数的结果最终体现为一个表),将0个或者多个标量,但是返回任意数量的行作为输出,而不是单个值。

聚合函数,可以将表中一列的数据,聚合成一个标量值,例如常用的max、min、count等等。

Connector

tableEnv可以去connect,好像是代替Sink的?

就是操作之后,同步存到connector里面去。

Author: Jcwang

Permalink: http://example.com/2023/07/29/Flink/