pom

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
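
The ${flink.version} placeholder assumes a flink.version property is defined in the pom's properties section and matches the Flink version running on the target cluster.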

Kafka command line

Create a topic

(base) [zhangsan@node1 ~]$ kafka-topics.sh  --create  --topic flink-kafka --partitions 3 --replication-factor 2 --bootstrap-server node1:9092
Created topic flink-kafka.

Producer

(base) [zhangsan@node1 ~]$ kafka-console-producer.sh  --topic flink-kafka --bootstrap-server node1:9092
>a
>b b
>c c c
>

Flink consumer

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("node1:9092")                  // Kafka broker address
        .setTopics("flink-kafka")                           // topic(s) to subscribe to
        .setGroupId("consumer-group")                       // consumer group id
        .setStartingOffsets(OffsetsInitializer.latest())    // start from the latest offsets
        .setValueOnlyDeserializer(new SimpleStringSchema()) // value deserializer
        .build();
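
The source is attached to a job with env.fromSource(...), as the complete example below shows.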

Flink producer

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("node1:9092")                  // Kafka broker address
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("flink-kafka")                    // target topic
                .setValueSerializationSchema(new SimpleStringSchema()) // value serializer
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // delivery guarantee
        .build();
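
DeliveryGuarantee here comes from org.apache.flink.connector.base.DeliveryGuarantee; the import is shown in the complete example below.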

Complete examples

DataStream API example

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkKafkaExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node1:9092")
                .setTopics("flink-kafka")
                .setGroupId("flink-consumer-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Kafka producer configuration
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("node1:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-kafka-output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Build the pipeline
        DataStream<String> stream = env.fromSource(
                source,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );

        // Simple processing: convert each message to upper case
        DataStream<String> processedStream = stream
                .map(String::toUpperCase)
                .name("ToUpperCase");

        // Write the results back to Kafka
        processedStream.sinkTo(sink);

        env.execute("Flink Kafka Processing Job");
    }
}
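
Once the job is running, lines typed into the console producer on flink-kafka should come back uppercased on the flink-kafka-output topic.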

Table API example

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class FlinkKafkaTableExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Create the Kafka source table
        tableEnv.executeSql(
                "CREATE TABLE kafka_source (" +
                "  `message` STRING," +
                "  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
                "  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'flink-kafka'," +
                "  'properties.bootstrap.servers' = 'node1:9092'," +
                "  'properties.group.id' = 'flink-table-group'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'latest-offset'" +
                ")"
        );

        // Create the Kafka sink table
        tableEnv.executeSql(
                "CREATE TABLE kafka_sink (" +
                "  `message` STRING," +
                "  `processed_time` TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'flink-kafka-output'," +
                "  'properties.bootstrap.servers' = 'node1:9092'," +
                "  'format' = 'json'" +
                ")"
        );

        // Run the transformation and write into the sink table
        TableResult result = tableEnv.executeSql(
                "INSERT INTO kafka_sink " +
                "SELECT UPPER(message), CURRENT_TIMESTAMP " +
                "FROM kafka_source"
        );

        result.print();
    }
}
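
Note that the source table declares 'format' = 'json', so records on flink-kafka must be JSON objects with a message field, for example {"message": "a"}; the bare strings typed into the console producer earlier would fail to parse unless 'json.ignore-parse-errors' = 'true' is added to the WITH clause.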

Configuration options

Consumer configuration

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("node1:9092")
        .setTopics("flink-kafka")
        .setGroupId("consumer-group")
        .setStartingOffsets(OffsetsInitializer.earliest())  // start from the earliest offsets
        // .setStartingOffsets(OffsetsInitializer.latest()) // start from the latest offsets
        // .setStartingOffsets(OffsetsInitializer.offsets(Map.of(...))) // start from specific offsets
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperties(properties) // custom properties (see the sketch below)
        .build();
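
The properties object above is not defined in this snippet; a minimal sketch of what it might hold, with purely illustrative keys and values:

// assumes: import java.util.Properties;
Properties properties = new Properties();
properties.setProperty("partition.discovery.interval.ms", "30000"); // illustrative: discover new partitions every 30 seconds
properties.setProperty("fetch.max.bytes", "52428800");              // illustrative: cap a single fetch at 50 MB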

Producer configuration

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("node1:9092")
        .setRecordSerializer(serializer)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once
        // .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once
        // .setDeliveryGuarantee(DeliveryGuarantee.NONE) // no guarantee
        .setTransactionalIdPrefix("flink-producer-") // transactional id prefix (required for EXACTLY_ONCE)
        .setProperty("compression.type", "gzip") // compression codec
        .build();
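
For EXACTLY_ONCE the sink needs checkpointing enabled and a Kafka transaction timeout that covers the checkpoint interval; a minimal sketch, where the 15-minute timeout is an illustrative value that must stay within the broker's transaction.max.timeout.ms:

env.enableCheckpointing(5000); // transactions are committed when checkpoints complete

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers("node1:9092")
        .setRecordSerializer(serializer)
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-producer-")     // mandatory for EXACTLY_ONCE
        .setProperty("transaction.timeout.ms", "900000") // 15 minutes, illustrative
        .build();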

Error handling

// Enable checkpointing so that exactly-once semantics become possible
env.enableCheckpointing(5000); // checkpoint every 5 seconds

// Fault-tolerance settings on the Kafka consumer
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("node1:9092")
        .setTopics("flink-kafka")
        .setGroupId("consumer-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("auto.offset.reset", "latest")  // offset reset policy
        .setProperty("enable.auto.commit", "false")  // disable auto-commit; Flink commits offsets on checkpoints
        .build();
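
Checkpointing can be tuned beyond the bare interval; a minimal sketch with illustrative values:

// assumes: import org.apache.flink.streaming.api.CheckpointingMode;
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // exactly-once state consistency
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // leave at least 1 second between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(60000);         // abort a checkpoint after 60 seconds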

Best practices

  1. Offset management: let Flink's checkpointing mechanism manage Kafka offsets
  2. Parallelism: size the job's parallelism to the partition count of the Kafka topic (see the sketch after this list)
  3. Serialization: prefer efficient formats such as Avro or Protobuf
  4. Monitoring and alerting: track consumer lag and backlog
  5. Resource isolation: use separate consumer groups for different business scenarios
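
As an illustration of point 2, a minimal sketch that matches the job's parallelism to the three partitions created at the top of this section:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // flink-kafka was created with --partitions 3; extra source subtasks beyond that would idle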