Spark数据读取与保存(输⼊、输出)4.数据读取与保存
  Spark 的数据读取及数据保存可以从两个维度来作区分:⽂件格式以及⽂件系统。
⽂件格式分为:Text ⽂件、Json ⽂件、Csv ⽂件、Sequence ⽂件以及 Object ⽂件;
⽂件系统分为:本地⽂件系统、HDFS、HBASE以及数据库。
1)数据读取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102: MapPartitionsRDD[21] at textFile at <console>:24
2)数据保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")
4.1.2 Json ⽂件
  如果 JSON ⽂件中每⼀⾏就是⼀个 JSON 记录,那么可以通过将 JSON ⽂件当做⽂本
⽂件来读取,然后利⽤相关的 JSON 库对每⼀条数据进⾏ JSON 解析。
  注意:使⽤ RDD 读取 JSON ⽂件处理很复杂,同时 SparkSQL 集成了很好的处理
JSON ⽂件的⽅式,所以应⽤中多是采⽤ SparkSQL 处理 JSON ⽂件。
(1)导⼊解析 json 所需的包
scala> import scala.util.parsing.json.JSON
(2)上传 json ⽂件到 HDFS
[lxl@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3)读取⽂件
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4)解析 json 数据
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
(5)打印
scala> llect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
4.1.3 Sequence ⽂件
  SequenceFile ⽂件是 Hadoop ⽤来存储⼆进制形式的 key-value 对⽽设计的⼀种平⾯
⽂件(Flat File)。Spark 有专门⽤来读取 SequenceFile 的接⼝。在 SparkContext 中,可以
调⽤ sequenceFile[ keyClass, valueClass](path)。
注意:SequenceFile ⽂件只针对 PairRDD
(1)创建⼀个 RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2)将 RDD 保存为 Sequence ⽂件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3)查看该⽂件
[lxl@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile
[lxl@hadoop102 seqFile]$ ll
总⽤量8
-rw-r--r-- 1 atguigu atguigu 10810⽉910:29 part-00000
-rw-r--r-- 1 atguigu atguigu 12410⽉910:29 part-00001
-rw-r--r-- 1 atguigu atguigu 010⽉910:29 _SUCCESS
[lxl@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4)读取 Sequence ⽂件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5)打印读取后的 Sequence ⽂件
scala> llect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
4.1.4 对象⽂件(objectFile)
  对象⽂件是将对象序列化后保存的⽂件,采⽤ Java 的序列化机制。可以通过
objectFile[k,v](path) 函数接收⼀个路径,读取对象⽂件,返回对应的 RDD,也可以通过调
⽤ saveAsObjectFile() 实现对对象⽂件的输出。因为是序列化所以要指定类型。
(1)创建⼀个 RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2)将 RDD 保存为 Object ⽂件
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3)查看该⽂件
[lxl@hadoop102 object]$ pwd
/opt/module/spark/object
[lxl@hadoop102 object]$ ll
总⽤量 16
-rw-r--r-- 1 lxl lxl 138 7⽉ 8 03:12 part-00000
-rw-r--r-- 1 lxl lxl 138 7⽉ 8 03:12 part-00001
-rw-r--r-- 1 lxl lxl 138 7⽉ 8 03:12 part-00002
-rw-r--r-- 1 lxl lxl 142 7⽉ 8 03:12 part-00003
-rw-r--r-- 1 lxl lxl 0 7⽉ 8 03:12 _SUCCESS
[lxl@hadoop102 object]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritabley.)a䴖¬ [IMº`&v xp
(4)读取 Object ⽂件
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5)打印读取后的 Sequence ⽂件
scala> llect
res19: Array[Int] = Array(1, 2, 3, 4)
4.2 ⽂件系统类数据读取与保存
4.2.1 HDFS
  Spark 的整个⽣态系统与 Hadoop 是完全兼容的,所以对于 Hadoop 所⽀持的⽂件类型
或者数据库类型,Spark 也同样⽀持.另外,由于 Hadoop 的 API 有新旧两个版本,所以 Spark 为
了能够兼容 Hadoop 所有的版本,也提供了两套创建操作接⼝.对于外部存储创建操作⽽
⾔,hadoopRDD 和 newHadoopRDD 是最为抽象的两个函数接⼝,主要包含以下四个参数.
  1)输⼊格式(InputFormat): 制定数据输⼊的类型,如 TextInputFormat 等,新旧两个版本
所引⽤的版本分别是 org.apache.hadoop.mapred.InputFormat 和
org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
  2)键类型: 指定[K,V]键值对中 K 的类型
  3)值类型: 指定[K,V]键值对中 V 的类型
  4)分区值: 指定由外部存储⽣成的 RDD 的 partition 数量的最⼩值,如果没有指定,系
统会使⽤默认值 defaultMinSplits
注意:其他创建操作的 API 接⼝都是为了⽅便最终的 Spark 程序开发者⽽设置的,是这两个
接⼝的⾼效实现版本.例如,对于 textFile ⽽⾔,只有 path 这个指定⽂件路径的参数,其他参数
在系统内部指定了默认值。
  1.在 Hadoop 中以压缩形式存储的数据,不需要指定解压⽅式就能够进⾏读取,因为Hadoop 本⾝有⼀个解压器会根据压缩⽂件的后缀推断解压算法进⾏解压.
  2.如果⽤ Spark 从 Hadoop 中读取某种类型的数据不知道怎么读取的时候,上⽹查⼀个使⽤ map-reduce 的时候是怎么读取这种这种数据的,然后再将对应的读取⽅式改写成上⾯的hadoopRDD 和 newAPIHadoopRDD 两个类就⾏了
4.2.2 MySQL 数据库连接
⽀持通过 Java JDBC 访问关系型数据库。需要通过 JdbcRDD 进⾏,⽰例如下:
(1)添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
(2)Mysql 读取:
package com.lxl
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.创建 spark 配置信息
val sparkConf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
/
/2.创建 SparkContext
val sc = new SparkContext(sparkConf)
//3.定义连接 mysql 的参数
val driver = "sql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建 JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
drop table if exists admin
},
"select * from `rddtable` where `id` >= ? and id <= ?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最后结果
unt())
rdd.foreach(println)
sc.stop()
}
}
Mysql 写⼊:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
  Class.forName ("sql.jdbc.Driver").newInstance()
  val conn = java.Connection("jdbc:mysql://master01:3306/rdd", "root","hive")
  iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data)
    ps.executeUpdate()
  })
}
spark-shell 中使⽤ JDBC 连接 Mysql:
[lxl@hadoop102 spark]$ cp /opt/module/hive/lib/mysql-connector-java-5.1.27-bin.jar ./jars/
scala> val rdd = new org.apache.spark.rdd.JdbcRDD(sc, () => {
|      Class.forName("sql.jdbc.Driver")
|      java.Connection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
|    },
|      "select * from `rddtable` where id >= ? and id <= ?;",
|      1,
|      10,
|      1,
|      r => (r.getInt(1), r.getString(2))
|    )
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[1] at JdbcRDD at <console>:24
scala> unt())
3
scala> rdd.foreach(println)
(1,zhangsan)
(2,lisi)
(3,wangwu)
4.2.3 HBase 数据库
  由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop 输⼊格式访问 HBase。这个输⼊格式会返回键值对数据,其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable,⽽值的类型为 org.apache.hadoop.hbase.client. Result。