程序员社区

Flume Source应用与配置

前言

Source是负责接收数据到Flume Agent的组件。Source可以从其他系统接收数据。Source也可以用于接受其他Flume Agent的Sink通过RPC发送来的数据。毫不夸张的说,Source可以接受任何来源的数据。

Source的基本配置

Source像所有的Fluem组件一样,需要在配置文件中指定它的类型,可以是FQCN或者内置Source的别名,所有的Source都至少有一个用于写入的Channel。因此Channel的列表也是对应Source合理配置的必要参数。

配置的例子如下:

//source的别名
usingFlume.sources = usingFlumeSource
usingFlume.channels = memory

usingFlume.sources.usingFlumeSource.type = avro
usingFlume.sources.usingFlumeSource.channels = memory
usingFlume.sources.usingFlumeSource.port = 7777
usingFlume.sources.usingFlumeSource.bind = 0.0.0.0

当然Source也可以有一些可选的配置参数,可以用来配置拦截器和选择器。

参数 描述
Interceptors 代表一连串拦截器的名单
Interceptors.<Interceptors.name>.* 传递给指定名称拦截器的参数
Selector Channel选择器使用的别名
Selector.* 传递给Channel选择器的配置参数

配置拦截器实例

usingFlume.source.avro.interceptor = i1 i2
usingFlume.source.avro.interceptor.i1.type = host
usingFlume.source.avro.interceptor.i1.preserveExsiting= true
usingFlume.source.avro.interceptor.i2.type = static
usingFlume.source.avro.interceptor.i2.key= header
usingFlume.source.avro.interceptor.i2.value= staticValue

配置选择器实例

usingFlume.source.avro.selector.type = multiplexing
usingFlume.source.avro.selector.header = priority
usingFlume.source.avro.selector.mapping.1 = channel1
usingFlume.source.avro.selector.mapping.2 = channel2
usingFlume.source.avro.selector.default = channel2

Sink-Source通信

Flume灵活性的重点就是Fluem的RPC Sink-to-Source的结合,RPC sink用来被设计给RPC source发送事件。Source能够接受大量的Sink或者RPC客户单发送的数据,尽管每个RPC Sink只能发送数据给一个RPC Source,但是每个Agent可以配置使用Sink组和Sink处理器,来发送数据给多个其他的Agent。

Avro Source

Flume主要的RPC-Source是Avro-Source。Avro-Source被设计为高扩展的RPC服务器端,能从其他的Flume Agent的Avro Sink或者使用Flume的SDK发送数据的客户端应用,接受数据到一个Flume Agent中。

Flume的Avro Source使用Netty-Avro inter-process的通信协议。

Avro-Source可以配置用来接受解压缩的Avro-Sink发送的压缩过的事件。也可以接受SSL加密后的数据。

Avro-Source配置

参数 描述
type avro,也可以用完整类别名称
bind IP地址,或者主机名,0.0.0.0绑定机器的所有接口
threads infinity,接受从客户端 或者Avro-Sink传入数据的最大工作线程数量
ssl true/false是否启动SSL,如果启用需要keystore和keystore-password
keystore 使用ssl的keystore 路径
keystore-password 打开keystore 的密码
keystore-type 默认JKS0,使用keystore类型
compression-type 解压缩数据的压缩格式,如zlib。如果要接受zlib压缩的数据,设置为deflate

Avro-Source使用Netty服务器来处理请求,Netty服务器使用Java的非阻塞I/O,这保证了Netty服务器使用相对较少的线程处理高的请求数。Avro-Source允许配置线程的最大数量。

如果数据在广域网数据中心传播,配置compression-type是非常有用的,能够减少使用的带宽。如果这个参数没有设置,或者设置为none,而接受的数据都是压缩的,这可能会导致事件积压在前一阶段,因为Source将不能解析压缩数据,会给前一阶段返回错误,将导致前一阶段一直重试。

下面配置了一个SSL和压缩的Avro Source的例子

