Flume configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/hive.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = day02
a1.sinks.k1.brokerList = hadoop:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file /opt/flume/flume/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
Start Kafka
kafka-server-start.sh /opt/kafka/kafka/config/server.properties
Create the topic
kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic day02
List topics
kafka-topics.sh --list --zookeeper hadoop:2181
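The same check can be done from code instead of the CLI. Below is a minimal sketch, assuming the kafka-clients library (0.11 or later) is on the classpath; the object name TopicCheck is only for illustration:

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

object TopicCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker address used by the Flume sink and the Spark job
    props.put("bootstrap.servers", "hadoop:9092")

    val admin = AdminClient.create(props)
    try {
      // listTopics() returns the topic names known to the cluster
      val names = admin.listTopics().names().get().asScala
      println(s"day02 exists: ${names.contains("day02")}")
    } finally {
      admin.close()
    }
  }
}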
Spark Streaming application
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkReceiver {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Key parameter 1: Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Key parameter 2: topics to subscribe to
    val topics = Array("day02")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
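Because enable.auto.commit is set to false above, consumed offsets are never stored automatically. Below is a minimal sketch of committing them back to Kafka after each batch, using the HasOffsetRanges and CanCommitOffsets APIs from spark-streaming-kafka-0-10 (already covered by the wildcard import). It would go inside main in place of the map/print line; the per-record println is only placeholder processing:

stream.foreachRDD { rdd =>
  // Capture this batch's offset ranges before any shuffle-inducing transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Placeholder processing: print each record as (key, value)
  rdd.foreach(record => println((record.key, record.value)))

  // Commit the batch's offsets back to Kafka once processing is done
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}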