Spark SQL
Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine. In Hive, HiveQL is converted into MapReduce jobs and submitted to the cluster for execution, which greatly simplifies writing MapReduce programs, but MapReduce itself executes relatively slowly. Spark SQL came into being to address this: it converts SQL into RDD operations and submits them to the cluster, so execution is much faster.
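As a minimal sketch of that workflow (the file path and view name below are placeholders, not from any particular project), a query goes through a SparkSession, comes back as a DataFrame, and is ultimately executed by Spark as RDD operations:

import org.apache.spark.sql.SparkSession

object SqlQuickStart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sqlQuickStart").getOrCreate()
    //Hypothetical JSON file with {"name": ..., "age": ...} records
    val df = spark.read.json("file:///tmp/people.json")
    df.createOrReplaceTempView("people")
    //The SQL text is planned by Spark SQL and executed on the cluster as RDD operations
    spark.sql("select name, age from people where age > 20").show()
    spark.stop()
  }
}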
SparkConf and SparkSession
SparkContext
Every Spark program starts with a SparkContext. Initializing a SparkContext requires a SparkConf object, which holds the various Spark cluster configuration parameters.
Once initialized, you can use the methods on the SparkContext object to create and manipulate RDDs and shared variables.
SparkContext creation method 1: using SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest")
val sc = SparkContext.getOrCreate(conf)
SparkContext creation method 2: using SparkSession
val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
val sc = spark.sparkContext
RDD, DataSet (DS), DataFrame (DF)
DataFrame and DataSet are two new abstractions provided by Spark SQL. How do they differ from RDD?
Compared with RDD: an RDD does not know the column (schema) information of its elements, while a DataSet knows both the column names and the column types of the data.
Compared with DataSet: the element type of a DataSet is determined by the case class that holds the data, while the element type of a DataFrame is fixed as Row (a DataFrame is a Dataset[Row]).
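To make the distinction concrete, here is a small sketch (the Person case class and its fields are made up for illustration): the RDD only sees tuples, the Dataset is typed by the case class, and the DataFrame is a Dataset of Row.

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object AbstractionCompare {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("abstractionCompare").getOrCreate()
    import spark.implicits._

    //RDD: a distributed collection of tuples with no schema attached
    val rdd = spark.sparkContext.makeRDD(List(("Andy", 30), ("Justin", 19)))

    //Dataset[Person]: the element type comes from the case class, so field access is typed
    val ds: Dataset[Person] = rdd.map(x => Person(x._1, x._2)).toDS()
    ds.map(p => p.age + 1).show()

    //DataFrame: the element type is fixed as Row, fields are looked up by name at runtime
    val df: DataFrame = ds.toDF()
    df.select($"age" + 1).show()

    spark.stop()
  }
}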
Convert RDD to DS
import org.apache.spark.sql.SparkSession

object CreateDataSetDemo {
  case class Point(label: String, x: Double, y: Double)
  case class Category(id: Long, name: String)

  def main(args: Array[String]): Unit = {
    //1. Create a SparkSession object
    val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
    val sc = spark.sparkContext
    //Import implicits for the toDS/toDF conversions
    import spark.implicits._

    //2. Create RDDs
    val pointRDD = sc.makeRDD(List(("bar", 3.0, 4.0), ("foo", 2.0, 2.5)))
    val categoryRDD = sc.makeRDD(List((1, "foo"), (2, "bar")))

    //3. Use the RDDs to create DataSets
    val ds1 = pointRDD.toDS()
    ds1.show()
    println("------------------")

    val pointDS = pointRDD.map(x => Point(x._1, x._2, x._3)).toDS()
    val categoryDS = categoryRDD.map(x => Category(x._1, x._2)).toDS()
    pointDS.show()
    categoryDS.show()

    val df = pointDS.join(categoryDS, pointDS("label") === categoryDS("name"))
    df.show()

    spark.stop()
  }
}
(The same approach works for converting to a DF, as sketched below.)
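For example, the pointRDD from the demo above could be turned into a DataFrame like this (a sketch reusing the same spark session, pointRDD and Point case class; the explicit column names are chosen just for illustration):

import spark.implicits._

//Name the columns explicitly...
val df1 = pointRDD.toDF("label", "x", "y")
//...or go through the case class so the field names become the column names
val df2 = pointRDD.map(x => Point(x._1, x._2, x._3)).toDF()
df1.show()
df2.show()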
Demo: load data, convert to DS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object test01 extends App {
  case class Order(id: String, date: String, customerId: String, status: String)
  case class Product(id: String, categoryId: String, name: String, description: String, price: String, image: String)
  case class Customer(id: String, fname: String, lname: String, email: String, password: String, street: String, city: String, state: String, zipcode: String)
  case class Order_Item(id: String, order_id: String, product_id: String, quantity: String, subtotal: String, product_price: String)

  //1. Create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //Import implicits for the toDS/toDF conversions
  import spark.implicits._

  private val orderRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\orders.csv")
  private val productRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\products.csv")
  private val customerRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\customers.csv")
  private val order_ItemRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\order_items.csv")

  //Strip the quotes, split on commas, map each line to a case class, then call toDS()
  private val orderDS: Dataset[Order] = orderRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order(fields(0), fields(1), fields(2), fields(3))
  }).toDS()

  private val productDS: Dataset[Product] = productRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Product(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
  }).toDS()

  private val customerDS: Dataset[Customer] = customerRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Customer(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))
  }).toDS()

  private val order_ItemDS: Dataset[Order_Item] = order_ItemRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order_Item(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
  }).toDS()

  order_ItemDS.show()
  customerDS.show()
  productDS.show()
  orderDS.show()

  spark.stop()
}
Read a JSON file and convert it to a DF
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrameDemo extends App {
  //1. Create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //Import implicits for the $"column" syntax
  import spark.implicits._

  //Read the json file through spark.read and generate a DataFrame
  private val jsontoDF: DataFrame = spark.read.json("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.json")
  jsontoDF.show()

  //Print the schema information
  /**
   * root
   *  |-- age: long (nullable = true)
   *  |-- name: string (nullable = true)
   * field name: field type (nullable = whether the field may be null)
   */
  jsontoDF.printSchema()

  //select query
  jsontoDF.select("name").show()
  jsontoDF.select("age").show()
  //query multiple fields
  jsontoDF.select(jsontoDF("name"), jsontoDF("age")).show()
  //age + 1
  jsontoDF.select(jsontoDF("name"), jsontoDF("age") + 1).show()
  //another form
  println("------------")
  jsontoDF.select($"name", $"age" + 1).show()

  //filter, similar to where in SQL
  jsontoDF.filter($"age" > 20).show()
  //group with groupBy
  jsontoDF.groupBy("age").count().show()

  //Register the DataFrame as a temporary view
  jsontoDF.createOrReplaceTempView("people")
  private val df2: DataFrame = spark.sql("select * from people where age>20")
  df2.show()

  //A global temporary view is qualified with the global_temp database
  jsontoDF.createOrReplaceGlobalTempView("p1")
  spark.newSession().sql("select * from global_temp.p1 where age>20").show()

  spark.stop()
}
Convert RDD to DF through a case class
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo02 extends App {
  case class People(name: String, age: Int)

  //1. Create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //Import implicits for the toDF conversion
  import spark.implicits._

  private val peopleRDD: RDD[Array[String]] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.txt")
    .map(_.split(","))

  //Convert the RDD to a DF through the case class
  private val peopleDF: DataFrame = peopleRDD.map(x => People(x(0), x(1).trim.toInt)).toDF()
  peopleDF.printSchema()
  peopleDF.show()

  spark.stop()
}
Create a DF from a schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * @author WGY
 */
object createDataFrameBySchame extends App {
  //1. Create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  import spark.implicits._

  private val textRDD: RDD[Array[String]] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.txt").map(_.split(","))

  //Define the schema information: name, age
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))

  //Convert the RDD elements to Row
  private val mapRDD: RDD[Row] = textRDD.map(x => Row(x(0), x(1).trim.toInt))

  //Convert the RDD to a DataFrame
  private val df1: DataFrame = spark.createDataFrame(mapRDD, schema)
  df1.printSchema()
  df1.show()
  println("--------------")

  //DataFrame -> RDD
  private val r1: RDD[Row] = df1.rdd
  println(r1.collect().mkString(" "))

  spark.stop()
}
Convert DF to RDD
/** The content of people.json is as follows:
 * {"name":"Michael"}
 * {"name":"Andy", "age":30}
 * {"name":"Justin", "age":19}
 */
val df = spark.read.json("file:///home/hadoop/data/people.json")
//Convert the DF to an RDD
df.rdd.collect
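After the conversion each element is a Row, so field values are pulled out by position or by name; a short sketch reusing the df above:

//Each element of df.rdd is an org.apache.spark.sql.Row
val nameRDD = df.rdd.map(row => row.getAs[String]("name"))
nameRDD.collect().foreach(println)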
Using Spark SQL to manipulate external data sources
Read a Parquet file via a DF
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object TestParquet extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  private val rdd1: RDD[(String, String, Array[Int])] = sc.parallelize(List(
    ("zhangsan", "green", Array(3, 5, 6, 9)),
    ("lisi", "red", Array(3, 5, 6, 10)),
    ("wangwu", "black", Array(3, 5, 6, 11)),
    ("zhaoliu", "yellow", Array(3, 5, 6, 12)),
    ("wuqi", "green", Array(3, 5, 6, 13))
  ))

  private val structType = StructType(Array(
    StructField("name", StringType),
    StructField("color", StringType),
    StructField("numbers", ArrayType(IntegerType))
  ))

  private val rowRDD: RDD[Row] = rdd1.map(x => Row(x._1, x._2, x._3))
  private val df: DataFrame = spark.createDataFrame(rowRDD, structType)

  //Read and write parquet files
  // df.write.parquet("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\user")
  private val parquetRDD: DataFrame = spark
    .read
    .parquet("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\user")

  parquetRDD.printSchema()
  parquetRDD.show()

  spark.stop()
}
Spark connecting to a database (SparkToMySQL)
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkToMySQL extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("mysql").getOrCreate()

  val url = "jdbc:mysql://hadoop1:3306/test"
  val tableName = "student"                          //table name in mysql
  private val prop = new Properties()
  prop.setProperty("user", "kb07")                   //username
  prop.setProperty("password", "ok")                 //password
  prop.setProperty("driver", "com.mysql.jdbc.Driver")

  //connect and read the table into a DataFrame
  private val mysqlDF: DataFrame = spark.read.jdbc(url, tableName, prop)
  mysqlDF.printSchema()
  mysqlDF.show()

  spark.stop()
}
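Writing back to MySQL goes through the same JDBC url and properties; a minimal sketch reusing the names from the object above ("student_copy" is a hypothetical target table, and the call would go before spark.stop()):

import org.apache.spark.sql.SaveMode

//Append the DataFrame to a (hypothetical) MySQL table named student_copy
mysqlDF.write
  .mode(SaveMode.Append)
  .jdbc(url, "student_copy", prop)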
Spark connecting to Hive (HiveOnSpark)
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveOnSpark extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("Demo2")
    .enableHiveSupport()
    .config("hive.metastore.uris", "thrift://192.168.226.101:9083")
    .getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  private val df1: DataFrame = spark.sql("select * from toronto")
  df1.printSchema()
  df1.show()

  spark.stop()
}
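With enableHiveSupport() the session can also write a DataFrame back to Hive; a minimal sketch reusing df1 from the object above ("toronto_backup" is a hypothetical table name):

import org.apache.spark.sql.SaveMode

//Save the query result as a (hypothetical) Hive table named toronto_backup
df1.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("toronto_backup")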
Spark SQL built-in functions
package SQL0813

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object testInterFunction extends App {
  case class Student(id: Int, name: String, sex: String, age: Int)

  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  //Import the package containing the built-in functions
  import org.apache.spark.sql.functions._

  private val accessLog = Array(
    "2020-08-13,1", "2020-08-13,1", "2020-08-13,2", "2020-08-13,2", "2020-08-13,3", "2020-08-13,3",
    "2020-08-14,1", "2020-08-14,1", "2020-08-14,2", "2020-08-14,3",
    "2020-08-15,1", "2020-08-15,1", "2020-08-15,2", "2020-08-15,3"
  )

  private val accessLogRDD: RDD[Row] = sc.parallelize(accessLog).map(x => {
    val strings = x.split(",")
    Row(strings(0), strings(1).toInt)
  })

  private val structType = StructType(Array(
    StructField("day", StringType),
    StructField("userID", IntegerType)
  ))

  private val logDF: DataFrame = spark.createDataFrame(accessLogRDD, structType)
  logDF.printSchema()
  logDF.show()

  //Number of visits per day: agg aggregation
  logDF.groupBy("day").agg(count("userID").as("pv")).show()
  //Deduplicated visitors per day
  logDF.groupBy("day").agg(countDistinct("userID").as("uv")).show()

  //SQL equivalent
  logDF.createOrReplaceTempView("logs")
  spark.sql(
    """
      |select day, count(distinct userID) as uv
      |from logs
      |group by day
    """.stripMargin
  ).show()

  private val stuDF: DataFrame = Seq(
    Student(1001, "zhangsan", "F", 20),
    Student(1002, "zhangsan1", "M", 18),
    Student(1003, "zhangsan2", "F", 20),
    Student(1004, "zhangsan3", "F", 20),
    Student(1005, "zhangsan4", "M", 20),
    Student(1006, "zhangsan5", "F", 20),
    Student(1007, "zhangsan6", "M", 20),
    Student(1008, "zhangsan7", "F", 25),
    Student(1009, "zhangsan8", "M", 20),
    Student(1010, "zhangsan9", "F", 20)
  ).toDF()
  stuDF.printSchema()
  stuDF.show()

  //Average age grouped by sex
  stuDF.groupBy("sex").agg(avg("age")).show()
  stuDF.groupBy("sex").avg("age").show()
  //Age by sex (average, max, min)
  stuDF.groupBy("sex").agg("age" -> "avg", "age" -> "max", "age" -> "min").show()
  //Group by sex and age
  stuDF.groupBy("sex", "age").count().show()
  //Sort by age
  stuDF.sort("age").show()
  //Sort by age in descending order
  stuDF.sort($"age".desc).show()
  //Sort by age, then by id descending
  stuDF.sort($"age", $"id".desc).show()
  // stuDF.orderBy($"age".desc).show()

  spark.stop()
}
Spark SQL custom functions (UDF)
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestUGFDemo extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  //Import the package containing the built-in functions
  import org.apache.spark.sql.functions._

  private val info: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\hobbies.txt")
  private val hobbyDF: DataFrame = info.map(_.split("\t")).map(x => Hobbies(x(0), x(1))).toDF()
  hobbyDF.createOrReplaceTempView("hobby")

  //Register the UDF and use it in SQL
  spark.udf.register("hobby_num", (s: String) => s.split(",").size)
  spark.sql(
    """
      |select name, hobbies, hobby_num(hobbies) as hobbyNum
      |from hobby
    """.stripMargin
  ).show()

  spark.stop()
}

case class Hobbies(name: String, hobbies: String)
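Besides registering a UDF for SQL with spark.udf.register, the same function can be wrapped with org.apache.spark.sql.functions.udf and used directly in the DataFrame API; a short sketch reusing the hobbyDF from the demo above:

import org.apache.spark.sql.functions.udf

//Wrap the function as a column expression for the DataFrame API
val hobbyNum = udf((s: String) => s.split(",").size)
hobbyDF.select($"name", $"hobbies", hobbyNum($"hobbies").as("hobbyNum")).show()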