SparkSQL
1、SparkSql概述
1、什么是SparkSql?
SparkSql⽤于处理结构化数据,底层还是RDD
2、SparkSql的两个数据抽象: DataFrame、DataSet
1、什么是DataFrame
DataFrame可以当做⼀个⼆维表格,有schema信息<;有列名、列类型>
DataFrame只关注列不关注⾏的类型,不管每个元素<;每⾏>是什么类型,表现出来都是Row类型
2、什么是DataSet
DataSet可以当做⼀个⼆维表格,有schema信息<;有列名、列类型>
DataSet即关注列也关注⾏的类型,每个的数据类型是啥,表现出来就是啥
3、DataFrame与DataSet的区别:
1、DataFrame是弱类型,DataSet是强类型
2、DataFrame是运⾏期安全,编译器不安全。DataSet是编译器安全,运⾏期也安全
4、DataFrame与DataSet的使⽤时机:
1、如果是将rdd转成sparksql编程,
此时如果rdd⾥⾯的元素类型是样例类,转成DataSet或者DataFrame都可以
此时如果rdd⾥⾯的元素类型元组,推荐转成DataFrame,可以通过toDF指定列名
2、如果想要使⽤map、flatMap这种写函数的强类型算⼦,推荐使⽤DataSet
5、RDD、DataFrame、DataSet的联系
1、RDD、DataFrame、DataSet都是弹性分布式数据集
2、RDD、DataFrame、DataSet都是惰性执⾏的,都需要调⽤action算⼦之后才会真正执⾏
3、RDD、DataFrame、DataSet都有分区
4、RDD、DataFrame、DataSet有很多共同的函数: map、flatMap、filter..
5、RDD、DataFrame、DataSet都是数据在内存与磁盘中动态存储
2、SparkSql编程
1、创建SparkSession: SparkSession.builder().master("").appName(..).getOrCreate()
2、DataFrame创建:
1、通过toDF⽅法
要想使⽤toDF⽅法必须导⼊隐式转换: import sparksession对象名.implicits._
1、集合.toDF()
2、DF()
toDF有两个重载的⽅式,如果调⽤的是⽆参的toDF,此时会⽣成默认的列名<;如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
所以如果元素是元组,可以有参的toDF⽅法指定列名<;指定的列名的个数必须与列的个数要相同>
2、通过读取⽂件: ad.csv/json/jdbc..
3、通过其他DataFrame衍⽣
3、DataSet创建
1、通过toDS⽅法
要想使⽤toDS⽅法必须导⼊隐式转换: import sparksession对象名.implicits._
1、集合.toDS()
2、DS()
toDS⽅法⽣成的DataSet此时会⽣成默认的列名<;如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
2、通过读取⽂件: File()
3、通过其他DataFrame衍⽣
4、SparkSql编程的两种⽅式:
1、SQL风格
1、将df/ds注册成表:union是什么类型
createTempView:: 注册成临时表
createOrReplaceTempView: 注册成临时表[如果表已经存在会替换],只能在当前SparkSession中使⽤,后续只在使⽤表的时候直接⽤表名既可以
createGlobalTempView:注册成全局表
createOrReplaceGlobalTempView: 注册成全局表,可以在多个sparkSession中使⽤,后续在使⽤的时候,必须通过 global_temp.表名的⽅式使⽤
2、sql编写: spark.sql("sql语句")
2、DSL风格: 使⽤select、filter、where、groupBy等api变成
常⽤的DSL api:
1、过滤:
1、filter("过滤条件") // filter("age>20")
2、where("过滤条件") // where("age>20")
2、去重:
1、distinct: 只有所有列都相同才会去重
2、dropDuplicates: 当指定列相同的时候就会去重
3、列裁剪: selectExpr("字段名","函数(字段名)","字段名 as 别名")
5、RDD、DataFrame、DataSet转换
1、RDD转DataFrame: DF(列名,列名,..)
2、DataFrame转rdd: df.rdd
3、Rdd转DataSet: DS
4、DataSet转rdd: val rdd:RDD[DataSet元素类型] = ds.rdd
5、DataFrame转DataSet: val ds:DataSet[类型] = df.as[类型]
DataFrame转DataSet的时候,
如果as后⾯的类型是样例类,需要样例类的属性名要与列名⼀致。
如果as后⾯的类型是元组,需要元组的个数 = 列的个数,类型也要⼀致
6、DataSet转DataFrame: ds.DF(列名,列名,..)
6、Row类型的取值: As[ 列的类型 ] ( "列名" )
7、⾃定义函数:
1、⾃定义UDF函数:
1、定义普通函数
val func = (id:Int) => id+"-001"
2、注册udf函数: ister("函数名",函数)
ister("myfunc",func)
3、通过sql使⽤函数:
spark.sql("select myfunc(id) from 表名")
2、⾃定义udaf函数
1、弱类型udaf:
1、定义class继承UserDefinedAggregateFunction
2、重写抽象⽅法
def inputSchema: StructType <;定义udaf参数类型>
def bufferSchema: StructType <;定义中间变量的参数类型>
def dataType: DataType <;定义最终结果类型>
def deterministic: Boolean <⼀致性>
def initialize(buffer: MutableAggregationBuffer): Unit <;初始化中间变量>
def update(buffer: MutableAggregationBuffer, input: Row): Unit <;每次传⼊组中⼀个值,更新中间变量> def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit <;合并所有task的统计结果>
def evaluate(buffer: Row): Any <;获取最终结果>
3、注册udaf:
1、创建⾃定义udaf对象: val obj = new xxx
2、注册: ister("函数名",obj)
2、强类型的udaf
1、定义class继承Aggregator[IN,BUF,OUT]
IN: udaf参数类型
BUF: 中间变量类型
OUT: 最终结果类型
2、重写⽅法
def zero: Buff <;中间变量赋初始值>
def reduce(buff: Buff, age: Int): Buff <;在每个分区中先预聚合,每个传⼊⼀个元素,更新中间结果>
def merge(b1: Buff, b2: Buff): Buff <;对所有分区的结果再次聚合>
def finish(reduction: Buff): Double <;获取最终结果>
def bufferEncoder: Encoder[Buff] <;对中间结果类型编码>
def outputEncoder: Encoder[Double] <;对最终结果类型编码>
3、注册
1、创建udaf对象: val obj = new XXX
2、导⼊隐式转换,使⽤udaf函数:
import org.apache.spark.sql.functions._
val uobj = udaf(obj)
3、注册: ister("函数名",uobj)
3、数据读取与保存
1、读取
1、⽂件读取:
1、ad
.format() --指定⽂件读取格式[csv/json/text/parquet/orc]
.option().option().. --指定读取的参数
.load(path) --指定加载路径的数据
在读取⽂件的时候,⼀般只有csv⽂件才需要配置option,csv⽂件常⽤的option:
sep: 指定字段之间的分隔符
header: 指定是否以⽂件的第⼀⾏作为列名
inferSchema: 指定是否⾃动推断字段的类型
2、ad[.option()].csv/json/csv/parquet/orc
2、mysql数据读取
1、ad
.format() --指定⽂件读取格式[jdbc]
.option().option().. --指定读取的参数<;账号、密码、driver、表、url>
.load() --指定加载路径的数据
2、ad.jdbc(url,表名,参数设置): 此种⽅式读取jdbc的时候分区数 = 1,只能⽤于数据量⼩的场景
此种⽅式读取的时候,分区数 = (uperBound-lowerBound) > 分区数 ? 分区数 : uperBound-lowerBound
2、数据保存:
1、df/ds.write
.mode() --指定写⼊模式
.format() --指定数据写⼊的格式[csv/json/parquet/orc/jdbc]
.option() --指定数据写⼊的时候需要的参数
csv⽂件写⼊的时候指定的option:
header: 写⼊的时候是否将列名也写⼊
sep: 写⼊的时候指定字段之间的分隔符
.save() --数据保存
2、df/de(..).csv/json/parquet/orc/jdbc
常⽤的写⼊模式:
SaveMode.Overwrite: 如果指定的路径/表已经存在,则覆盖历史数据<;数据写⼊HDFS的时候使⽤>
SaveMode.Append: 如果指定的路径/表已经存在,则追加数据<;数据写⼊mysql的时候使⽤>
如果写⼊mysql的时候,主键数据已经存在,此时不能使⽤append,需要通过foreachPartitions对数据进⾏更新写⼊
3、hive的数据读取和保存
1、读取数据:
1、在创建SparkSession的时候通过enableHiveSupport需要开启hive的⽀持:
SparkSession.builder().master(..).appName(..).enableHiveSupport().getOrCreate
2、直接在代码中通过spark.sql("查询hive表数据")
2、保存数据到hive表:
df/de(..).saveAsTable("hive表") <;不常⽤,⼀般都是将数据写⼊HDFS>
4、多维聚合:<;对多个维度按照不同的组合进⾏聚合>
grouping sets:
语法: select 维度1,维度2,..,聚合函数 from 表 group by 维度1,维度2,.. grouping sets( (维度1),(维度1,维度2),(..) )
grouping sets后⾯的字段名必须是group by后⾯的字段
案例:
select A,B,C,count(1) from person group by A,B,C grouping sets( (A),(A,B),(B,C),(A,B,C) )
等价于:
select A,null B,null C,count(1) from person group by A
union
select A,B,null C,count(1) from person group by A,B
union
select null A,B,C,count(1) from person group by B,C
union
select A,B,C,count(1) from person group by A,B,C