SparkStreaming+Zookeeper+Kafka入门程序
准备工作:
开始工作
1. 启动zookeeper
打开终端,切换到 zookeeper HOME
目录, 进入conf文件夹,拷贝一份 zoo_sample.cfg
副本并重命名为 zoo.cfg
切换到上级的bin目录中,执行 ./zkServer.sh start
启动zookeeper,会有日志打印
Starting zookeeper … STARTED
然后用 ./zkServer.sh status
查看状态,如果有下列信息输出,则说明启动成功
Mode: standalone
如果要停止zookeeper,则运行 ./zkServer stop
即可
2. 启动kafka
打开终端,切换到 kafka HOME
目录,运行 bin/kafka-server-start.sh config/server.properties
会有以下类似日志输出
[2014-11-12 17:38:13,395] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2014-11-12 17:38:13,420] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
3. 启动kafka生产者
重新打开一个终端,暂叫做 生产者终端,方便后面引用说明。切换到 kafka HOME
目录,运行 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
创建一个叫 test
的主题。
4. 编写scala应用程序
1 |
|
build.sbt
文件中添加依赖
libraryDependencies += “org.apache.spark” % “spark-streaming_2.10” % “1.1.0”
libraryDependencies += “org.apache.spark” % “spark-streaming-kafka_2.10” % “1.1.0”
启动scala程序,然后在 上面第2步的 生产者终端中输入一些字符串,如 sdfsadf a aa a a a a a a a a
,在ide的控制台上可以看到有信息输出
4/11/12 16:38:22 INFO scheduler.DAGScheduler: Stage 195 (take at DStream.scala:608) finished in 0.004 s
-——————————————
Time: 1415781502000 ms
-——————————————
(aa,1)
(a,9)
(sdfsadf,1)
说明程序成功运行。