Flume

$FLUME_HOME/conf
1
2
[zhangsan@node0 conf]$ mv flume-env.sh.template flume-env.sh
[zhangsan@node0 conf]$ mv flume-conf.properties.template flume-conf.properties
配置环境变量

Hello World

sink 到logger
配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: netcat server listening for data on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Sink: log each event to the console (at INFO level)
a1.sinks.k1.type = logger

# Channel: buffer events in memory between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
# (a source can write to several channels, hence plural "channels";
# a sink drains exactly one channel, hence singular "channel")
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在一个配置文件中,可以定义多个agent;启动的时候指定agent name

启动代理
1
2
3
$ bin/flume-ng agent -c conf -f conf/agent-1.properties -n a1 -Dflume.root.logger=INFO,console

$ bin/flume-ng agent --conf conf --conf-file conf/agent-1.properties --name a1 -Dflume.root.logger=INFO,console
telnet发送消息
1
2
3
4
5
6
7
8
9
10
# 安装Telnet
[root@node0 zhangsan]# yum -y install telnet

# 启动Telnet
[zhangsan@node0 ~]$ telnet localhost 44444


# 退出Telnet
Ctrl + ]
telnet> quit

Exec Source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Agent a1: follow a HDFS DataNode log with `tail -F` and print events to the console
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: run an external command and turn each output line into an event.
# "shell" makes Flume run the command through /bin/bash -c, so shell syntax works;
# tail -F keeps following the file across rotations.
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = tail -F /opt/bigdata/hadoop/default/logs/hadoop-zhangsan-datanode-node0.log
a1.sources.r1.channels = c1

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: log events to the console
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

SpoolingDir Source

配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#1. Name the agent's source, channel and sink
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

#2. Channel properties: buffer events in memory
agent1.channels.channel1.type = memory

#3. Source properties: watch a spooling directory; every file dropped into
#   it is ingested line-by-line and then renamed with a .COMPLETED suffix
agent1.sources.source1.channels = channel1
agent1.sources.source1.type = spooldir
# The directory must exist before the agent starts
agent1.sources.source1.spoolDir = /opt/bigdata/hadoop/default/logs

#4. Sink properties: log events to the console
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = logger
启动agent
1
(python37) [zhangsan@node0 default]$ ./bin/flume-ng agent -c conf -f conf/flume-source-dir.properties -n agent1 -Dflume.root.logger=INFO,console
创建文件

然后在 /opt/bigdata/hadoop/default/logs 目录中新创建一个文件。

Taildir Source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#1. Name the agent's source, channel and sink
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

#2. Channel properties: buffer events in memory
agent1.channels.channel1.type = memory

#3. Source properties: taildir tails every file matching the filegroup
#   regex and records read offsets in the position file, so it resumes
#   without data loss after a restart
agent1.sources.source1.channels = channel1
agent1.sources.source1.type = taildir
agent1.sources.source1.filegroups = fg1
agent1.sources.source1.positionFile = /opt/bigdata/hadoop/default/flume-position/taildir_position.json
# Regex of files to tail for filegroup fg1
agent1.sources.source1.filegroups.fg1 = /opt/bigdata/hadoop/default/logs/.*log.*


#4. Sink properties: log events to the console
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = logger

HDFS Sink

配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source
# netcat server listening for data on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# Channel
# buffer events in memory between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink
# Write events to HDFS, bucketing the path by event timestamp.
# FIX: the path contained "%y-%m-% d" — the stray space broke the %d
# escape and would create a literal "% d" directory component.
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node0:9000/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# Round the timestamp down to 5-minute buckets for the path escapes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
# netcat events carry no "timestamp" header, so use the agent's local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Uncomment to write plain text instead of the default SequenceFile:
#a1.sinks.k1.hdfs.fileType = DataStream
启动agent
1
2
3
4
5
(python37) [zhangsan@node0 default]$ ./bin/flume-ng agent -c conf -f conf/flume-sink-hdfs.properties -n a1 -Dflume.root.logger=INFO,console


# 启动agent a1 ,如有以下提示,便可使用telnet发送消息。
(lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

Avro Source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Agent a1: receive events over Avro RPC and print them to the console
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: Avro RPC server; 0.0.0.0 accepts connections on every interface
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: log events to the console
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!-- Avro runtime used by the Flume RPC client.
     NOTE(review): Flume 1.9.0 ships against a newer Avro line (1.8.x);
     pinning avro 1.7.4 here may cause a version conflict — verify against
     flume-ng-sdk's transitive dependencies. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.4</version>
</dependency>
<!-- Flume client SDK: RpcClientFactory, RpcClient, Event, EventBuilder -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.9.0</version>
</dependency>
<!-- Legacy source adapters (avroLegacy/thriftLegacy) -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-legacy-sources</artifactId>
<version>1.9.0</version>
</dependency>
client
1
2
3
4
5
6
7
8
/**
 * Sends one event to the Avro source listening on 192.168.179.100:6666.
 *
 * @param args unused
 * @throws EventDeliveryException if the event cannot be delivered
 */
public static void main(String[] args) throws EventDeliveryException {
    // getDefaultInstance returns a NettyAvroRpcClient under the hood.
    RpcClient client = RpcClientFactory.getDefaultInstance("192.168.179.100", 6666);
    try {
        // The body travels over the wire as an AvroFlumeEvent
        // (SpecificRecordBase generated from the Flume Avro schema).
        Event event = EventBuilder.withBody("Hello Avro ", StandardCharsets.UTF_8);
        client.append(event);
    } finally {
        // FIX: close in finally so the client's network resources are
        // released even when append() throws EventDeliveryException.
        client.close();
    }
}