Reading and writing databases with DataFrames in PySpark
This article only covers Spark reading from and writing to a MySQL database via JDBC.
1. JDBC
For Spark to fetch data from MySQL, we first need a JAR that provides the MySQL connector: mysql-connector-java-5.1.40-bin.jar.
Place the JAR somewhere suitable on the machine, for example under /home/sxw/Documents, and add the following line to Spark's spark-env.sh file: export SPARK_CLASSPATH=/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar
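Note that SPARK_CLASSPATH is deprecated in newer Spark releases. A minimal sketch of an alternative, assuming the same JAR path as above, is to register the driver through the spark.jars configuration when building the session:

```python
from pyspark.sql import SparkSession

# Sketch: register the MySQL connector via spark.jars instead of
# SPARK_CLASSPATH (the path assumes the location used above).
spark = (SparkSession.builder
         .appName("mysql-demo")
         .config("spark.jars",
                 "/home/sxw/Documents/mysql-connector-java-5.1.40-bin.jar")
         .getOrCreate())
```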
2. Reading: example code
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable='mysql.db',
    user='root',
    password='123456'
).load()
df.show()
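For a large source table, the JDBC reader can split the scan into parallel partitions. A minimal sketch of the extra options, assuming the table has a numeric column named `id` (not part of the original example):

```python
# Sketch: options for a parallel JDBC read. 'id' is an assumed numeric
# column; lowerBound/upperBound only control how the id range is split
# across partitions, they do not filter rows.
options = {
    "url": "jdbc:mysql://127.0.0.1",
    "dbtable": "mysql.db",
    "user": "root",
    "password": "123456",
    "partitionColumn": "id",
    "lowerBound": "1",
    "upperBound": "100000",
    "numPartitions": "4",
}
# df = spark.read.format("jdbc").options(**options).load()
```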
# You can also pass in a SQL query
sql = "(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://127.0.0.1',
    dbtable=sql,
    user='root',
    password='123456'
).load()
df.show()
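This works because the JDBC source places the dbtable value into a FROM clause, so a full query must be wrapped in parentheses and given an alias. A tiny hypothetical helper (as_subquery is not part of the original code) makes the pattern explicit:

```python
def as_subquery(query, alias="t"):
    # The JDBC 'dbtable' option accepts either a table name or a
    # parenthesized subquery followed by an alias, e.g. "(select ...) t".
    return "({}) {}".format(query, alias)

sql = as_subquery("select * from mysql.db where db='wp230'")
# sql == "(select * from mysql.db where db='wp230') t"
```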
---------------------
Author: 振裕
Source: CSDN
3. Writing: example code
# Enable dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
# Write into a partitioned table with ordinary Hive SQL
spark.sql("""
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
""")
# Or rebuild the table partitions on each run
jdbcDF.write.mode("overwrite").partitionBy("saledate").insertInto("ai.da_aipurchase_dailysale_hive")
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_hive", None, "append", partitionBy='saledate')
# Without partitioning: simply load the data into a Hive table
jdbcDF.write.saveAsTable("ai.da_aipurchase_dailysale_for_ema_predict", None, "overwrite", None)
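The same writer API can also push results back to MySQL instead of Hive. A sketch, assuming a target database `test` and table `results` that do not appear in the original article:

```python
# Sketch: write a DataFrame back to MySQL over JDBC.
write_url = "jdbc:mysql://127.0.0.1:3306/test"
write_properties = {
    "user": "root",
    "password": "123456",
    "driver": "com.mysql.jdbc.Driver",
}
# mode="append" adds rows; mode="overwrite" replaces the table contents.
# jdbcDF.write.jdbc(write_url, "results", mode="append",
#                   properties=write_properties)
```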
---------------------
Original article: blog.csdn/suzyu12345/article/details/79673473
4. Miscellaneous
import os
from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql import DataFrameReader, DataFrameWriter

appname = "demo"
sparkmaster = "local"
spark = SparkSession.builder.appName(appname).master(sparkmaster).getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Under the hood, Spark uses DataFrameReader and DataFrameWriter to read and
# write DataFrame data; spark.read is itself a DataFrameReader, e.g.:
#   df = spark.read.format("jdbc").options(url=url, driver=driver, dbtable=dbtable).load()
df_reader = DataFrameReader(sqlContext)
df = df_reader.format("jdbc").options(url=url, dbtable=table, user=user, password=password).load()
df = df_reader.jdbc(url, table, properties=properties)
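Unlike the options() style, df_reader.jdbc expects the credentials in a properties dict. A sketch, assuming the same local MySQL instance as in the examples above:

```python
# Sketch: connection arguments for DataFrameReader.jdbc.
url = "jdbc:mysql://127.0.0.1:3306/mysql"
table = "db"
properties = {
    "user": "root",
    "password": "123456",
    "driver": "com.mysql.jdbc.Driver",
}
# df = df_reader.jdbc(url, table, properties=properties)
```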