Flume
$FLUME_HOME/conf
1 2
| [zhangsan@node0 conf]$ mv flume-env.sh.template flume-env.sh
[zhangsan@node0 conf]$ mv flume-conf.properties.template flume-conf.properties
|
配置环境变量
Hello World
sink 到logger
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| # Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# netcat source: listen on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# logger sink: print received events to the agent log
a1.sinks.k1.type = logger

# in-memory channel buffering up to 1000 events, 100 per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
在一个配置文件中,可以定义多个agent;启动的时候指定agent name
启动代理
1 2 3
| $ bin/flume-ng agent -c conf -f conf/agent-1.properties -n a1 -Dflume.root.logger=INFO,console
或
$ bin/flume-ng agent --conf conf --conf-file conf/agent-1.properties --name a1 -Dflume.root.logger=INFO,console
|
telnet发送消息
1 2 3 4 5 6 7 8 9 10
| # 安装Telnet
[root@node0 zhangsan]# yum -y install telnet
# 启动Telnet
[zhangsan@node0 ~]$ telnet localhost 44444
# 退出Telnet
Ctrl + ]
telnet> quit
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# exec source: tail -F a DataNode log file and emit each new line as an event
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = tail -F /opt/bigdata/hadoop/default/logs/hadoop-zhangsan-datanode-node0.log
a1.sources.r1.channels = c1

# in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# logger sink bound to the channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
|
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # Name the components on this agent
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# in-memory channel
agent1.channels.channel1.type = memory

# spooldir source: watch the directory for newly placed files;
# files are ingested once and then marked .COMPLETED by Flume
agent1.sources.source1.channels = channel1
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /opt/bigdata/hadoop/default/logs

# logger sink bound to the channel
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = logger
|
启动agent
1
| (python37) [zhangsan@node0 default]$ ./bin/flume-ng agent -c conf -f conf/flume-source-dir.properties -n agent1 -Dflume.root.logger=INFO,console
|
创建文件
然后在 /opt/bigdata/hadoop/default/logs 目录中新创建一个文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| # Name the components on this agent
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# in-memory channel
agent1.channels.channel1.type = memory

# TAILDIR source: tail every file matching the filegroup pattern and
# persist read offsets in positionFile so restarts resume where they left off.
# NOTE(review): the Flume user guide spells the alias upper-case (TAILDIR);
# the original had lower-case "taildir".
agent1.sources.source1.channels = channel1
agent1.sources.source1.type = TAILDIR
agent1.sources.source1.positionFile = /opt/bigdata/hadoop/default/flume-position/taildir_position.json
agent1.sources.source1.filegroups = fg1
agent1.sources.source1.filegroups.fg1 = /opt/bigdata/hadoop/default/logs/.*log.*

# logger sink bound to the channel
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = logger
|
配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| # Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# netcat source: listen on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HDFS sink: bucket events into time-based directories, rounded down
# to 5-minute boundaries.
# (fixed: the original path contained a stray space, "%y-%m-% d")
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node0:9000/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 5
a1.sinks.k1.hdfs.roundUnit = minute
# netcat events carry no timestamp header, so use the agent's local clock
# to resolve the escape sequences in hdfs.path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
|
启动agent
1 2 3 4 5
| (python37) [zhangsan@node0 default]$ ./bin/flume-ng agent -c conf -f conf/flume-sink-hdfs.properties -n a1 -Dflume.root.logger=INFO,console
# 启动agent a1 ,如有以下提示,便可使用telnet发送消息。
(lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# avro source: accept Avro RPC events on every interface, port 6666
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

# in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# logger sink bound to the channel
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
|
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <!-- Avro serialization (version matching the Flume 1.9 distribution) -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.4</version>
</dependency>
<!-- Flume client SDK: provides RpcClientFactory / EventBuilder used below -->
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-sdk</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-legacy-sources</artifactId>
  <version>1.9.0</version>
</dependency>
|
client
1 2 3 4 5 6 7 8
| /**
 * Minimal Avro RPC client: sends a single event to the agent's avro source.
 *
 * @param args unused
 * @throws EventDeliveryException if the agent does not acknowledge the event
 */
public static void main(String[] args) throws EventDeliveryException {
    // Host/port must match a1.sources.r1.bind / a1.sources.r1.port in the agent config
    RpcClient client = RpcClientFactory.getDefaultInstance("192.168.179.100", 6666);
    try {
        Event event = EventBuilder.withBody("Hello Avro ", StandardCharsets.UTF_8);
        client.append(event);
    } finally {
        // fixed: close in finally so the connection is released even when append() throws
        client.close();
    }
}
|