flume介绍

flume是一个分布式,高可用,高可靠的海量日志采集、聚合和传输系统,flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力。

Flume由三个组件组成

Source:从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro,Thrift,Http等。

Channel:channel是一种短暂的存储容器,它将从sources处接收的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数据量的source和sink链接。支持的类型有:JDBC channel,File System channel,Memory channel等。

Sink:sink将数据存储到集中存储器比如Hbase和HDFS,它从channels消费数据(events)并将其传递给目标地。目标地可能是另一个sink,也可能是HDFS,HBase等。

数据流模型

Event是Flume定义的一个数据流传输的最小单元。Agent就是一个Flume的实例,本质是一个JVM进程,该JVM进程控制Event数据流从外部日志生产者那里传输到目的地(或者是下一个Agent)。

Agent中的source和sink与channel存取Event是异步的。

Flume的Source负责消费外部传递给它的数据(比如web服务器的日志)。外部的数据生产方以Flume Source识别的格式向Flume发送Event。

可靠性

Event会在每个Agent的Channel上进行缓存,随后Event将会传递到流中的下一个Agent或目的地(比如HDFS)。只有成功地发送到下一个Agent或目的地后Event才会从Channel中删除。这一步保证了Event数据流在Flume Agent中传输时端到端的可靠性。

可恢复性

Event数据会缓存在Channel中用来在失败的时候恢复出来。Flume支持保存在本地文件系统中的(文件channel),也支持保存在内存中的(内存Channel),(内存Channel)显然速度会更快,缺点是万一Agent挂掉(内存Channel)中缓存的数据也就丢失了。

组合流

Flume可以设置多级Agent连接的方式传输Event数据。

多路复用流

Flume支持多路复用数据流到一个或多个目的地。这是通过使用一个流的[多路复用器](multiplexer)来实现的,它可以 复制 或者 选择 数据流到一个或多个channel上。

案例

flume版本:apache-flume-1.9.0-bin.tar.gz

下面功能是从kafka中读取topic名为nginx_log的消息,然后写入到hdfs中。

conf目录下新建kafka-hdfs.conf,然后将hadoop目录下的core-site.xml和hdfs-site.xml放入conf目录下。

 # ------------------- define data source ----------------------
  # source alias
  agent.sources = kafka_source  
  # channels alias
  agent.channels = c1  
  # sink alias
  agent.sinks = hdfs_sink  
  
  ### 定义 kafka source
  agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
  agent.sources.kafka_source.batchSize = 5000  
  # set kafka broker address kafka的broker地址,多个用逗号分割
  agent.sources.kafka_source.kafka.bootstrap.servers = localhost:9092
  # set kafka topic 设置kafka的topic
  agent.sources.kafka_source.kafka.topics = nginx_log
  # set kafka groupid 设置消费组id
  agent.sources.kafka_source.kafka.consumer.group.id = group_log
  
  ### 定义 hdfs sink
  agent.sinks.hdfs_sink.type = hdfs 
  # set store hdfs path 设置存储hdfs路径
  agent.sinks.hdfs_sink.hdfs.path = /nginx_log/%Y%m%d  
  # set file size to trigger roll
  # HDFS上的文件达到128M时生成一个文件 134217728
  agent.sinks.hdfs_sink.hdfs.rollSize = 1024
  # 不按照条数生成文件
  agent.sinks.hdfs_sink.hdfs.rollCount = 0  
  # HDFS上的文件达到3600秒生成一个文件
  agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
  agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
  # log文件前缀
  agent.sinks.hdfs_sink.hdfs.filePrefix = log
  agent.sinks.hdfs_sink.hdfs.fileType=DataStream    
  agent.sinks.hdfs_sink.hdfs.writeFormat=Text    
  
  ### 定义 channel from kafka source to hdfs sink 
  agent.channels.c1.type = memory  
  # channel store size
  agent.channels.c1.capacity = 100000
  # transaction size
  agent.channels.c1.transactionCapacity = 10000
  
   ### 关联source和sink到channel上
  agent.sources.kafka_source.channels = c1
  # specify the channel the sink should use  
  agent.sinks.hdfs_sink.channel = c1 

启动:

 ./bin/flume-ng agent -n agent -f conf/kafka-hdfs.conf

Flume常用组件列表

Source组件:

 1.Spooling Directory Source:收集目录中的新文件 
 2.Kafka Source:相当于kafka消费者,读取topic消息
 3.NetCat TCP Source:监听端口文本数据
 4.HTTP Source:提供http api接口,接收数据
 4.Custom Source:实现抽象类AbstractSource方法

Channel组件:

 1.Memory Channel:event存储在内存
 2.Kafka Channel:event存储在kafka集群,Kafka提供了高可用性和复制机制
 3.File Channel:event存储在本地文件
 4.Custom Channel:实现Channel接口来自定义一个channel

Sink组件:

 1.HDFS Sink:将Event写入Hadoop分布式文件系统(也就是HDFS)
 2.Hive Sink:将包含分隔文本或JSON数据的 Event 直接流式传输到 Hive表或分区上
 3.Logger Sink:使用INFO级别把Event内容输出到日志中
 4.Avro Sink:可以作为 Flume 分层收集特性的下半部分,转发数据到其他agent。
 5.Thrift Sink:可以作为 Flume 分层收集特性的下半部分,转发数据到其他agent。
 6.File Roll Sink:把 Event 存储到本地文件系统
 7.Null Sink:丢弃所有从 channel 读取到的 Event
 8.HBaseSink:此Sink将数据写入 HBase
 9.AsyncHBaseSink:这个Sink使用异步模型将数据写入 HBase
 10.ElasticSearchSink:这个Sink把数据写入到 elasticsearch 集群
 11.Kafka Sink:这个 Sink 可以把数据发送到 kafka topic上
 12.HTTP Sink:HTTP Sink 从 channel 中获取 Event,然后再向远程 HTTP 接口 POST 发送请求,Event 内容作为 POST 的正文发送。
 13.Custom Sink:实现抽象类AbstractSink方法,自定义sink

发表评论

电子邮件地址不会被公开。 必填项已用*标注