There is plenty of Flume documentation online, so this article mainly serves as an introduction, covering some common scenarios and recommending some good reference documents along the way.
You can refer to that document; this article will not repeat its contents.
Flume NG 1.x is a refactored version of Flume 0.9.x. Flume NG drops the Master, ZooKeeper, Collector, and Web console of the earlier versions; there are only sources, sinks, and channels.
Install the JDK (omitted here).
Download the Flume binary tarball from the download page on the official website.
Unpack it:
tar zxvf apache-flume-1.7.0-bin.tar.gz
[vagrant@hadoop-102 ~]$ cd apache-flume-1.7.0-bin/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ vim conf/flume-conf.properties
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cat conf/flume-conf.properties
# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = file_roll
tier1.sinks.k1.sink.directory = /home/vagrant/flume-test/collector
tier1.sinks.k1.sink.rollInterval = 3600

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and the sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/collector
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/checkpointdir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/datadir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ touch /home/vagrant/flume-test/flume-test.log
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name tier1 -Dflume.root.logger=INFO,console
Here, --name specifies the name of the agent to run (tier1 in the configuration above).
[vagrant@hadoop-102 ~]$ cd /home/vagrant/flume-test/
[vagrant@hadoop-102 flume-test]$ echo testlog >> flume-test.log
[vagrant@hadoop-102 flume-test]$ ls collector/
1505963145080-1
[vagrant@hadoop-102 flume-test]$ cat collector/1505963145080-1
testlog
As you can see, the log line has been collected.
Flume supports many sources, channels, and sinks, and Kafka is one of the supported sinks, so only a little configuration is needed. Here is an example:
# Agent tier1
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.kafka.bootstrap.servers = 192.168.100.101:9092,192.168.100.102:9092
tier1.sinks.k1.kafka.topic = testflume

# channel configuration
tier1.channels.c1.type = file
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir
tier1.channels.c1.dataDirs = /home/vagrant/flume-test/datadir

# bind the source and the sink to the channel
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
For installing Kafka and ZooKeeper, refer to:
You can read that document first.
Here is an example:
SearchInterceptor.java:
package com.gomeplus.bigdata.log;

import lombok.extern.slf4j.Slf4j;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

@Slf4j
public class SearchInterceptor implements Interceptor {

    private String hostName;
    private String charset;

    public static class Builder implements Interceptor.Builder {
        private String hostName = "localhost";
        private String charset = "utf8";

        // Instance initializer: default the host name to the local host name.
        {
            try {
                InetAddress address = InetAddress.getLocalHost();
                hostName = address.getHostName();
            } catch (UnknownHostException ex) {
                log.error(ex.getMessage());
            }
        }

        public Interceptor build() {
            return new SearchInterceptor(hostName, charset);
        }

        public void configure(Context context) {
            // Allow the defaults to be overridden from flume-conf.properties.
            hostName = context.getString("hostname", hostName);
            charset = context.getString("charset", charset);
        }
    }

    private SearchInterceptor(String hostName, String charset) {
        this.hostName = hostName;
        this.charset = charset;
    }

    public Event intercept(Event event) {
        String body;
        try {
            body = new String(event.getBody(), charset);
            // Append a tab and the host name to each event body.
            event.setBody((body + "\t" + hostName).getBytes(charset));
        } catch (UnsupportedEncodingException ex) {
            log.error(ex.getMessage());
            return event;
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void initialize() {
        // NO-OP
    }

    public void close() {
        // NO-OP
    }
}
pom.xml:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
</dependencies>
flume-conf.properties:
# interceptor configuration
tier1.sources.r1.interceptors = i1
tier1.sources.r1.interceptors.i1.type = com.gomeplus.bigdata.log.SearchInterceptor$Builder
tier1.sources.r1.interceptors.i1.hostname = test_host_name
tier1.sources.r1.interceptors.i1.charset = UTF8
After building the jar, copy it into the <Flume installation directory>/lib/ directory, then start the Flume agent.
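As a quick sanity check, the transformation the interceptor applies can be reproduced without any Flume dependencies. This is a minimal sketch; the class and the appendHost helper are illustrative and not part of the interceptor itself:

```java
import java.nio.charset.StandardCharsets;

public class SearchInterceptorSketch {
    // Mirrors SearchInterceptor.intercept(Event): append a tab and the host name to the body.
    static byte[] appendHost(byte[] body, String hostName) {
        String s = new String(body, StandardCharsets.UTF_8);
        return (s + "\t" + hostName).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = appendHost("testlog".getBytes(StandardCharsets.UTF_8), "test_host_name");
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

With the configuration above, a collected line such as testlog would therefore arrive at the sink as testlog followed by a tab and test_host_name.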
The following is translated from the official Flume documentation.
Flume supports fanning out the flow from one source to multiple channels. Flume ships with two fan-out strategies: replicating (the default) and multiplexing.
With the replicating strategy, every event is sent to all channels configured for the source; with the multiplexing strategy, each event is sent only to a subset of those channels.
To fan out, configure the source with multiple channels, and specify the strategy with selector.type. Here is an example:
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
With the multiplexing strategy, the channels are chosen based on one of the event's headers. When the header matches a configured value, the event is sent to the channels mapped to that value; when the header matches no configured value, the event is sent to the default channels. For example:
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
In the following example, an agent named agent_foo has a single avro source but two channels, each connected to a sink. The selector checks the event's State header: if its value is CA, the event is sent to mem-channel-1; if it is AZ, the event is sent to file-channel-2; if it is NY, the event is sent to both mem-channel-1 and file-channel-2; otherwise, the event is sent to mem-channel-1:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
The selector first attempts to write the event to all required channels; if even one of those channels fails to consume the event, the whole transaction fails, and the selector retries it on all of those channels. Once all required channels have consumed the event, the selector attempts to write it to the optional channels; a failure by any optional channel is simply ignored and not retried. Here is an example with optional channels:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
If a channel appears in both the required and the optional lists for a given header value, it is treated as required.
If a header value has no required channels, the event is written to the default channels, and the selector also attempts to write it to that value's optional channels.
If there are neither required nor default channels, the selector attempts to write the event to the optional channels only.
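The selection rules above can be sketched in plain Java. This is a simplified model of the behavior described here, not Flume's actual multiplexing selector implementation; the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectorSketch {
    // Required channels for a header value: the mapped channels, or the defaults if unmapped.
    static List<String> requiredChannels(String headerValue,
                                         Map<String, List<String>> mapping,
                                         List<String> defaults) {
        return mapping.getOrDefault(headerValue, defaults);
    }

    // Optional channels for a header value; any channel that is also required
    // is treated as required only, per the overlap rule above.
    static List<String> optionalChannels(String headerValue,
                                         Map<String, List<String>> optional,
                                         List<String> required) {
        List<String> result =
            new ArrayList<>(optional.getOrDefault(headerValue, Collections.emptyList()));
        result.removeAll(required);
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> mapping = new HashMap<>();
        mapping.put("CA", Arrays.asList("mem-channel-1"));
        mapping.put("AZ", Arrays.asList("file-channel-2"));
        mapping.put("NY", Arrays.asList("mem-channel-1", "file-channel-2"));
        Map<String, List<String>> optional = new HashMap<>();
        optional.put("CA", Arrays.asList("mem-channel-1", "file-channel-2"));
        List<String> defaults = Arrays.asList("mem-channel-1");

        List<String> req = requiredChannels("CA", mapping, defaults);
        List<String> opt = optionalChannels("CA", optional, req);
        // For CA: mem-channel-1 is required; it overlaps the optional list,
        // so only file-channel-2 remains optional.
        System.out.println(req);
        System.out.println(opt);
        // For an unmapped value such as TX, the defaults are used.
        System.out.println(requiredChannels("TX", mapping, defaults));
    }
}
```

Running this against the configuration in the last example shows, for instance, that a State of CA is required to reach mem-channel-1 while file-channel-2 stays best-effort.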