Kafka基本使用
查看操作主题命令参数1[zhangan@node1 kafka]$ bin/kafka-topics.sh 参数 描述 –bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。 –topic <String: topic> 操作的topic名称。 –create 创建主题。 –delete 删除主题。 –alter 修改主题。 –list 查看所有主题。 –describe 查看主题详细描述。 –partitions <Integer: # of partitions> 设置分区数。 –replication-factor<Integer: replication factor> 设置分区副本。 –config <String: name=value> 更新系统默认的配置。 查看当前服务器中的所有topic1[zhangan@node1 kafka]$ bin/kafk...
Flink Hello World
pom1234567891011121314151617<!-- Flink 核心依赖 --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version></dependency><!-- Flink 客户端依赖(本地运行需要) -->...
Flink Stream Connect
一、Flink Connect 操作核心说明Flink 的 connect 是比 union 更灵活的流合并操作: union:仅能合并同数据类型的多个流,合并后仍是同类型流; connect:可以合并不同数据类型的两个流,合并后得到 ConnectedStreams,需通过 CoMapFunction/CoFlatMapFunction 分别处理两种类型的数据。 二、流程图12345678flowchart LR A[数据源1] --> B[stringStream<br/>数据类型:String] C[数据源2] --> D[intStream<br/>数据类型:Integer] B --> E[connect 操作<br/>合并不同类型流] D --> E E --> F[CoMapFunction<br/>map1:处理String<br/>map2:处理Integer] F --> G[resultStream<br/&...
Flink Stream Union
一、流程图1234567flowchart LR A[数据源1] --> B[stream1<br/>数据类型:String] C[数据源2] --> D[stream2<br/>数据类型:String] B --> E[union 操作<br/>合并同类型流] D --> E E --> F[unionStream<br/>合并后的数据流] F --> G[打印/下游处理] 二、Flink Union 操作实现代码首先为你提供可直接运行的 Flink 代码示例,实现两个流(stream1、stream2)的 Union 操作,代码基于 Flink 1.16+ 版本: 12345678910111213141516171819202122232425262728import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.streaming.api.datastream.Da...
Flink Introduction
Flink版本1<flink.version>1.16.1</flink.version>
Flink WordCount
Flink WordCount输入1234yuan@ThinkPad:~$ nc -lk 9999ab bc c c 输出123456(a,1)(b,1)(b,2)(c,1)(c,2)(c,3) wordcount1234567891011121314151617181920// 3. 数据处理:单词计数DataStream<Tuple2<String, Integer>> wordCountStream = socketSource // 拆分每行输入为单个单词,并初始化为 (单词, 1) .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception ...
Flink Kafka
pom12345<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version></dependency> Kafka命令行创建Topic12(base) [zhangsan@node1 ~]$ kafka-topics.sh --create --topic flink-kafka --partitions 3 --replication-factor 2 --bootstrap-server node1:9092Created topic flink-kafka. 生产者12345(base) [zhangsan@node1 ~]$ kafka-console-producer.sh --topic flink-kafka --boot...
Flink Table API - CSV
Apache Flink 的 Table API 是一种用于流处理和批处理的统一的关系型 API。它允许用户以声明式的方式编写查询,类似于 SQL,但可以无缝地集成到 Flink 程序中。以下是一个入门示例,演示如何使用 Table API 读取 CSV 文件、进行简单的聚合计算并将结果输出到控制台。 引入依赖在开始之前,确保你的项目中包含了必要的依赖。以 Maven 为例,在 pom.xml 中添加以下依赖: 12345678910111213141516171819202122232425262728<!-- Flink Table API 依赖 --><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version></depend...
Flink Phoenix
pom1234567891011<dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-spark</artifactId> <version>5.0.0-HBase-2.0</version> <exclusions> <exclusion> <groupId>org.glassfish</groupId> <artifactId>javax.el</artifactId> </exclusion> </exclusions></dependency>
Flink Redis
pom12345<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version></dependency>