Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data, and it can perform simple processing on that data before writing it to a variety of (pluggable) data receivers.

Software:

kafka_2.11-2.1.0
hadoop-2.7.7.tar.gz
apache-flume-1.9.0-bin.tar.gz

1. Download Flume

 wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
 tar zxvf apache-flume-1.9.0-bin.tar.gz 

2. Create the Flume configuration file kafka-hdfs.conf

vi conf/kafka-hdfs.conf

 # ------------------- define data source ----------------------
 # source alias
 agent.sources = kafka_source
 # channel alias
 agent.channels = c1
 # sink alias
 agent.sinks = hdfs_sink
 # define kafka source
 agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
 agent.sources.kafka_source.channels = c1
 agent.sources.kafka_source.batchSize = 5000
 # kafka broker addresses; separate multiple brokers with commas
 agent.sources.kafka_source.kafka.bootstrap.servers = localhost:9092
 # kafka topic to consume from
 agent.sources.kafka_source.kafka.topics = nginx_log
 # kafka consumer group id
 agent.sources.kafka_source.kafka.consumer.group.id = group_log
 # define hdfs sink
 agent.sinks.hdfs_sink.type = hdfs
 # specify the channel the sink should use
 agent.sinks.hdfs_sink.channel = c1
 # hdfs path to store data (replace "host" with your NameNode hostname)
 agent.sinks.hdfs_sink.hdfs.path = hdfs://host:9000/nginx_log/%Y%m%d
 # roll a new file once the current HDFS file reaches 128 MB
 agent.sinks.hdfs_sink.hdfs.rollSize = 134217728
 # do not roll files based on event count
 agent.sinks.hdfs_sink.hdfs.rollCount = 0
 # roll a new file on HDFS every 3600 seconds
 agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
 agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
 # prefix for the files written to HDFS
 agent.sinks.hdfs_sink.hdfs.filePrefix = log
 agent.sinks.hdfs_sink.hdfs.fileType = DataStream
 agent.sinks.hdfs_sink.hdfs.writeFormat = Text
 # define the channel connecting the kafka source to the hdfs sink
 agent.channels.c1.type = memory
 # max number of events held in the channel
 agent.channels.c1.capacity = 100000
 # max number of events per transaction
 agent.channels.c1.transactionCapacity = 10000
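
The configuration above assumes the Kafka topic nginx_log already exists. If it does not, a quick way to create it and push a test event (a sketch, assuming a single-broker Kafka at localhost:9092 with ZooKeeper on localhost:2181, run from the Kafka install directory) is:

 # create the topic the Flume source will consume from
 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic nginx_log
 # send a test message so Flume has something to pick up
 echo 'test log line' | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nginx_log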

3. Add the HDFS jars that Flume depends on

 commons-configuration-1.6.jar
 commons-io-2.4.jar
 hadoop-auth-2.7.3.jar
 hadoop-common-2.7.3.jar
 hadoop-hdfs-2.7.3.jar
 htrace-core-3.1.0-incubating.jar 
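
These jars ship with the Hadoop distribution and must be copied into Flume's lib directory; the versions should match the Hadoop release you are actually running. A minimal sketch, assuming HADOOP_HOME and FLUME_HOME point at the install directories (the exact subdirectories under share/hadoop can differ between Hadoop versions):

 # copy the HDFS client jars from the Hadoop distribution into Flume's lib directory
 cp $HADOOP_HOME/share/hadoop/common/hadoop-common-*.jar $FLUME_HOME/lib/
 cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-*.jar $FLUME_HOME/lib/
 cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-*.jar $FLUME_HOME/lib/
 cp $HADOOP_HOME/share/hadoop/common/lib/commons-configuration-*.jar $FLUME_HOME/lib/
 cp $HADOOP_HOME/share/hadoop/common/lib/commons-io-*.jar $FLUME_HOME/lib/
 cp $HADOOP_HOME/share/hadoop/common/lib/htrace-core-*.jar $FLUME_HOME/lib/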

4. Place the HDFS configuration files in Flume's conf directory

Copy the Hadoop configuration files core-site.xml and hdfs-site.xml into Flume's conf directory so the HDFS sink can resolve the cluster settings.
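
For example (a sketch, assuming HADOOP_HOME and FLUME_HOME point at the install directories and the Hadoop configs live in the default etc/hadoop location):

 # make the HDFS cluster settings visible to Flume's HDFS sink
 cp $HADOOP_HOME/etc/hadoop/core-site.xml $FLUME_HOME/conf/
 cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml $FLUME_HOME/conf/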

5. Start Flume

 ./bin/flume-ng agent -n agent -f conf/kafka-hdfs.conf
# run in the background
 nohup ./bin/flume-ng agent -n agent -f conf/kafka-hdfs.conf >> log_url_kafka-hdfs.log &
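
To confirm events are flowing end to end, produce a few messages to the nginx_log topic and then list the sink's output directory on HDFS (a sketch; the path matches the configuration above, and in-progress files carry a .tmp suffix until a roll condition is met):

 # list today's output directory on HDFS
 hdfs dfs -ls /nginx_log/$(date +%Y%m%d)
 # if nothing shows up, check the Flume log for errors
 tail -f log_url_kafka-hdfs.log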
