Spark:DataFrame写⼊⽂本⽂件
将DataFrame写成⽂件⽅法有很多
最简单的将DataFrame转换成RDD,通过saveASTextFile进⾏保存但是这个⽅法存在⼀些局限性:
1.将DataFrame转换成RDD或导致数据结构的改变
2.RDD的saveASTextFile如果⽂件存在则⽆法写⼊,也就意味着数据只能覆盖⽆法追加,对于有数据追加需求的⼈很不友好
3.如果数据需要⼆次处理,RDD指定分隔符⽐较繁琐
基于以上原因,在研读了Spark的官⽅⽂档后,决定采取DataFrame的⾃带⽅法 write 来实现。
此处采⽤mysql的数据作为数据源,读取mysql的⽅法在有详细介绍。
mysql的信息我保存在了外部的配置⽂件,这样⽅便后续的配置添加。
1//配置⽂件⽰例:
2 [hdfs@iptve2e0
3 tmp_lillcol]$ cat job.properties
3 #mysql数据库配置
4 mysql.sql.jdbc.Driver
5 mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
6 mysql.username=user
7 mysql.password=123456
2.需要的jar依赖
sbt版本,maven的对应修改即可
1 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
2 libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
3 libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
4 libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
5 libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
6 libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
7 libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
8 libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
9 libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
10 libraryDependencies += "ics" % "metrics-core" % "2.2.0"
3.完整实现代码
1import java.io.FileInputStream
2import java.util.Properties
3
4import org.apache.spark.sql.hive.HiveContext
5import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
6import org.apache.spark.{SparkConf, SparkContext}
7
8/**
9  * @author Administrator
10  *        2018/10/16-14:35
11  *
12*/
13 object TestSaveFile {
14  var hdfsPath: String = ""
15  var proPath: String = ""
16  var DATE: String = ""
17
18  val sparkConf: SparkConf = new SparkConf().SimpleName)
19  val sc: SparkContext = new SparkContext(sparkConf)
20  val sqlContext: SQLContext = new HiveContext(sc)
21
22  def main(args: Array[String]): Unit = {
23    hdfsPath = args(0)
24    proPath = args(1)
25//不过滤读取
26    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
27    saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
28  }
29
30/**
31    * 获取 Mysql 表的数据
32    *
33    * @param sqlContext
34    * @param tableName 读取Mysql表的名字
35    * @param proPath  配置⽂件的路径
36    * @return返回 Mysql 表的 DataFrame
37*/
38  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
39    val properties: Properties = getProPerties(proPath)
40    sqlContext
41      .read
42      .format("jdbc")
43      .option("url", Property("mysql.url"))
44      .option("driver", Property("mysql.driver"))
45      .option("user", Property("mysql.username"))
46      .option("password", Property("mysql.password"))
47      .option("dbtable", tableName)
48      .load()
49  }
50
51/**
52    * 将 DataFrame 保存为 hdfs ⽂件同时指定保存绝对路径与分隔符
53    *
54    * @param dataFrame  需要保存的 DataFrame
55    * @param absSaveDir 保存保存的路径(据对路径)
56    * @param splitRex  指定分割分隔符
57    * @param saveMode  保存的模式:Append、Overwrite、ErrorIfExists、Ignore
58*/
59  def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
hbase官方文档60    dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.outputpress", "false")
61//为了⽅便观看结果去掉压缩格式
62    val allClumnName: String = lumns.mkString(",")
63    val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
64    de(saveMode).text(absSaveDir)
65  }
66
67/**
68    * 获取配置⽂件
69    *
70    * @param proPath
71    * @return
72*/
73  def getProPerties(proPath: String): Properties = {
74    val properties: Properties = new Properties()
75    properties.load(new FileInputStream(proPath))
76    properties
77  }
78 }
4.测试
1 def main(args: Array[String]): Unit = {
2    hdfsPath = args(0)
3    proPath = args(1)
4//不过滤读取
5    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
6    saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
7  }
5.执⾏命令
1 nohup spark-submit --master yarn \
2 --driver-memory 4G \
3 --num-executors 2 \
4 --executor-cores 4 \
5 --executor-memory 8G \
6 --class com.iptv.job.basedata.TestSaveFile \
7 --jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
8 test.jar \
9 hdfs://ns1/user/hive/../ \
10 /var/.../job.properties > ./TestSaveFile.log 2>&1 &
6.运⾏结果
1 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TestSaveFile
2 0      0      hdfs://ns1/user/hive/warehouse/TestSaveFile/_SUCCESS
3 4.1 K  4.1 K  hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
4
5 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -cat hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
6 1234|12349|张三|韩服_G|11234|张三艾欧尼亚|韩服-G|1234D5A3434|3|张三天庭
7 12343|1234|1234|韩服_M|31234|李四艾欧尼亚|韩服-M|5F4EE4345|8|1234天庭
8 1234|12340|⽯中剑⼭|韩服_s8|11234|张三艾欧尼亚|韩服-s8|59B403434|5|⽯中剑⼭天庭
9 12344|12344|灵⼭|韩服_J|31234|李四艾欧尼亚|韩服-J|CF19F434B|40|灵⼭天庭
10 1234|1234|他家|韩服_H|11234|张三艾欧尼亚|韩服-Z|51234EB1434|9|他家天庭
11 12345|12340|云浮|韩服_F|31234|李四艾欧尼亚|韩服-Y|9C9C04344|41|浮天庭
12 1234|12348|潮边疆|韩服_Z|41234|佛⼭艾欧尼亚|韩服-Z|5B034340F|15|边疆天庭
13 12340|12344|河姆渡⼈源|韩服_HY|41234|深圳艾欧尼亚|韩服-HY434123490808|18|河姆渡⼈源天庭
14 1234|1234|佛⼭|韩服_S|41234|佛⼭艾欧尼亚|韩服-FS|EEA981434|4|佛祖天庭
15 12340|12343|揭阳|韩服_J|41234|深圳艾欧尼亚|韩服-JY|9FF084349|10|天庭
16 1234|1234|⽯中剑边疆|韩服_|41234|佛⼭艾欧尼亚|韩服-HZ|440A434FC|0|⽯中剑边疆天庭
17 12348|1234|梅边疆|韩服_Z|41234|深圳艾欧尼亚|韩服-MZ|E9B434F09|14|梅边疆天庭
18 1234|12348|⽯中剑名|韩服_M|41234|佛⼭艾欧尼亚|韩服-MM|5D0A94434|14|⽯中剑名天庭
19 12349|1234|⽇本|韩服_|41234|深圳艾欧尼亚|韩服-SG|BD0F34349|19|⽇本天庭
20 1234|1234|⽯中剑⽯中剑|韩服_ST|41234|佛⼭艾欧尼亚|韩服-ST|18D0D0434|0|⽯中剑⽯中剑天庭
21 12340|1234|深圳|韩服_Z|41234|深圳艾欧尼亚|韩服-Z|31E4C4344|4|深天庭
22 12340|12340|⽯中剑尾|韩服_SW|41234|佛⼭艾欧尼亚|韩服-SW|1BA1234434B|10|⽯中剑尾天庭
23 12341|1234|美国|韩服_Z|41234|深圳艾欧尼亚|韩服-Q|3C09D434B|13|美国天庭
24 12341|1234|湛江|韩服_Z|41234|佛⼭艾欧尼亚|韩服-Z|3A49A4340|11|我家天庭
25 1234|12343|清诗和远⽅|韩服_Y|11234|张三艾欧尼亚|韩服-Y|4344E0F31|10|清诗和远⽅天庭
26 1234|41234|李四|韩服_AZ|31234|李四艾欧尼亚|韩服-Z|13F1D4344|1|李四天庭
7.总结
在整个过程中有⼏个需要注意的点
只能存⼀个列
1/**
2  * Saves the content of the [[DataFrame]] in a text file at the specified path.
3  * The DataFrame must have only one column that is of string type.
4  * Each row becomes a new line in the output file. For example:
5  * {{{
6  *  // Scala:
7  *  ("/path/to/output")
8  *
9  *  // Java:
10  *  df.write().text("/path/to/output")
11  * }}}
12  *
13  * @since 1.6.0
14*/
15  def text(path: String): Unit = format("text").save(path)
这段代码已经说明了⼀切,是的,只能保存只有⼀列的DataFrame.
但是⽐起RDD,DataFrame能够⽐较轻易的处理这种情况
1 def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
2    dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.outputpress", "false")
3//为了⽅便观看结果去掉压缩格式
4    val allClumnName: String = lumns.mkString(",")
5    val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
6    de(saveMode).text(absSaveDir)
7  }
上述代码中我们通过columns.mkString(",")获取 dataFrame 的所有列名并⽤","分隔,然后通过
selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")将所有数据拼接当成⼀列,完美解决
只能保存⼀列的问题DataFrame 某个字段为空
如果 DataFrame 中某个字段为null,那么在你最中⽣成的⽂件中不会有该字段,所以,如果对结果字段的个数有要求的,最好在数据处理的时候将有可能为null的数据赋值空串"",特别是还有将数据load进Hive需求的,否则数据会出现错位
⾄此DataFrame 写⽂件功能实现
此⽂为本⼈⼯作总结,转载请标明出处!!!!!!!