Flink1.11有状态算⼦的使⽤
1.前⾔
时间:2020年11⽉29⽇
在Flink中所有算⼦(如map,flatmap,reduce等等)都可以是有状态的。⽤Scala写起来会有⼀些骚操作,⽐如使⽤lazy定义descriptor 等。但是这⾥暂时不会讲到,本⽂以Java API为主。
有状态操作⼤致可以分为Key State(键控状态)和Operator State(算⼦状态),由于键控状态⽐较常⽤,本⽂会以键控状态为主进⾏总结。
状态算⼦跟状态后端有⼀定的关系(算⼦的状态是存在状态后端当中的,因此实际开发中可能需要先设置使⽤何种状态后端),之前写过⼀篇⽂章有需要可以参考。
2.键控状态
既然是键控状态,⾸先⼀定要进⾏keyBy操作,将DataStream变成KeyedDataStream。然后才能使⽤键控状态。下⾯先使⽤ValueState 的例⼦⼊门,然后详细说明。
2.1ValueState demo
package it.kenn.state;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.vironment.StreamExecutionEnvironment;
public class KeyedStateDemo {
public static void main(String[] args) throws Exception {
String backendPath = "hdfs://localhost:8020/flink/statebackend/fsState";
StreamExecutionEnvironment env = ExecutionEnvironment();
env.setParallelism(6);
//设置状态后端为hdfs
StateBackend stateBackend = new FsStateBackend(backendPath);
env.setStateBackend(stateBackend);
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
//进⾏keyBy操作
.keyBy(value -> value.f0)
//有状态的flatMap
.flatMap(new CountWindowAverage())
//简单的输出
.print();
}
}
编写具体的flatmap函数
package it.kenn.state;
import org.apache.flink.apimon.functions.RichFlatMapFunction;
import org.apache.flink.apimon.state.ValueState;
import org.apache.flink.apimon.state.ValueStateDescriptor;
import org.apache.peinfo.TypeHint;
import org.apache.peinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.figuration.Configuration;
import org.apache.flink.util.Collector;
/
**
* 下⾯是⼀个ValueState的例⼦
*/
//需要注意的是要使⽤RichFlatMapFunction,因为Rich抽象类⾥⾯有⽣命周期函数,⽐如下⾯要⽤到的open
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> collector) throws Exception {
//这⾥编写的是真正的业务代码,我们要做的是对传⼊的值进⾏累加,并记录有多少个值进⾏了累加
Tuple2<Long, Long> currValue = sum.value();
currValue.f0 += 1;
currValue.f1 += input.f1;
//更新状态
sum.update(currValue);
//计算平均值
if (currValue.f0 >= 2) {
sum.clear();
}
}
/
**
* 在open函数⾥⾯可以使⽤getRuntimeContext函数来getState,但在此之前需要定义ValueStateDescriptor
* ValueStateDescriptor有三个参数,第⼀个为该state起名字,第⼆个定义该state的类型,第三个初始化值
* @param parameters
*/
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
,new Tuple2<>(0L,0L)
);
//第⼀次调⽤的时候完成对ValueState初始化并得到初始值
sum = getRuntimeContext().getState(descriptor);
}
}
2.2ValueState使⽤总结
有些内容在上⾯例⼦的注释中已经讲过了,这⾥再总结⼀下。
1. 要使⽤键控状态,⾸先要执⾏keyBy操作,指定DataStream的键。但是这个键是⼀个虚拟的键,即指定字段中哪个字段当做键就可以了,不需要使⽤真正的键值数据结构(这个在spark中好像使⽤专门的数据结构的,可以翻⼀下验证)
2. 状态必须要通过RuntimeContext来获得,⽽RuntimeContext只能在RichFunction中才能得到(在Sc
ala中可以定义为lazy类型不需要在RichFunction中得到,这⾥不讲),因此有状态的算⼦⽐如上⾯的flatmap必须继承Function。
3. 要得到⼀个状态句柄需要创建⼀个StateDescriptor,ValueState的StateDescriptor叫做ValueStateDescriptor。创建需要三个参数,上⾯例⼦中也解释过了。
4. 获取状态需要使⽤ValueState<T> getState(ValueStateDescriptor<T>),需要注意的是,这⾥得到的是ValueState这个结构,并不是存储的状态值
5.得到结构以后就可以通过value()⽅法得到值了。更新状态值使⽤update()。清空状态值使⽤clear()⽅法。当然这是针对ValueState说的,其他类型的State(下⼀节说)获取、更新值还有⼀些区别
2.3键控状态分类
上⾯已经使⽤ValueState做了简单的⼊门,下⾯键控状态的分类。
2.3.1ValueState
获取ValueState:ValueState<T> getState(ValueStateDescriptor<T>)
得到、更新、清空值:value()⽅法、update(T)⽅法、clear()⽅法
2.3.2ReducingState
它保存⼀个值,这个值表⽰所有添加到该状态的汇总值。⽐如对某个值进⾏累加,那么存储的就是当前数据过来以后截⽌⽬前数据的累加值。需要注意的是,它的⼊参类型和返回值类型要完全⼀致。
获取ReducingState:ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
得到、更新、清空值:
2.3.3ListState
它保存了⼀组值。
获取ListState:ListState<T> getListState(ListStateDescriptor<T>)
添加值:add(T)或者addAll()⽅法;得到Iterable:Iterable<T> get();更新值:update(List<T>)。
2.3.4AggregatingState
与ReducingState最⼤的不同是它的⼊参和出参可以是不同的类型(从下⾯泛型中也可以看出来)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
2.3.5MapState
它可以保存⼀个映射列表。可以向MapState中放⼊⼀个键值对数据。
获取MapState:MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
添加元素:put(UK, UV) 或者 putAll(Map<UK, UV>);获取元素:get(UK)或者后⾯这三个⽅法entries(),、keys()、 values()跟Java基本API差不多不赘述;清空值clear()
如果MapState的键值本⾝是很复杂的类型,在创建的时候需要⽤TypeInformation。具体如下:
private static MapState<String, Tuple2<Long,String>> behaviorMapState;
@Override
public void open(Configuration parameters) throws Exception {
behaviorMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>
("behaviorMapState", TypeInformation.of(new TypeHint<String>(){}),TypeInformation.of(new TypeHint<Tuple2<Long,String>>(){})));
}
3.State TTL
随着时间的延续,算⼦要维护的状态可能⾮常多,对内存或者磁盘都有⼀定的压⼒,Flink设置了⼀种算⼦状态过期机制,可以让不再需要的状态在⼀段时间以后过期。如果对键控状态设置了过期时间,相应的状态会在⼀段时间以后被清除(不⼀定是过期以后⽴即被清除!)。
3.2Demo
package it.kenn.state;
import org.apache.flink.apimon.state.StateTtlConfig;
import org.apache.flink.apimon.state.ValueStateDescriptor;
import org.apache.flink.apimon.time.Time;
/**
* 状态过期时间配置
*/
public class StateTTLDemo {
public static void main(String[] args) {
StateTtlConfig ttlConfig = StateTtlConfig
//过期时间,必须的值
.newBuilder(Time.seconds(10))
抽象类的使用
//默认值,还可以是OnReadAndWrite
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//配置如果状态已经过期了是否返回,下⾯的配置是说如果状态过期了就不返回了
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> text_state = new ValueStateDescriptor<>("text state", String.class);
//为该状态指定过期配置
ableTimeToLive(ttlConfig);
}
}
上⾯是⼀个简单的TTL⼩demo。更详细的内容看吧。