暂无说说

spark读取kafka数据

pyspark jiajun 8个月前 (03-10) 182次浏览 0个评论 扫描二维码

kafka 安装

关于 Kafka 的概念和安装方法,请参考《kafka 测试节点安装》。在安装的时候,要注意,到 Kafka 官网下载安装文件时,一定要选择和自己电脑上已经安装的 scala 版本号一致才可以,spark2.4.0 使用 scala 版本号是 2.11,所以,一定要选择 Kafka 版本号是 2.11 开头的。另外,从 spark2.3 开始,停止对 kafka0.10.0 以上版本的 python 支持,建议使用 kafka0.8.2.1—kafka0.10.0 之间的版本。本文使用 kafka_2.11-0.10.2.0,前面的 2.11 就是支持的 scala 版本号,后面的 0.10.2.0 是 Kafka 自身的版本号。这里假设已经成功安装了 Kafka。

依赖包下载

Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。根据 Spark 官网的说明,对于 Spark2.4.0 版本,如果要使用 Kafka,则需要下载 spark-streaming-kafka-0-10_2.11 相关 jar 包,可访问Maven Repository搜索下载。

把下载后的 jar 包拷贝到/soft/spark/jars/kafka 目录下

mkdir -p /soft/spark/jars/kafka
cp spark-streaming-kafka-0-8_2.11-2.4.0.jar  /soft/spark/jars/kafka

同时,还要修改 spark 目录下 conf/spark-env.sh 文件,修改该文件下面的 SPARK_DIST_CLASSPATH 变量

nano /soft/spark/conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(/soft/hadoop/bin/hadoop classpath):$(/soft/hbase/bin/hbase classpath):/soft/spark/hbase/*:/soft/hive/lib/*:/soft/spark/jars/kafka/*:/soft/kafka/libs/*

编写测试代码

nano KafkaWordCount.py

测试代码

import sys 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
 
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)
 
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
 
    ssc.start()
    ssc.awaitTermination()

启动 kafka 并生产数据

启动 kafka

cd /soft/kafka_2.12-2.1.1/
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties

创建 topic

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sparkkafka

启动脚本

执行如下命令:

#先安装 pyspark
pip install pyspark
python3 ./KafkaWordCount.py localhost:2181 sparkkafka

生产数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sparkkafka

输入测试数据:

hello world

此时,在 python 执行终端下可以显示刚才新输入的结果。

Time: 2019-03-10 19:16:38
-------------------------------------------

-------------------------------------------                                     
Time: 2019-03-10 19:16:39
-------------------------------------------
('hello', 1)
('world', 1)

-------------------------------------------                                     
Time: 2019-03-10 19:16:40
-------------------------------------------

 

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址