暂无说说

spark通过JDBC连接数据库

pyspark jiajun 8个月前 (03-03) 190次浏览 0个评论 扫描二维码

安装 mysql

执行以下命令安装 mysql

sudo apt-get install mysql-server

验证

mysql -uroot -proot

mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 4
Server version: 5.7.25-0ubuntu0.16.04.2 (Ubuntu)
Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

创建测试数据

下面我们要新建一个测试 Spark 程序的数据库,数据库名称是“spark”,表的名称是“student”。

mysql -u root -p
#屏幕会提示你输入密码
create database spark;
use spark;
create table student (id int(4), name char(20), gender char(4), age int(4));
alter table student change id id int auto_increment primary key;
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24);
select * from student;

上面已经创建好了我们所需要的 MySQL 数据库和表,下面我们编写 Spark 应用程序连接 MySQL 数据库并且读写数据。

spark 读取 mysql 数据

1、安装 mysql 驱动

mysql 驱动下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

把下载下来的 mysql 驱动移动到/soft/spark/mysql 中

mkdir -p /soft/spark/mysql
mv mysql-connector-java-5.1.47.jar /soft/spark/mysql/

2、启动pyspark

启动一个pyspark,而且启动的时候,要附加一些参数。启动pyspark时,必须指定 mysql 连接驱动 jar 包。

pyspark \
--jars /soft/spark/mysql/mysql-connector-java-5.1.47.jar \
--driver-class-path /soft/spark/mysql/mysql-connector-java-5.1.47.jar

上面的命令行中,在一行的末尾加入斜杠\,是为了告诉 spark-shell,命令还没有结束。

启动进入pyspark以后,可以执行以下命令连接数据库,读取数据,并显示:

jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load()
jdbcDF.show()

下面我们再来看一下如何往 MySQL 中写入数据。

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda line : line.split(" "))
#下面要设置模式信息
schema = StructType([StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(p[1].strip(), p[2].strip(),int(p[3])))
#建立起 Row 对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = 'root'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)

在 pyspark 中执行完上述程序后,我们可以看一下效果,看看 MySQL 数据库中的 spark.student 表发生了什么变化。在 MySQL 命令提示符下面继续输入下面命令:

mysql> select * from student;
+----+-----------+--------+------+
| id | name      | gender | age  |
+----+-----------+--------+------+
|  1 | Xueqian   | F      |   23 |
|  2 | Weiliang  | M      |   24 |
|  3 | Rongcheng | M      |   26 |
|  4 | Guanhua   | M      |   27 |
+----+-----------+--------+------+
4 rows in set (0.00 sec)

 

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址