Learning Flume

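Preface

There is already plenty of Flume documentation online, so this article focuses on getting started and on a few common scenarios, and recommends some good documents along the way.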


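How Flume works

See this document for the details, which are not repeated here.
Flume NG 1.x is a refactored version of Flume 0.9.x. Flume NG no longer has the Master, ZooKeeper, Collector, or Web console of the earlier versions. Only source, sink, and channel remain.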
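
To make the three roles concrete, here is a minimal conceptual sketch in plain Java. It is not Flume's API: the class PipelineSketch and its methods are hypothetical, and a bounded in-memory queue stands in for the channel. It only illustrates the idea that a source puts events into a channel and a sink takes them out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual model only: these names are hypothetical and are not Flume classes.
class PipelineSketch {

    // Source side: turn each input line into an event body and put it on the channel.
    static void runSource(List<String> lines, BlockingQueue<byte[]> channel) {
        for (String line : lines) {
            // offer() returns false when the channel is full.
            if (!channel.offer(line.getBytes(StandardCharsets.UTF_8))) {
                throw new IllegalStateException("channel is full");
            }
        }
    }

    // Sink side: drain whatever is buffered in the channel and deliver it.
    static List<String> runSink(BlockingQueue<byte[]> channel) {
        List<String> delivered = new ArrayList<>();
        byte[] body;
        while ((body = channel.poll()) != null) {
            delivered.add(new String(body, StandardCharsets.UTF_8));
        }
        return delivered;
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> channel = new ArrayBlockingQueue<>(100);
        runSource(Arrays.asList("testlog", "another line"), channel);
        System.out.println(runSink(channel)); // prints: [testlog, another line]
    }
}
```

In this model the channel is what decouples the two sides: the source only needs room in the buffer, and the sink only needs something to read.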


Installing Flume

tar zxvf apache-flume-1.7.0-bin.tar.gz  
  • Create a configuration file for testing:
[vagrant@hadoop-102 ~]$ cd apache-flume-1.7.0-bin/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ vim conf/flume-conf.properties
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ cat conf/flume-conf.properties
# Agent tier1
tier1.sources = r1  
tier1.sinks = k1  
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec  
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = file_roll  
tier1.sinks.k1.sink.directory = /home/vagrant/flume-test/collector  
tier1.sinks.k1.sink.rollInterval=3600

# channel configuration
tier1.channels.c1.type = file  
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir  
tier1.channels.c1.dataDirs =  /home/vagrant/flume-test/datadir

# bind the source and the sink to the channel
tier1.sources.r1.channels = c1  
tier1.sinks.k1.channel = c1  
  • Create the directories and files referenced in the configuration file:
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/collector
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/checkpointdir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ mkdir -p /home/vagrant/flume-test/datadir
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ touch /home/vagrant/flume-test/flume-test.log
  • Start flume-ng:
[vagrant@hadoop-102 apache-flume-1.7.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name tier1 -Dflume.root.logger=INFO,console


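Here, --name specifies the agent name, which must match the prefix used in the configuration file (tier1 in this example).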

  • Open another terminal to test:
[vagrant@hadoop-102 ~]$ cd /home/vagrant/flume-test/
[vagrant@hadoop-102 flume-test]$ echo testlog >>flume-test.log
[vagrant@hadoop-102 flume-test]$ ls collector/
1505963145080-1  
[vagrant@hadoop-102 flume-test]$ cat collector/1505963145080-1
testlog  


As shown, the log line has been collected.


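Advanced - integrating Flume with Kafka

Flume supports many sources, channels, and sinks, and Kafka is one of the supported sinks. Only a little configuration is needed, as in this example: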

# Agent tier1
tier1.sources = r1  
tier1.sinks = k1  
tier1.channels = c1

# source configuration
tier1.sources.r1.type = exec  
tier1.sources.r1.command = tail -F /home/vagrant/flume-test/flume-test.log

# sink configuration
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
tier1.sinks.k1.kafka.bootstrap.servers = 192.168.100.101:9092,192.168.100.102:9092  
tier1.sinks.k1.kafka.topic = testflume

# channel configuration
tier1.channels.c1.type = file  
tier1.channels.c1.checkpointDir = /home/vagrant/flume-test/checkpointdir  
tier1.channels.c1.dataDirs =  /home/vagrant/flume-test/datadir

# bind the source and the sink to the channel
tier1.sources.r1.channels = c1  
tier1.sinks.k1.channel = c1  

For installing Kafka and ZooKeeper, see:


Custom Interceptor

It helps to read this document first.

Here is an example:
SearchInterceptor.java:

package com.gomeplus.bigdata.log;

import lombok.extern.slf4j.Slf4j;  
import org.apache.flume.Context;  
import org.apache.flume.Event;  
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.List;

@Slf4j
public class SearchInterceptor implements Interceptor {  
    private String hostName;
    private String charset;

    public static class Builder implements Interceptor.Builder {
        private String hostName = "localhost";
        private String charset = "utf8";

        {
            try {
                InetAddress address = InetAddress.getLocalHost();
                hostName = address.getHostName();
            } catch (UnknownHostException ex) {
                log.error(ex.getMessage());
            }
        }

        public Interceptor build() {
            return new SearchInterceptor(hostName, charset);
        }

        public void configure(Context context) {
            hostName = context.getString("hostname", hostName);
            charset = context.getString("charset", charset);
        }
    }

    private SearchInterceptor(String hostName, String charset) {
        this.hostName = hostName;
        this.charset = charset;
    }

    // Append a tab and the host name to the event body.
    public Event intercept(Event event) {
        try {
            String body = new String(event.getBody(), charset);
            event.setBody((body + "\t" + hostName).getBytes(charset));
        } catch (UnsupportedEncodingException ex) {
            // Unsupported charset: log the error and pass the event through unchanged.
            log.error(ex.getMessage());
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event: events)
            intercept(event);
        return events;
    }

    public void initialize() {
        // NO-OP
    }

    public void close() {
        //NO-OP
    }
}

pom.xml:

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

flume-conf.properties:

# interceptor
tier1.sources.r1.interceptors = i1  
tier1.sources.r1.interceptors.i1.type = com.gomeplus.bigdata.log.SearchInterceptor$Builder  
tier1.sources.r1.interceptors.i1.hostname = test_host_name  
tier1.sources.r1.interceptors.i1.charset = UTF8  

After building the jar, copy it into the <Flume install directory>/lib/ directory, then start the Flume agent.
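
To see what the interceptor does to an event body without starting an agent, the transformation can be sketched in plain Java. This is a stand-alone illustration, not part of the interceptor: the class HostTagSketch and the method appendHost are hypothetical names, and the values "testlog", "test_host_name", and "UTF8" are taken from the test and the configuration above.

```java
import java.nio.charset.Charset;

// Stand-alone sketch; it does not use any Flume classes.
class HostTagSketch {

    // Decode the body, append a tab and the host name, and encode it again.
    static byte[] appendHost(byte[] body, String hostName, String charsetName) {
        // Charset.forName throws an unchecked exception if the name is not supported.
        Charset charset = Charset.forName(charsetName);
        String text = new String(body, charset);
        return (text + "\t" + hostName).getBytes(charset);
    }

    public static void main(String[] args) {
        Charset utf8 = Charset.forName("UTF8");
        byte[] out = appendHost("testlog".getBytes(utf8), "test_host_name", "UTF8");
        // Show the tab as a visible \t so the separator is easy to spot.
        System.out.println(new String(out, utf8).replace("\t", "\\t"));
        // prints: testlog\ttest_host_name
    }
}
```

With the interceptor configured as above, each line delivered by the sink should carry the same tab-separated host name suffix.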

Thanks for reading this article by tim chow!
