
Reading and Writing HBase Data with Spark

pyspark · jiajun · 03-02

Preface

The HBase version used here is 1.2.11; other versions have not been tested.

Copy the Dependency JARs

1. Create a directory to hold the JARs that HBase requires

mkdir -p /soft/spark/hbase

2. Copy the HBase JARs

cd /soft/spark/hbase
cp /soft/hbase/lib/hbase*.jar ./
cp /soft/hbase/lib/guava-12.0.1.jar ./
cp /soft/hbase/lib/protobuf-java-2.5.0.jar ./

Note: Spark 2.0 does not ship the JAR that converts HBase data into a form Python can read, so it must be downloaded separately. Download spark-examples_2.11-1.6.0-typesafe-001.jar, then move it into the /soft/spark/hbase directory:

mv spark-examples_2.11-1.6.0-typesafe-001.jar /soft/spark/hbase

Edit the Configuration File

cd /soft/spark/conf/
nano spark-env.sh
# add the following line
export SPARK_DIST_CLASSPATH=$(/soft/hadoop/bin/hadoop classpath):$(/soft/hbase/bin/hbase classpath):/soft/spark/hbase/*

Create the HBase Table

start-hbase.sh
hbase shell
create 'student','info'

Notice that the create command takes the table name 'student' first, followed by the column family name 'info'; this column family will hold the three columns 'name', 'gender', and 'age'. There is no 'id' column because every HBase table has a built-in row key that does not need to be declared: in a put command, the first value after the table name is used as the row key.
After creating the 'student' table, you can view its basic information with the describe command:

describe 'student'
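Conceptually, an HBase table like 'student' maps a row key to column-family:qualifier pairs. A minimal in-memory sketch of that layout in plain Python (a hypothetical model for illustration, not the HBase API):

```python
# Hypothetical in-memory model of the 'student' table:
# {row_key: {"family:qualifier": value}}
student = {
    "1": {"info:name": "Xueqian", "info:gender": "F", "info:age": "23"},
    "2": {"info:name": "Weiliang", "info:gender": "M", "info:age": "24"},
}

def get_cell(table, row, column):
    """Mimics `get 'student','1'` for a single column in the HBase shell."""
    return table.get(row, {}).get(column)

print(get_cell(student, "1", "info:name"))  # Xueqian
```

All values are stored as strings here because HBase itself stores raw bytes with no column types.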

Insert Data

# First, insert the first student record
put 'student','1','info:name','Xueqian'
put 'student','1','info:gender','F'
put 'student','1','info:age','23'
# Then insert the second student record
put 'student','2','info:name','Weiliang'
put 'student','2','info:gender','M'
put 'student','2','info:age','24'

Once the data has been inserted, verify it with the following commands:

get 'student','1'
COLUMN                                 CELL                                                                                                           
 info:age                              timestamp=1551525486912, value=23                                                                              
 info:gender                           timestamp=1551525484748, value=F                                                                               
 info:name                             timestamp=1551525484699, value=Xueqian                                                                         
1 row(s)
Took 0.0561 seconds 

scan 'student'
ROW                                    COLUMN+CELL                                                                                                    
 1                                     column=info:age, timestamp=1551525486912, value=23                                                             
 1                                     column=info:gender, timestamp=1551525484748, value=F                                                           
 1                                     column=info:name, timestamp=1551525484699, value=Xueqian                                                       
 2                                     column=info:age, timestamp=1551525501413, value=24                                                             
 2                                     column=info:gender, timestamp=1551525500506, value=M                                                           
 2                                     column=info:name, timestamp=1551525500433, value=Weiliang                                                      
2 row(s)
Took 0.8019 seconds

Reading HBase Data from Spark

Start pyspark, then enter the following code in the pyspark shell:

host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
hbase_rdd.cache()  # cache before the actions below so HBase is scanned only once
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)

Output:

1 {"qualifier" : "age", "timestamp" : "1551537954853", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}
{"qualifier" : "gender", "timestamp" : "1551537953818", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}
{"qualifier" : "name", "timestamp" : "1551537947631", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}
2 {"qualifier" : "age", "timestamp" : "1551537961525", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}
{"qualifier" : "gender", "timestamp" : "1551537960804", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}
{"qualifier" : "name", "timestamp" : "1551537960740", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}
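As the output shows, the value returned for each row is a single string containing one JSON object per cell, separated by newlines. A small sketch of turning that string into Python dicts (plain Python; `raw_value` is sample data taken from the output above):

```python
import json

# Sample value string in the format produced by HBaseResultToStringConverter:
# one JSON object per cell, newline-separated.
raw_value = (
    '{"qualifier" : "age", "timestamp" : "1551537954853", '
    '"columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}\n'
    '{"qualifier" : "name", "timestamp" : "1551537947631", '
    '"columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}'
)

def parse_cells(value_string):
    """Split the converter output into one dict per cell."""
    return [json.loads(line) for line in value_string.split("\n") if line]

cells = parse_cells(raw_value)
print({c["qualifier"]: c["value"] for c in cells})  # {'age': '23', 'name': 'Xueqian'}
```

In the pyspark shell this could be applied to the whole RDD with something like `hbase_rdd.flatMapValues(parse_cells)`, which yields one (row key, cell dict) pair per cell.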

Writing Data to HBase from Spark

Start pyspark, then enter the following code in the pyspark shell:

host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {
    "hbase.zookeeper.quorum": host,
    "hbase.mapred.outputtable": table,
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
 
rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
# ( rowkey , [ row key , column family , column name , value ] )
sc.parallelize(rawData) \
  .map(lambda x: (x[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
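The StringListToPutConverter expects each value to be a list of the form [row key, column family, qualifier, value], keyed by the row key, as the comment above indicates. A plain-Python sketch of that transformation (no Spark required):

```python
# Build the (rowkey, [rowkey, family, qualifier, value]) pairs that
# StringListToPutConverter expects, from CSV strings like those in rawData.
rawData = ['3,info,name,Rongcheng', '4,info,name,Guanhua']

def to_put_pair(record):
    fields = record.split(',')   # [rowkey, family, qualifier, value]
    return (fields[0], fields)   # key the pair by the row key

pairs = [to_put_pair(r) for r in rawData]
print(pairs[0])  # ('3', ['3', 'info', 'name', 'Rongcheng'])
```

Note that `x[0]` in the map above returns the full row key only because these keys are a single character; splitting first, as in this sketch, also handles multi-character row keys.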

After it runs, check the result with the following command in the HBase shell:

scan 'student'

Output:

ROW                   COLUMN+CELL                                               
 1                    column=info:age, timestamp=1551541013746, value=23        
 1                    column=info:gender, timestamp=1551541012491, value=F      
 1                    column=info:name, timestamp=1551541012422, value=Xueqian  
 2                    column=info:age, timestamp=1551541020715, value=24        
 2                    column=info:gender, timestamp=1551541020065, value=M      
 2                    column=info:name, timestamp=1551541020004, value=Weiliang 
 3                    column=info:name, timestamp=1551541137631, value=Rongcheng
 4                    column=info:name, timestamp=1551541137631, value=Guanhua  
4 row(s) in 0.4540 seconds

 
