SparkSQL

作者admin

6 月 13, 2024

DataFrame & Dataset 的创建

不要刻意区分:DF、DS。DF是一种特殊的DS

1.由range生成Dataset

val numDS = spark.range(5, 100, 5)

2.由集合生成Dataset

case class Person(name: String, age: Int, height: Int)
val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
val ds1 = spark.createDataset(seq1)

3.由集合生成DataFrame

val lst = List(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
val df1 = spark.createDataFrame(lst).
  // 改单个字段名时简便 
  withColumnRenamed("_1", "name1").
  withColumnRenamed("_2", "age1").
  withColumnRenamed("_3", "height1")

4.RDD 转成 DataFrame

val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy",
  16, 165))
val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))

5.RDD转Dataset

val ds3 = spark.createDataset(rdd2)

6.从文件创建DateFrame(以csv文件为例)

val df2 = spark.read.csv("data/people2.csv")

RDD、DataFrame、DataSet三者的转换

Spark SQL 的创建

val df = spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

Spark SQL 操作详解

查看元数据

spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("bigdata_zyy.people")

血缘关系通过catalog获取元数据

缓存SQL表

还可以将表声明为 LAZY,这样它就不会立即被缓存,直到第一次使用时才会真正缓存

//缓存表
spark.sql("cache table ds_spark.people")
//清除缓存表
spark.sql("uncache table ds_spark.people")

Action操作

与RDD类似的操作

show、 collect、 collectAsList、 head、 first、 count、 take、 takeAsList、 reduce

val df1 = spark.read.
  option("header", "true").
  option("inferschema","true").
  csv("src/main/resources/emp.csv")
df1.count
// 缺省显示20行
df1.union(df1).show()
// 显示2行
df1.show(2)
// 不截断字符
df1.toJSON.show(false)
// 显示10行,不截断字符
df1.toJSON.show(10, false)
spark.catalog.listFunctions.show(10000, false)
// collect返回的是数组, Array[org.apache.spark.sql.Row]
val c1 = df1.collect()
// collectAsList返回的是List, List[org.apache.spark.sql.Row]
val c2 = df1.collectAsList()
// 返回 org.apache.spark.sql.Row
val h1 = df1.head()
val f1 = df1.first()
// 返回 Array[org.apache.spark.sql.Row],长度为3
val h2 = df1.head(3)
val f2 = df1.take(3)
// 返回 List[org.apache.spark.sql.Row],长度为2
val t2 = df1.takeAsList(2)

获取结构属性的操作

printSchema、explain、columns、dtypes、col

// 结构属性 
df1.columns // 查看列名 
df1.dtypes // 查看列名和类型 
df1.explain() // 参看执行计划 
df1.col("name") // 获取某个列 
df1.printSchema // 常用 

Transformation 操作

select * from table where ... group by ... having... order by...

与RDD类似的操作

map、filter、flatMap、mapPartitions、sample、 randomSplit、 limit、 distinct、dropDuplicates、describe 
df1.map(row=>row.getAs[Int](0)).show
// randomSplit(与RDD类似,将DF、DS按给定参数分成多份) 
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count
// 取10行数据生成新的DataSet 
val df2 = df1.limit(10)
// distinct,去重 
val df2 = df1.union(df1)
df2.distinct.count
// dropDuplicates,按列值去重 
df2.dropDuplicates.show
df2.dropDuplicates("mgr", "deptno").show
df2.dropDuplicates("mgr").show
df2.dropDuplicates("deptno").show
// 返回全部列的统计(count、mean、stddev、min、max) 
ds1.describe().show
// 返回指定列的统计 
ds1.describe("sal").show
ds1.describe("sal", "comm").show

存储相关

cacheTable、persist、checkpoint、unpersist、cache

Dataset 默认的存储级别是 MEMORY_AND_DISK

import org.apache.spark.storage.StorageLevel
spark.sparkContext.setCheckpointDir("data/checkpoint") 
df1.show()
df1.checkpoint()
df1.cache()
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)
df1.createOrReplaceTempView("t1")
spark.catalog.cacheTable("t1")
spark.catalog.uncacheTable("t1")

select相关

