Flink 的 Broadcast(广播流)是一种特殊的流合并方式,用于将配置/规则/维度数据(小体量、低频更新)广播到所有并行子任务中,让每个任务都能访问到这些全局数据,常与普通数据流结合实现动态规则匹配、维度补全等场景。

核心特点:

  • 广播流(Broadcast Stream):被广播的流,数据会复制到所有并行实例;
  • 普通流(Data Stream):业务数据流,每个并行实例处理自己分片的数据;
  • 结合方式:通过 connect + BroadcastProcessFunction 实现双流结合,广播流数据会存入 BroadcastState(可共享的状态)。

二、Broadcast 完整代码示例

以下示例实现「规则广播流 + 业务数据流」的结合:

  • 广播流:动态更新的过滤规则(比如过滤掉包含特定关键词的消息);
  • 业务流:普通字符串消息流;
  • 核心逻辑:用广播的规则过滤业务流数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.studybigdata.bigdata.flink.datastream;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class StreamBroadcast {
// 定义广播状态的描述符(用于标识广播状态,存储规则:key=ruleId,value=过滤关键词)
private static final MapStateDescriptor<String, String> RULE_STATE_DESCRIPTOR =
new MapStateDescriptor<>(
"filter_rules",
Types.STRING,
Types.STRING
);
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // 设置并行度,验证广播流会分发到所有并行实例
// 2. 构建广播流:动态过滤规则(比如更新规则:过滤掉包含"test"的消息)
DataStream<String> ruleStream = env.fromElements("filter_key:test", "filter_key:debug");
// 将规则流转换为广播流
BroadcastStream<String> broadcastRuleStream = ruleStream.broadcast(RULE_STATE_DESCRIPTOR);
// 3. 构建业务数据流:待过滤的消息
DataStream<String> businessStream = env.fromElements(
"user:1001,msg:正常业务消息",
"user:1002,msg:test测试消息(需过滤)",
"user:1003,msg:debug调试消息(需过滤)",
"user:1004,msg:订单支付成功"
);
// 4. 连接广播流和业务流
BroadcastConnectedStream<String, String> connectedStream = businessStream.connect(broadcastRuleStream);

// 5. 处理连接后的流:用广播规则过滤业务数据
DataStream<String> resultStream = connectedStream.process(
new BroadcastProcessFunction<String, String, String>() {
// 处理广播流数据(更新规则)
@Override
public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
// 解析广播规则(格式:filter_key:关键词)
String[] ruleParts = rule.split(":");
String key = ruleParts[0];
String value = ruleParts[1];

// 更新广播状态(所有并行实例都会同步这个状态)
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
broadcastState.put(key, value);
// out.collect("规则更新:" + key + "=" + value);
}

// 处理业务流数据(非广播流)
@Override
public void processElement(String businessMsg, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 获取只读的广播状态(规则)
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
String filterKey = broadcastState.get("filter_key");

// 根据广播规则过滤数据:如果消息不包含过滤关键词,则输出
if (filterKey == null || !businessMsg.contains(filterKey)) {
out.collect("保留消息:" + businessMsg);
}
// else {
// out.collect("过滤消息:" + businessMsg + "(触发规则:" + filterKey + ")");
// }
}
}
);
// 6. 打印结果
resultStream.print("Broadcast Result: ");
// 7. 执行任务
env.execute("Flink Broadcast Stream Example");
}
}

代码关键说明:

  1. MapStateDescriptor:定义广播状态的结构(Key-Value 类型),是广播流和业务流共享状态的标识;
  2. BroadcastStream:通过 broadcast(descriptor) 将普通流转为广播流;
  3. BroadcastProcessFunction
    • processBroadcastElement:处理广播流数据,更新 BroadcastState(所有并行实例可见);
    • processElement:处理业务流数据,读取只读BroadcastState,实现规则匹配;
  4. 并行度验证:设置并行度为 2,可看到广播规则会被所有并行子任务接收。

三、流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
flowchart TD
A["创建Flink执行环境"] --> B["构建规则流<br/>(filter_key:test/debug)"]
A --> C["构建业务数据流<br/>(用户消息)"]

B --> D["转换为广播流<br/>(基于RULE_STATE_DESCRIPTOR)"]
C --> E["连接广播流+业务流<br/>BroadcastConnectedStream"]
D --> E

E --> F["BroadcastProcessFunction处理"]
F --> F1["processBroadcastElement<br/>更新广播状态(规则)"]
F --> F2["processElement<br/>读取广播规则过滤业务数据"]

F1 --> G["BroadcastState(分布式共享)"]
F2 --> G
F2 --> H["输出过滤后的有效消息"]
H --> I["打印结果"]
I --> J["执行Flink任务"]

四、代码运行结果

1
2
3
4
5
6
Broadcast Result: 规则更新:filter_key=test
Broadcast Result: 规则更新:filter_key=debug
Broadcast Result: 保留消息:user:1001,msg:正常业务消息
Broadcast Result: 过滤消息:user:1002,msg:test测试消息(需过滤)(触发规则:debug)
Broadcast Result: 过滤消息:user:1003,msg:debug调试消息(需过滤)(触发规则:debug)
Broadcast Result: 保留消息:user:1004,msg:订单支付成功

五、Broadcast 核心注意事项

  1. 数据体量:广播流适合小体量数据(如配置、规则),因为会复制到所有并行实例,大量数据会导致内存压力;
  2. 状态一致性BroadcastState 是分布式的,更新后所有并行实例会同步,但仅支持 put/remove 等简单操作,不支持聚合;
  3. 使用场景:动态规则过滤、维度表补全(如广播商品维度数据,补全订单流的商品名称)、实时配置更新。

总结

  1. Broadcast 核心作用:将小体量、低频更新的全局数据(规则/配置)广播到所有并行子任务,供业务流实时使用;
  2. 核心组件MapStateDescriptor(状态描述)、BroadcastStream(广播流)、BroadcastProcessFunction(双流处理);
  3. 与普通 Connect 区别:普通 Connect 是两个流的一对一合并,Broadcast 是“一对多”的全局分发,且广播数据存入共享状态。