暂无说说

spark连接Hive读写数据

pyspark jiajun 8个月前 (03-03) 261次浏览 0个评论 扫描二维码

为了让 Spark 能够访问 Hive,必须为 Spark 添加 Hive 支持。Spark 官方提供的预编译版本,通常是不包含 Hive 支持的,需要采用源码编译,编译得到一个包含 Hive 支持的 Spark 版本。

测试已经安装的 Spark 版本是否支持 Hive

现在让我们测试一下自己电脑上已经安装的 Spark 版本是否支持 Hive。打开一个终端,然后,执行下面命令:

spark-shell

这样就启动进入了 spark-shell,然后在 scala 命令提示符下输入:

import org.apache.spark.sql.hive.HiveContext
<console>:25: error: object hive is not a member of package org.apache.spark.sql
         import org.apache.spark.sql.hive.HiveContext
                                     ^

会返回错误信息,也就是 spark 无法识别 org.apache.spark.sql.hive.HiveContext,这就说明你当前电脑上的 Spark 版本不包含 Hive 支持。这时就要自己重新编译 hive,spark 编译请参考《spark 源码编译》,spark 安装参考《pyspark 之 spark 集群搭建》。

如果你当前电脑上的 Spark 版本包含 Hive 支持,那么应该显示下面的正确信息:

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

修改 spark 配置文件

cd /soft/spark/conf/
nano spark-env.sh
#在 SPARK_DIST_CLASSPATH 后面加上:/soft/hive/lib/* 
export SPARK_DIST_CLASSPATH=$(/soft/hadoop/bin/hadoop classpath):$(/soft/hbase/bin/hbase classpath):/soft/spark/hbase/*:/soft/hive/lib/* 

在 Hive 中创建数据库和表

进入 Hive,新建一个数据库 sparktest,并在这个数据库下面创建一个表 student,并录入两条数据。

hive

create database if not exists sparktest;
show databases;
create table if not exists sparktest.student(
id int,
name string,
gender string,
age int);
use sparktest;
show tables; 
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24); 
select * from student;

通过上面操作,我们就在 Hive 中创建了 sparktest.student 表,这个表有两条数据。

spark 读取 hive 数据

通过pyspark命令打开pyspark,并执行下面代码

from pyspark.sql import SparkSession
hive_context = SparkSession.builder.enableHiveSupport().master("local").getOrCreate()
hive_context.sql('use sparktest')
hive_context.sql('select * from student').show()

 结果:

+---+--------+------+---+
| id|    name|gender|age|
+---+--------+------+---+
|  1| Xueqian|     F| 23|
|  2|Weiliang|     M| 24|
+---+--------+------+---+

spark 向 hive 保存数据

代码:

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
hive_context = SparkSession.builder.enableHiveSupport().master("local").getOrCreate()
hive_context.sql('use sparktest')
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
schema = StructType([StructField("id", StringType(), True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(p[0].strip(),p[1].strip(), p[2].strip(),int(p[3])))
#建立起 Row 对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)
studentDF.registerTempTable("tempTable")
hive_context.sql('insert into student select * from tempTable')

以上代码运行后,打开 hive 客户端,输入以下命令查看 Hive 数据库内容的变化:

hive> select * from student;
OK
1   Xueqian F   23
2   Weiliang    M   24
3   Rongcheng   M   26
4   Guanhua M   27
Time taken: 0.049 seconds, Fetched: 4 row(s)

可以看到,插入数据操作执行成功了!

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址