usingFlume.sources = avroSrc
usingFlume.channels = memChannel

usingFlume.sources.avroSrc.type = avro
usingFlume.sources.avroSrc.channels = memChannel

# 绑定到所有的接口
usingFlume.sources.avroSrc.bind = 0.0.0.0
usingFlume.sources.avroSrc.port = 4353

# SSL
usingFlume.sources.avroSrc.ssl = true
usingFlume.sources.avroSrc.keystore = /tem/keystore.jks
usingFlume.sources.avroSrc.keystore-password = usingFlume
usingFlume.sources.avroSrc.keystore-type = jks

# 解压缩zlib
usingFlume.sources.avroSrc.compression-type = deflate

# channel配置
usingFlume.channels.memChannel.type = memory

Avro Sink的配置

usingFlume.sinks = avroSink
usingFlume.channels = memChannel

# channel配置
usingFlume.channels.memChannel.type = memory

# sink配置
usingFlume.sinks.avroSink.type = avro
usingFlume.sinks.avroSink.channels = memChannel
usingFlume.sinks.avroSink.hostname = node001.com
usingFlume.sinks.avroSink.port= 4353

# SSL
usingFlume.sinks.avroSink.ssl = true
usingFlume.sinks.avroSink.trust-all-certs = /tem/keystore.jks
usingFlume.sinks.avroSink.truststore = /path/keystore
usingFlume.sinks.avroSink.truststore-password = usingFlume
usingFlume.sinks.avroSink.truststore-type= jks

# 解压缩 zlib
usingFlume.sinks.avroSink.compression-type = deflate

RPC Source的失败处理

如果Source配置用来写入的其中一个Channel由于写满,会抛出ChannelException异常,或者由于这次事务量太大,Source会给Sink返回一个失败的状态,来回调它并期望他重试,因为RPC Source通过线程池的线程接受数据,线程失败j只能导致线程终止。

这些异常或者失败,记录在异常的日志文件中,有时候这些异常可能指明了一个大问题,例如资源耗尽异常,OutOfMemoryError,如果频繁抛出ChannelException异常,意味着Channel分配的速率远小于写入的速率,或者Sink清理Channel中数据的速度不够快,如果Sink数量不足可以增加Sink数量,但是如果最终的目的地不能处理负载,则需要重新考虑这些问题。但不管在什么情况下,错误只会导致程序重复执行,但不会造成真正的数据丢失,因为只有当数据真正的写入到下一阶段,事件才会从Channel中移除。

Spooling Directory Source

在很多场景中,应用程序写入数据到文件,通常这些文件不是简单的文件,或者每一行转换为一个事件没有意义,例如堆栈跟踪,这种情况下,Flume的Spooling Directory Source可以被用来从这些文件中读取数据。

Spooling Directory Source监视读取事件的目录,文件一旦被移动到该目录,就不应该再被写入数据,同时目录中的文件名也不能重复,如果这两种情况发生,Source会抛出异常终止,此时只能重启Agent的Source。

Spooling Directory Source是使用tail -f 的Exec Source的一种好的替换方案。但是这种方式不能保证实时跟踪,只能在文件关闭移动到坚实目录才能读取文件,文件一旦被Source完全使用,且所有的事件都被成功写入Channel中,Source就可以基于配置重命名文件或者删除文件。重命名文件指的是给文件名添加一个后缀,如.COMPLETE,这个后缀是可以配置的。

Spooling Directory Source使用追踪器持久化到磁盘,定位每个文件在哪个位置成功将事件写入到Channel,如果Agent挂了,重启时,Source就知道从这个位置开始读取数据。由于Source重启时,从文件的上次处理位置开始,这也就是Source不允许文件重用的原因。

Spooling Directory Source 参数