列的多种表示、select、selectExpr 
drop、withColumn、withColumnRenamed、cast(内置函数)
// 列的多种表示方法。使用""、$""、'、col()、ds("") 
// 注意:不要混用;必要时使用spark.implicitis._;并非每个表示在所有的地方都有效
df1.select($"ename", $"hiredate", $"sal").show
df1.select("ename", "hiredate", "sal").show
df1.select('ename, 'hiredate, 'sal).show
df1.select(col("ename"), col("hiredate"), col("sal")).show
df1.select(df1("ename"), df1("hiredate"), df1("sal")).show
// 下面的写法无效,其他列的表示法有效 
df1.select("ename", "hiredate", "sal"+100).show
df1.select("ename", "hiredate", "sal+100").show
// 这样写才符合语法 
df1.select($"ename", $"hiredate", $"sal"+100).show
df1.select('ename, 'hiredate, 'sal+100).show
// 可使用expr表达式(expr里面只能使用引号) 
df1.select(expr("comm+100"), expr("sal+100"),
  expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("power(sal, 2)", "sal").show
df1.selectExpr("round(sal, -3) as newsal", "sal",
  "ename").show
// drop、withColumn、 withColumnRenamed、casting 
// drop 删除一个或多个列,得到新的DF 
df1.drop("mgr")
df1.drop("empno", "mgr")
// withColumn,修改列值 
val df2 = df1.withColumn("sal", $"sal"+1000)
df2.show
// withColumnRenamed,更改列名 
df1.withColumnRenamed("sal", "newsal")
// 备注:drop、withColumn、withColumnRenamed返回的是DF 
// cast,类型转换 
df1.selectExpr("cast(empno as string)").printSchema
import org.apache.spark.sql.types._
df1.select('empno.cast(StringType)).printSchema

where 相关的

where=filter

// where操作
df1.filter("sal>1000").show
df1.filter("sal>1000 and job=='MANAGER'").show
// filter操作
df1.where("sal>1000").show
df1.where("sal>1000 and job=='MANAGER'").show

groupBy相关的

groupBy、agg、max、min、avg、sum、count(后面5个为内置函数)
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show
// 类似having子句
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show
// agg
df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"->"avg", "sal"->"sum", "sal"->"count").show
// 这种方式更好理解
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"),
  sum("sal"), count("sal")).show
// 给列取别名
df1.groupBy("Job").agg(max("sal"), min("sal"), avg("sal"),
  sum("sal"), count("sal")).withColumnRenamed("min(sal)",
  "min1").show
// 给列取别名,最简便
df1.groupBy("Job").agg(max("sal").as("max1"),
  min("sal").as("min2"), avg("sal").as("avg3"),
  sum("sal").as("sum4"), count("sal").as("count5")).show

orderBy相关的

// orderBy 
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
// 降序 
df1.orderBy(-$"sal").show
df1.orderBy('sal).show
df1.orderBy(col("sal")).show
df1.orderBy(df1("sal")).show
df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'deptno, -'sal).show
// sort,以下语句等价 
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort('sal).show
df1.sort(col("sal")).show
df1.sort(df1("sal")).show
df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'deptno, -'sal).show

 join相关的

// 1、笛卡尔积 
df1.crossJoin(df1).count
// 2、等值连接(单字段)(连接字段empno,仅显示了一次) 
df1.join(df1, "empno").count
// 3、等值连接(多字段)(连接字段empno、ename,仅显示了一次) 
df1.join(df1, Seq("empno", "ename")).show
// 1、笛卡尔积 
df1.crossJoin(df1).count
// 2、等值连接(单字段)(连接字段empno,仅显示了一次) 
df1.join(df1, "empno").count
// 3、等值连接(多字段)(连接字段empno、ename,仅显示了一次) 
df1.join(df1, Seq("empno", "ename")).show

// 备注:不能使用双引号,而且这里是 === 
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, 'name==='sname).show

ds1.join(ds2, ds1("name")===ds2("sname")).show
ds1.join(ds2, ds1("sname")===ds2("sname"), "inner").show
// 多种连接方式 
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, $"name"===$"sname", "inner").show
ds1.join(ds2, $"name"===$"sname", "left").show
ds1.join(ds2, $"name"===$"sname", "left_outer").show
ds1.join(ds2, $"name"===$"sname", "right").show
ds1.join(ds2, $"name"===$"sname", "right_outer").show
ds1.join(ds2, $"name"===$"sname", "outer").show
ds1.join(ds2, $"name"===$"sname", "full").show
ds1.join(ds2, $"name"===$"sname", "full_outer").show

便于记忆和使用,统一写成下述格式

ds1.join(ds2, ds1(“sname”)===ds2(“sname”), “inner”).show

集合相关的

union==unionAll(过期)、intersect、except

// union、unionAll、intersect、except。集合的交、并、差 
val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")
// union 求并集,不去重 
ds3.union(ds4).show
// unionAll、union 等价;unionAll过期方法,不建议使用 
ds3.unionAll(ds4).show
// intersect 求交 
ds3.intersect(ds4).show
// except 求差 
ds3.except(ds4).show

空值处理

na.fill、na.drop

// NaN (Not a Number) 
math.sqrt(-1.0)
math.sqrt(-1.0).isNaN()
df1.show
// 删除所有列的空值和NaN 
df1.na.drop.show
// 删除某列的空值和NaN 
df1.na.drop(Array("mgr")).show
// 对全部列填充;对指定单列填充;对指定多列填充 
df1.na.fill(1000).show
df1.na.fill(1000, Array("comm")).show
df1.na.fill(Map("mgr"->2000, "comm"->1000)).show
// 对指定的值进行替换 
df1.na.replace("comm" :: "deptno" :: Nil, Map(0 -> 100, 10 ->
  100)).show
// 查询空值列或非空值列。isNull、isNotNull为内置函数 
df1.filter("comm is null").show
df1.filter($"comm".isNull).show
df1.filter(col("comm").isNull).show
df1.filter("comm is not null").show
df1.filter(col("comm").isNotNull).show

内建数据源详解

数据源之通用操作load和save操作

load:数据加载进spark

save:数据从spark写出到文件中

val df = spark.read.load("users.parquet")
df.select("name", "age").write.save("namesAndAges.parquet")

Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的

Save Mode意义
SaveMode.ErrorIfExists (默认)如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
SaveMode.Ignore如果目标位置已经存在数据,那么就忽略,不做任何操作。

SparkSQL内建支持的数据源包括:Parquet、JSON、CSV、Avro、Images、

BinaryFiles(Spark 3.0)。其中Parquet是默认的数据源

计算引擎是Spark的话,推荐选择Parquet

查询引擎主要是Presto的话,Presto对ORC文件读取做了特定的优化,这中情况下,推荐使用ORC会更好

Spark on hive 与 Hive on Spark 的区别

Spark on hive:

Spark通过Spark-SQL使用hive 语句,操作hive,底层运行的还是 spark rdd

Hive on Spark:

是把hive查询从mapreduce 的mr (Hadoop计算引擎)操作替换为spark rdd(spark 执行引擎) 操作. 相对于spark on hive,这个要实现起来则麻烦很多, 必须重新编译spark和导入jar包,不过目前大部分使用的是spark on hive

Spark 提供了两种写入hive的模式saveAsTable, insertInto

saveAstable

当schema存在时依赖以下几种存储模式:
append :追加数据到已存在的表
overwrite:覆盖已存在的数据
error or errorifexsists:数据已存在则抛出异常(默认)
ignore:数据/分区已存在忽略,不会插入数据

与saveAsTable 最大的区别就是要求表必须存在否则插入会报错,并且dateframe的schema与目标表的schema字段个数要保持一致

Spark SQL 常用函数

sparksql内置函数官方文档

// 日期格式化
spark.sql("select to_date('20220401','yyyyMMdd')").show()
//date_format(timestamp, fmt) - Converts timestamp to a value of string in the format specified by the date format fmt.
spark.sql("SELECT date_format('2016-04-08', 'y')")
// date_add(start_date, num_days) - Returns the date that is num_days after start_date.
spark.sql("SELECT date_add('2016-07-30', 1)")
// date_sub date_sub(start_date, num_days) - Returns the date that is num_days before start_date.
spark.sql("SELECT date_sub('2016-07-30', 1)")
// datediff(endDate, startDate) - Returns the number of days from startDate to endDate.
spark.sql("SELECT datediff('2009-07-31', '2009-07-30')")
// from_unixtime(unix_time, format) - Returns unix_time in the specified format.
spark.sql("SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss')")
// from_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' would yield '2017-07-14 03:40:00.0'.
spark.sql("  SELECT from_utc_timestamp('2016-08-31', 'Asia/Seoul');")
// last_day(date) - Returns the last day of the month which the date belongs to.
spark.sql("SELECT last_day('2009-01-12')")

// explode(expr) - Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns.
spark.sql("SELECT explode(array(10, 20))")
// from_json(jsonStr, schema[, options]) - Returns a struct value with the given jsonStr and schema.
spark.sql("SELECT from_json('{\"a\":1, \"b\":0.8}', 'a INT, b DOUBLE')")
spark.sql("SELECT from_json('{\"time\":\"26/08/2015\"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))")
//get_json_object(json_txt, path) - Extracts a json object from path.
spark.sql("SELECT get_json_object('{\"a\":\"b\"}', '$.a')")
//json_tuple
spark.sql("SELECT json_tuple('{\"a\":1, \"b\":2}', 'a', 'b');")
//to_json(expr[, options]) - Returns a json string with a given struct value
spark.sql("select to_json(named_struct('a', 1, 'b', 2))")
spark.sql("select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))")
spark.sql("select to_json(array(named_struct('a', 1, 'b', 2))")
spark.sql("select to_json(map('a', named_struct('b', 1)))")
spark.sql("SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)))")
spark.sql("SELECT to_json(map('a', 1))")
spark.sql("SELECT to_json(array((map('a', 1))))")

// 数组 array(expr, ...) - 返回给定值组成的数组。
spark.sql("select array(1,3,4)")
// array_contains(array, value) - 如果数组包含了 value,则返回 true。
spark.sql("select array_contains(array(1,3,4),1)")
// size size(expr) - Returns the size of an array or a map. Returns -1 if null.
spark.sql("SELECT size(array('b', 'd', 'c', 'a'))")
// sort_array(array[, ascendingOrder]) - Sorts the input array in ascending or descending order according to the natural ordering of the array elements.
spark.sql("SELECT sort_array(array('b', 'd', 'c', 'a'), true)")
// collect_list(expr) - 收集并返回非唯一元素列表。
spark.sql("select a,collect_list(b) from table group a ")
// collect_set(expr) - 收集并返回唯一元素列表。
spark.sql("select a,collect_set(b) from table group a ")
// map
//map(key0, value0, key1, value1, ...) - Creates a map with the given key/value pairs.
spark.sql("SELECT map(1.0, '2', 3.0, '4')")
// map_keys(map) - Returns an unordered array containing the keys of the map.
spark.sql("SELECT map_keys(map(1, 'a', 2, 'b'))")
// map_values(map) - Returns an unordered array containing the values of the map.
spark.sql("SELECT map_values(map(1, 'a', 2, 'b'))")
// struct
// struct(col1, col2, col3, ...) - Creates a struct with the given field values.
spark.sql("select struct(1 as a, 2 as b) s")
// coalesce(expr1, expr2, ...) - 返回第一个非空参数(如果存在)。 否则,返回 null。
spark.sql("SELECT coalesce(NULL, 1, NULL)")

sparksql开窗函数

一般情况下窗口函数不用 DSL 处理,直接用SQL更方便

import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("cookieid").orderBy("createtime")
val w2 = Window.partitionBy("cookieid").orderBy("pv")
val w3 = w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val w4 = w1.rowsBetween(-1, 1)

// 聚组函数【用分析函数的数据集】 
df.select($"cookieid", $"pv", sum("pv").over(w1).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w3).alias("pv1")).show
df.select($"cookieid", $"pv", sum("pv").over(w4).as("pv1")).show
df.select($"cookieid", $"pv", rank().over(w2).alias("rank")).show
df.select($"cookieid", $"pv", dense_rank().over(w2).alias("denserank")).show
df.select($"cookieid", $"pv", row_number().over(w2).alias("rownumber")).show

// lag、lead 
df.select($"cookieid", $"pv", lag("pv", 2).over(w2).alias("rownumber")).show
df.select($"cookieid", $"pv", lag("pv", -2).over(w2).alias("rownumber")).show

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注