Spark指定分区数、⽂件并⾏读写、SparkIO读写常⽤处理⽅
⼀、⼩⽂件管理之指定分区数
1、配置 spark.sql.shuffle.partitions,适⽤场景spark.sql()合并分区
这样配置后,通过spark.sql()执⾏后写出的数据分区数就是你要求的个数,如这⾥5。
2、配置 coalesce(n),适⽤场景spark写出数据到指定路径下合并分区,不会引起shuffle
df = spark.sql(sql_string).coalesce(1) #合并分区数
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.
option("header", True)
.save(hdfs_path)
3、配置repartition(n), 重新分区,会引发shuffle
df = spark.sql(sql_string).repartition(1) #重新分区,会引发全局shuffle
df.write.format("csv")
.mode("overwrite")
.option("sep", ",")
.option("header", True)
.save(hdfs_path)
⼆、⽂件的并⾏读取和写出
1、并⾏写出之 partitionBy() 指定分区列  , 会根据分区列创建⼦⽂件夹,并⾏写出数据
de("overwrite")
.partitionBy("day")
.save("/tmp/partitioned-files.parquet")
2、并⾏写出之 repartition() ,⼀般spark中有⼏个分区就会有⼏个并⾏的IO写出
.write.format("csv")
.save("/tmp/multiple.csv")
3、分桶写出,好处是后续读⼊的时候数据就不会做shuffle了,因为相同分桶的数据会被划分到同⼀个物理分区中
csvFile.write.format("parquet")
.mode("overwrite")
.bucketBy(5, "gmv") #第⼀个参数:分成⼏个桶,第⼆个参数:按哪列进⾏分桶
.saveAsTable("bucketedFiles")
三、Spark IO
参考:
1、DataFrame调⽤.read/write.format() API⽂件写⼊-覆盖和追加
(1)ad.format() 对⽂件进⾏读取的通⽤代码格式
.option("mode", "FAILFAST") #读取遇到格式错误时怎么处理
.option("inferSchema", "true") #option中("Key","value") 形式配置参数
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
#读取遇到格式错误时处理选项参数:
'''
permissive :将损坏的数据记录成null,并将损坏的列命名为 _corrupt_record 标记出来dropMalformed :删掉损坏的数据
failFast:报错失败
'''
(2)df.write.format() 将DataFrame中数据写⼊到⽂件的通⽤代码格式
#抽象的DF写出代码格式
DataFrameWrite.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()
#⼀个具体的DF写出的代码格式
df.write.format("csv")
.option("mode", "OVERWRITE") #覆盖还是追加数据
.option("sep",",") #分隔符
.option("header",true) #第⼀⾏是否是列名
.option("inferSchema",true) #是否⾃动推断列类型
.option("dateFormat", "yyyy-MM-dd") #⽇期数据的格式
.option("timestampFormat","yyyy-MMdd’T’HH:mm:ss.SSSZZ") #时间戳数据的格式
.option("nullValue","null") #数据中的null值⽤什么表⽰,默认是“”,也可以设置成NA或NULL
.
option("nanValue","unknown") #数据中的NaN或缺失值⽤什么表⽰,默认是"NaN",也可以设置成其他.option("positiveInf","Inf") #正⽆穷怎么表⽰
.option("negativeInf","-Inf") #负⽆穷怎么表⽰
.option("compression","gzip") #压缩⽅式,uncompressed、snappy、lz4等
.option("path", "path/to/file(s)") #保存地址
.save()
'''
写出 mode可选参数有:
append 在写出路径下追加数据⽂件
overwrite 覆盖写出路径下的所有⽂件
errorIfExists 如果在写出路径下已经有数据了,则报错误并失败掉任务
python怎么读文件夹下的文件夹ignore 如果在写出路径下有数据或⽂件,不做任何处理,即忽略这个数据的写出,直接跳过
'''
(3)ad.format()、df.write.format() 的应⽤举例
#DataFrame调⽤write.format API
#spark读取
df = ad.format('json').load('python/test_support/sql/people.json')
#df覆盖写⼊,mode("overwrite")
df.write.format("csv").mode("overwrite").option("sep", ",").option("header", True).save(hdfs_path)
#df追加写⼊,mode("append")
df.write.format("csv").mode("append").option("sep", ",").option("header", True).save(hdfs_path)
2、DataFrame调⽤ .write.csv(/parquet/orc) API读取写出⽂件
(1)ad.{⽂件类型} 读取⽂件的通⽤代码格式
#1、ad.csv()
#2、ad.parquet()
df.write.parquet('bar.parquet')
#3、()
(2)df.write.{⽂件类型}  将DataFrame中数据写⼊到⽂件的通⽤代码格式
#1、df读写csv
df.write.csv('foo.csv', header=True)
#2、df读写parquet
df.write.parquet('bar.parquet')
#3、df读写ORC
('')
#4、覆盖写⼊
df.write.csv(path, mode='overwrite',sep=',',header=True)
#5、追加写⼊
df.write.csv(path, mode='append',sep=',',header=True)
注意: 不管是⽤format()的⽅式,把⽂件类型写在format的⾥⾯作为参数,还是⽤调⽤⽂件类型的读写,它们在option配置选项上是通⽤的。
3、textFile读写
(1)File() 和 ad.wholeTextFile()
#File() 会忽略读⼊⽂件的分区
.selectExpr("split(value, ',') as rows").show()
#() 会保留读⼊⽂件的分区
'''
如果有⼀个⽬录,下⾯按⽇期分了多个⼦⽬录,每个⽬录下存放着⽇期当天的⼀些数据,
是否⽤()读取⽬录就能获取⽇期分区呢?
'''
(2)()  写出只能有⼀列
当写出的不是⼀列的时候,否则会报错。
df.select("A_COLUMN_NAME").("/")
但如果其他列作为分区的话,是可以选择多列的。但作为分区的列是作为分区的⼦路径⽂件名存在的,并不是输出的多列,输出还是只有⼀列。
#指定⼀个分区列,⽂件中输出的还是只有⼀列
df.select("A_COLUMN_NAME", "count")\
.write.partitionBy("count").text("/tmp/five-csv-files2py.csv")
如果先⽤concat_ws(',',col1,col2)  as col 把要输出的列进⾏了连接,然后再按照要分区的列(⽐如⽇期)来⽬录写出,岂不是很实⽤?
#链接要输出的列,并按照⽇期列分区写出
df.select("concat_ws(',',col1,col2) as col", "day")
.write.partitionBy("day")
.text("/tmp/five-csv-files2py.csv")