The post below describes how to configure Flume to consume data from a Kafka topic as the source and send the data to an HDFS target.
Pipeline flow
Original source > Kafka topic > Flume (Kafka source) > HDFS
Create a run file
agent_name=flume1
flume-ng agent -n $agent_name -c conf -f conf/flume.conf

# Debug to screen
flume-ng agent -n $agent_name -c conf -f conf/flume.conf -Dflume.root.logger=INFO,console
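To make the agent easy to start and stop, the commands above can be saved in a small wrapper script. A minimal sketch of one way to launch it, assuming the script is saved as run-flume.sh in the Flume home directory (the file name is just an example, not something the original post specifies):

chmod +x run-flume.sh                          # make the wrapper executable
./run-flume.sh                                 # start the flume1 agent using conf/flume.conf
# or keep it running in the background and capture the output for later inspection:
nohup ./run-flume.sh > flume-run.log 2>&1 &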
Create configuration files
Create environment file
mkdir conf

conf/flume-env.sh

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
export FLUME_CLASSPATH="/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/zookeeper-3.4.5-cdh5.4.0.jar"
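The bare -Dcom.sun.management.jmxremote flag only enables local JMX monitoring. To attach jconsole or another JMX client from a remote host you would typically also pin a port and, in a lab setup, disable authentication and SSL. A hedged variant of the JAVA_OPTS line (port 5445 is an arbitrary example, not part of the original configuration):

export JAVA_OPTS="-Xms2048m -Xmx2048m \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=5445 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"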
Create configuration file
conf/flume.conf

flume1.sources = kafkaSource
flume1.channels = memoryChannel
flume1.sinks = hdfsSink

flume1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafkaSource.zookeeperConnect = n01.domain.com:2181,n02.domain.com:2181
flume1.sources.kafkaSource.topic = web-sql-log
flume1.sources.kafkaSource.batchSize = 5
flume1.sources.kafkaSource.batchDurationMillis = 200
flume1.sources.kafkaSource.channels = memoryChannel
#flume1.sources.kafkaSource.groupId = new
flume1.sources.kafkaSource.groupId = flume-sql-logs
flume1.sources.kafkaSource.kafka.auto.offset.reset = smallest

# Use this to modify the source message
#flume1.sources.kafkaSource.interceptors = i1
# Regex Interceptor to set timestamp so that HDFS can be written to partitioned
#flume1.sources.kafkaSource.interceptors.i1.type = regex_extractor
#flume1.sources.kafkaSource.interceptors.i1.serializers = s1
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.name = timestamp
## Match this format logfile to get timestamp from it:
## 76.164.194.74 - - [06/Apr/2014:03:38:07 +0000] "GET / HTTP/1.1" 200 38281 "-" "Pingdom.com_bot_version_1.4_(http://www.pingdom.com/)"
#flume1.sources.kafkaSource.interceptors.i1.regex = (\d{2}/[a-zA-Z]{3}/\d{4}:\d{2}:\d{2}:\d{2}\s+\d{4})
#flume1.sources.kafkaSource.interceptors.i1.serializers.s1.pattern = dd/MMM/yyyy:HH:mm:ss Z

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
flume1.channels.memoryChannel.type = memory
flume1.channels.memoryChannel.capacity = 100
flume1.channels.memoryChannel.transactionCapacity = 100

## Write to HDFS
# http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
flume1.sinks.hdfsSink.type = hdfs
flume1.sinks.hdfsSink.channel = memoryChannel
flume1.sinks.hdfsSink.hdfs.kerberosPrincipal = flume/n01.domain.com@DEVTECH101.COM
flume1.sinks.hdfsSink.hdfs.kerberosKeytab = /var/tmp/elik/flume.keytab
flume1.sinks.hdfsSink.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfsSink.hdfs.fileType = DataStream
flume1.sinks.hdfsSink.hdfs.writeFormat = Text
flume1.sinks.hdfsSink.hdfs.rollSize = 0
flume1.sinks.hdfsSink.hdfs.batchSize = 100
flume1.sinks.hdfsSink.hdfs.rollCount = 10000
flume1.sinks.hdfsSink.hdfs.rollInterval = 600
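Before starting the agent it is worth checking that the topic exists and that the flume principal can write under the HDFS path. A rough smoke test is sketched below; it assumes a Kafka broker listening on n01.domain.com:9092 (the broker address is not part of the config above) and CDH-style command wrappers such as kafka-topics and kafka-console-producer, which may be named differently on other distributions:

# Create the topic if it does not already exist
kafka-topics --create --zookeeper n01.domain.com:2181,n02.domain.com:2181 --replication-factor 1 --partitions 1 --topic web-sql-log

# Obtain a Kerberos ticket for the flume principal and prepare the HDFS target
kinit -kt /var/tmp/elik/flume.keytab flume/n01.domain.com@DEVTECH101.COM
hdfs dfs -mkdir -p /tmp/kafka

# Publish a few test messages into the topic
echo "test message $(date)" | kafka-console-producer --broker-list n01.domain.com:9092 --topic web-sql-log

# Once the agent has been running, events should land under /tmp/kafka/web-sql-log/yy-mm-dd
hdfs dfs -ls /tmp/kafka/web-sql-log/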
Where did you configure the elasticsearch target? I could not locate that in the configuration file.
Hi and welcome to my blog. It's been a few years since I worked with this configuration, and it seems I forgot to post the Elasticsearch configuration. Unfortunately we don't use that system anymore, so I don't have access to the config. However, I believe we ended up using the Flume flow only for HDFS. Meaning we had logstash > Kafka, then we had two Kafka topics: 1) Kafka > Flume > HDFS (which this post shows). 2) Kafka > Logstash > Elasticsearch. Take a look at this post: http://devtech101.com/2016/02/19/elk-and-kafka-zookeeper/. I know this is not exactly what you are looking for…