前言

目前,数据处理的链路很长,由于业务的不断迭代,导致了数据的分层。例如ODS->DWD->DWS->ADS。每个层都有不同的数据处理逻辑。 而层与层之间目前是通过Kafka队列实现解耦的。在某些场景下,上下层需要通过查询数据库来实现某种数据的更新交互。由于网络延迟和一些不可预知的原因,导致下游查询不到最新的数据,从而无法得到正确的更新。

解决方案

既然是因为时间差导致的不一致问题,那直接可以使用延迟队列。虽然kafka也支持延迟队列,但是使用上限制较多,且需要配置,维护比较麻烦。在环境允许的情况下,可以优先选用RabbitMQ进行解决。网上示例很多,可以进行参考。

由于笔者是Flink流处理任务,DataSteam的底层API天然就支持对时间的精确控制。时间语义和窗口是让无界流可以聚合精准处理的两大核心要素。

大致思路如下:

  1. 首先把上游推送的标识ID进行逻辑分区

  2. 开窗口进行ID去重,保证一段时间内数据的唯一性,避免下游重复查询

  3. 定义一个状态,缓存延迟的ID

  4. 注册定时器,触发ID的推送逻辑

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.88.130:9092")
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStreamSource<String> kafka_source = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

env.setParallelism(2);

source.keyBy(String::hashCode)
.window(TumblingProcessingTimeWindows.of(Time.seconds(30L)))
.process(new ProcessWindowFunction<String, String, Integer, TimeWindow>() {
@Override
public void process(Integer integer, ProcessWindowFunction<String, String, Integer, TimeWindow>.Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
Set<String> resList = new HashSet<>();
for (String it : iterable) {
resList.add(it);
}
for (String it : resList) {
collector.collect(it);
}
}
})
.keyBy(String::hashCode)
.process(new StateTimerFunction(3))
.print();


env.execute("my job");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.hc.process;

import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StateTimerFunction extends KeyedProcessFunction<Integer, String, String> {

private MapState<String,String> mapCacheStorage;
private ValueState<String> valCacheStorage;


private int TTL_SEC;

public StateTimerFunction(int TTL_MINUTE) {
this.TTL_SEC = TTL_MINUTE;
}

public void open(Configuration parameters) throws Exception {

ValueStateDescriptor<String> valStateDescriptor = new ValueStateDescriptor<>("valState", String.class);

StateTtlConfig ttl = StateTtlConfig
.newBuilder(Time.seconds(5))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

valStateDescriptor.enableTimeToLive(ttl);


valCacheStorage = getRuntimeContext().getState(valStateDescriptor);
}

@Override
public void processElement(String bytes, Context context, Collector<String> collector) throws Exception {
try {

context.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + TTL_SEC *1000);
if (valCacheStorage.value()==null){
valCacheStorage.update(bytes);
collector.collect(bytes);
}
} catch (Exception e) {
e.printStackTrace();
}


}


//到达时间点触发事件操作
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 模拟查询数据库
if (valCacheStorage.value()!=null){
out.collect(valCacheStorage.value().concat("selectDB"));
}
}
}

运行的结果如图所示

结论

Flink作为以流为核心的高可用、高性能的分布式计算引擎。具备流批一体,高吞吐、低延迟,容错能力,大规模复杂计算等特点。结合其底层的核心DataSteamAPI可以非常灵活的处理各种需求。