Kafka QuickStart

#Kafka常用操作

##准备工作 下载Kafka包,并解压
tar -zxvf kafka.tar.gz
cd kafka/bin

##查看集群Topic List

./kafka-topics.sh --list --zookeeper zk_host:port/chroot 

##创建Topic

./kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name  --partitions 20 --replication-factor 3 --config retention.ms=86400000

config具体的配置:http://kafka.apache.org/documentation.html#topic-config

##查看某个Topic详情

./kafka-topics.sh --describe --zookeeper zk_host:port/chroot --topic my_topic_name

##修改Topic

###增加partition

./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name  --partitions 40

###增加(或修改)Config

./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

###删除Config

./kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x

##删除Topic

./kafka-topics.sh --delete --zookeeper zk_host:port/chroot --topic my_topic_name

#Spark Streaming + Kafka 如果不想管理Kafka的offset,而是只注重消息内容本身,建议直接使用没有任何Receivers的直接读取方法。
这种方法的好处在于可以简化并行操作;消息的接收变得直接而有效,因为没有了Receivers;还有就是Exactly-once semantics,零数据丢失,因为这种方法没有使用ZK来管理offset,所以即使失败也不会处理两遍相同的消息。不好的地方就是无法使用基于ZK的Kafka管理监控工具。

##产生消息的实例代码

确定Kafka集群,以及要发送的topic名字

val Array(brokers, topic) = args

定义连接Kafka集群的配置

val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

定义消息生产者

val producer = new KafkaProducer[String, String](props)

定义消息,并且发送消息

val str = "messages"
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)

##消息消费的实例代码 确定Kafka集群信息,要处理的topic名字

val Array(brokers, topics) = args

创建一个streamingContext,可以定义要处理消息队列的时间窗口

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

读取Kafka消息队列中的消息

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

读取到的消息的(key,value)类型是你定义的类型:InputDStream[(String, String)]
前面定义了消息的读取,以及消息的处理动作,下面就是启动消费消息的流程

ssc.start()
ssc.awaitTermination()

另外,任务总会有失败的可能,所以你将任务失败的时候的状态存起来,这样重新启动的任务可以根据状态,重新从失败的offset开始读取并消费消息。
存储状态

ssc.checkpoint(checkpointPath)
messages.checkpoint(Minutes(intervalOfCheckpoint.toInt))

根据失败时的状态文件,重新创建消息消费任务

val ssc = StreamingContext.getOrCreate(checkpointPath, () => {createStreamJob(sparkConf, args)})

另外,需要注意的是,你读取并存储消息的partition和rdd中的partition并不相同,所以,如果有足够的资源,你可以将读取到的消息repartition到不同的计算节点上进行计算。

messages.foreachRDD(rdd =>
    rdd.repartition(numExecutor).mapPartitions(iter => {
        // iter action
    })
)