参数 描述
type spooldir,也可以用完整类别名称
spoolDir 监控的和读取文件的目录
batchSize 默认值100,每次写入Channel的最大事件数量
ignorePatter 默认值^& 正则表达式,文件名称匹配的正则表达式会被忽略
deletePolicy 默认never
fileSuffix 后缀,默认.COMPLETE
fileHeader false 如果为true,完整路径/文件名会被添加到header
fileHeaderKey file文件路径 如果文件名被添加到header,此参数为header中使用的密钥。
trackerDir .flumespool Spooling Directory Source存储元数据的目录,用来source中断时重启source
deserializer 默认line,行序列化器,Bulider类的别名或者完整类别名称,用来创建或读取自定义格式数据的反序列化器
inputCharste 默认utf8,当反序列化器调用readChar方法使用的编码

Spooling Directory Source的类型是spooldir,Source读取指定目录的文件并逐个处理它们,处理目录的完整名称通过spoolDir 指定。由于性能的原因,Source批处理的最大数量由batchSize 指定,Source尽可能多的从文件中读取事件,直至达到批大小,如果文件中事件的数量不足批大小,一旦文件读取完成,就尽快提交事务。

Spooling Directory Source能从他中断的位置恢复,所以能避免重复消耗文件的数据,文件读取处理信息持久化到 到追踪目录,追踪目录一直在Source的监控目录中,目录名默认为.flumespool 。需要注意的是,一旦追踪目录的名称设置好,如果这个参数的值发生改变,Source将不能再定位文件处理的位置,可能导致重复消费数据。

Spooling Directory Source配置实例

agent.sources = spool
agent.channels = memChannel

agent.source.spool.type = spooldir
agent.source.spool.channels = memChannel

agent.source.spool.spoolDir = /data/flume/spool
agent.source.spool.batchSize = 250

agent.source.spool.deletePolicy = immediate
agent.source.spool.fileHeader = true
agent.source.spool.fileHeaderKey = usingFlumeFiles
agent.source.spool.deserializer = usingflume.ch03.ProtobufDeserializer$ProtobufDeserializerBuilder

agent.channel.memChannel.type = memory
agent.channel.memChannel.capacity = 10000
agent.channel.memChannel.transactionCapacity = 500

Spooling Directory Source的性能

Spooling Directory Source是IO密集型的,可以通过使用多个线程读取数据,更多的使用可用的CPU来提高性能。提高读取文件性能的一种方法是轮流写文件到不同的目录,并有Spooling Directory Source处理每一个目录,如果数据都传入到相同的目的地,则写入到相同的Channel,

Exec Source

Exec Source执行用户配置的命令,且基于命令的标准输出来生成事件,他还可以从命令中读取错误流,转换成Flume事件。Source希望命令不断的产生数据,并且吸收其输出,只要命令开始运行,Source就要不停的去运行处理。

接着输出流的每一行将被编码为字节数组,编码为UTF-8,然后每个字节数组用作Flume事件的Body,把他们写入到Channel,如果Channel已满,Source可以配置成停止读取流输出和错误流。

Exec Source配置

参数 描述
type exec,也可以用完整类别名称org.apache.flume.source.ExecSource
command source运行的命令
restart 默认值false,设置为true时,如果流程死亡,Source将重启流程
restartThrottle 默认值10000,重启命令需要等待的毫秒值,如果restart为false,则无影响
logStdErr 默认false,如果设置为true,错误流也会被读取并转换为Flume事件
batchSize 默认20,批处理,写入到Channel中的最大事件数量
batchTimeout 批处理超时时间,如果长时间未写入到channel,则该配置会生效,将缓存的数据写入到channel中
charset UTF-8,编码输入流或错误流为Flume事件的字符集
shell 用于运行该命令的shell或者命令处理器

配置示例

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

shell运行命令

a1.sources = tailsource-1
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

Exec Source 丢失数据的可能性?
Exec Source是异步Source的一个例子,即如果有失败可能通知不到数据生产者,因此,重新启动Agent或者机器会导致数据丢失。
Exec Source在Flume中最常用来追踪文件,利用tail -f 命令使用Exec Source来追踪文件,近乎实时的将数据放到Flume中,但存在丢失数据的风险。如果Flume Agent挂了,重新启动Agent或者机器,Exec Source将在启动时重新运行command,在这种情况下,因为tail 命令只会拉取新数据,因此任何在Agent死亡和Source启动期间的写入文件的数据都会丢失。

