#Kafka常用操作
##准备工作
下载Kafka包,并解压
tar -zxvf kafka.tar.gz
cd kafka/bin
##查看集群Topic List
./kafka-topics.sh --list --zookeeper zk_host:port/chroot
##创建Topic
./kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config retention.ms=86400000
config具体的配置:http://kafka.apache.org/documentation.html#topic-config
##查看某个Topic详情
./kafka-topics.sh --describe --zookeeper zk_host:port/chroot --topic my_topic_name
##修改Topic
###增加partition
./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
###增加(或修改)Config
./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
###删除Config
./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
##删除Topic
./kafka-topics.sh --delete --zookeeper zk_host:port/chroot --topic my_topic_name
#Spark Streaming + Kafka
如果不想管理Kafka的offset,而是只注重消息内容本身,建议直接使用没有任何Receivers的直接读取方法。
这种方法的好处在于可以简化并行操作;消息的接收变得直接而有效,因为没有了Receivers;还有就是Exactly-once semantics,零数据丢失,因为这种方法没有使用ZK来管理offset,所以即使失败也不会处理两遍相同的消息。不好的地方就是无法使用基于ZK的Kafka管理监控工具。
##产生消息的实例代码
确定Kafka集群,以及要发送的topic名字
val Array(brokers, topic) = args
定义连接Kafka集群的配置
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
定义消息生产者
val producer = new KafkaProducer[String, String](props)
定义消息,并且发送消息
val str = "messages"
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
##消息消费的实例代码 确定Kafka集群信息,要处理的topic名字
val Array(brokers, topics) = args
创建一个streamingContext,可以定义要处理消息队列的时间窗口
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
读取Kafka消息队列中的消息
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
读取到的消息的(key,value)类型是你定义的类型:InputDStream[(String, String)]
前面定义了消息的读取,以及消息的处理动作,下面就是启动消费消息的流程
ssc.start()
ssc.awaitTermination()
另外,任务总会有失败的可能,所以你将任务失败的时候的状态存起来,这样重新启动的任务可以根据状态,重新从失败的offset开始读取并消费消息。
存储状态
ssc.checkpoint(checkpointPath)
messages.checkpoint(Minutes(intervalOfCheckpoint.toInt))
根据失败时的状态文件,重新创建消息消费任务
val ssc = StreamingContext.getOrCreate(checkpointPath, () => {createStreamJob(sparkConf, args)})
另外,需要注意的是,你读取并存储消息的partition和rdd中的partition并不相同,所以,如果有足够的资源,你可以将读取到的消息repartition到不同的计算节点上进行计算。
messages.foreachRDD(rdd =>
rdd.repartition(numExecutor).mapPartitions(iter => {
// iter action
})
)