由于这个原因,如果要求比较严格,可以采用Spooling Dircetory Source处理写入文件的数据。

即使使用其他一些命令,Exec Source 在将事件写入到Channel之前,也会缓存一些事件,直至到达batch大小,如果Agent重启,这些事件也可能会丢失。

Kafka Source

Kafka Source配置

参数 描述
type 完整类别名称org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers kafka集群
kafka.topics kafka消费者将从逗号分隔的主题列表中读取消息
kafka.topics.regex 正则订阅topic,该属性比kafka.topics优先级更高,如果存在,将覆盖kafka.topics。
kafka.consumer.group.id 默认flume,消费者组Id
batchSize 默认1000,批处理,写入到Channel中的最大事件数量
batchDurationMillis 默认1000,批处理超时时间,如果长时间未写入到channel,则该配置会生效,将缓存的数据写入到channel中
backoffSleepIncrement 1000 初始和增量等待时间当kafka主题显示为空时触发,该参数会减少kafka一直去ping一个空主题的topic
maxBackoffSleep 5000 kafka主题显示为空时触发的最长等待时间
useFlumeEventFormat false 设置为true以Flume Avro二进制格式读取事件
migrateZookeeperOffsets true
kafka.consumer.security.protocol SASL_PLAINTEXT, SASL_SSL 或者 SSL

配置示例

逗号分隔的主题列表

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则订阅Topic

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

自定义Source

Flume有两种类型的Source,EventDriven Source和Pollable Source.

PollableSource
1.当一个agent 启动之后,就会不断循环调用 process 以获取数据
2.当 process 返回 READY,表示数据产生正常,如果是 BACKOFF 则表示异常,当产生异常时候,agent 会等待一段时间再来调用 process,异常次数越多,间隔时间越长,最长不超过 5s。
3.自带一个线程,工作都是在自己的独立线程之内的

EventDrivenSource
1.当一个agent启动时候,会开始执行 application 的 main() 方法
2.进程启动之后,会通过 AbstractConfigurationProvider$getConfiguration解析配置文件中的各个组件和属性
3.针对 source 会生成 sourceRunner 通过 supervisor 来运行和管理其生命周期。
4.source 的生命周期 start 方法正式开始执行,这样也就到了我们将要自定义代码的实现执行了。

创建一个类,继承自 AbstractSource 并实现 Configurable 和( EventDrivenSource 或者PollableSource )
实现相关方法,以下是简单的一个生成序列的source

package com.inveno.flume;

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {

    private static final Logger logger = LoggerFactory
            .getLogger(SequenceSource.class);

    private long seq;
    private int batchSize = 10;
    private List<Event> batchArrayList = new ArrayList<>();


    @Override
    public void configure(Context context) {
        //自定义配置属性
        batchSize = context.getInteger("batchSize", 1);
        //打印自定义属性
        ImmutableMap<String, String> map = context.getParameters();
        for (String s : map.keySet()) {
            logger.warn(s + "==============configure=============================" + map.get(s));
        }
    }

    private void process(){
        try {
            batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
            if(batchArrayList.size()>=batchSize){
                getChannelProcessor().processEventBatch(batchArrayList);
                batchArrayList.clear();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    @Override
    public void start() {
        super.start();
        //开启一个线程来生产数据,当然你也可以整个线程池
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        //这里有个java知识点 ,InterruptedException捕获后,
                        // 这个标记点会被重置 ,需要再次 interrupt才能正确退出
                        Thread.currentThread().interrupt();
                    }
                    process();
                }
            }
        }).start();
        logger.debug("==========================start");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("==========================stop", getName());
    }

}
赞(0) 打赏
未经允许不得转载:IDEA激活码 » Flume Source应用与配置

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区