首页新闻动态正文

Spark系列——从零学习SparkSQL编程(下)【黑马大数据培训】

更新时间:2022年12月21日 18时25分27秒 来源:黑马程序员论坛

黑马中级程序员课程

5. 导入Java依赖
要使用SparkSQL的API,首先要导入Scala,Spark,SparkSQL的依赖:
<properties> <scala.version>2.11.8</scala.version> <hadoop.version>2.7.4</hadoop.version> <spark.version>2.0.2</spark.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.32</version> </dependency> <!-- spark sql 依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.0.2</version> </dependency> </dependencies>
6. Java代码操作DataFrame
1.DataFrame作为SparkSQL的核心API,它是通过SparkContext来获取,代码如下:
  //1.创建spark session,并指定appName,需要将任务提交到哪里。 val spark = new SparkSession.Builder().appName("CaseClassSchema").master("local[2]").getOrCreate() //2.获取SparkContext,后面所有的SparkSQL操作都需要该上下文。 val sc: SparkContext = spark.sparkContext
上文中,master指定的是sparkSQL的执行环境,可以是集群也可以是本地,这里的local[2]指定的是本地单机运行模 式,使用2条线程来执行任务,注意这里local必须是小写的。 SparkSession是SparkContext的升级版,他支持HiveContext和SparkContext。 2.通过SparkContext后我们可以获取到数据对应的DataFrame,代码如下:
//3.获取每一行内容的RDD,并通过schema将RDD转化成DF val lineRdd: RDD[Array[String]] = sc.textFile("hdfs://node01:8020/spark_res/people.txt").map(_.split(", ")) val peopleRdd: RDD[People] = lineRdd.map(x => People(x(0), x(1).toInt)) import spark.implicits._ val peopleDF: DataFrame = peopleRdd.toDF //4.对DF进行操作 peopleDF.printSchema() peopleDF.show() println(peopleDF.head()) println(peopleDF.count()) peopleDF.columns.foreach(println)
使用DataFrame之前,必须导包,否则无法使用  toDF 方法。 3.DataFrame操作SQL有两种方式,DSL和SQL,代码如下:
//DSL peopleDF.select("name","age").show() peopleDF.filter($"age">20).groupBy("name").count().show //SQL peopleDF.createOrReplaceTempView("t_people") spark.sql("select * from t_people order by age desc").show
4.SQL操作完毕后必须关闭sparkContext和SparkSession,代码分别是 sc.stop() 和  spark.stop() 除了读取普通文件,还可以读取mysql orcale数据,代码如下:
val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") //重点代码,连接JDBC val ipLocationDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/iplocation","iplocation",properties) ipLocationDF.printSchema ipLocationDF.show
7. 保存DataFrame的结果
除了读取数据,DataFrame还提供了一整套保存处理数据结果的机制,代码如下:
object SparkSqlToMysql { def main(args: Array[String]): Unit = { val sc: SparkContext = ... //1.通过spark session读取json数据,并返回DataFrame val peopleDF: DataFrame = spark.read.json(args(0)) //2.将DataFrame注册为t_people表,并对该表进行SQL语句操作 peopleDF.createOrReplaceTempView("t_people") val resultDF = spark.sql("select * from t_people") //3.对上个SQL语句的操作结果进行保存 val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","123456") resultDF.write.jdbc("jdbc:mysql://192.168.52.105:3306/iplocation","spark_save_result",properties) //close sparkcontext sparksession.. } }
需要注意的是resultDF.write,其返回DataFrameWriter。 1. 该类可以保存任何SQL的结果,并且由于API的便利性,可以保存成多种格式,如 text ,json ,orc,csv,jdbc 等。 2. 对于保存的数据,系统提供了几种保存模式,可以通过mode(String)来指定: overwrite : 重写文件内部数据 append : 将新增内容添加到文件末尾 ignore : 如果文件已存在 则忽略操作 error : default option, 如果文件存在,则抛出异常 8. 总结
1. 在SparkSQL系列中,我们首先介绍了SparkSQL的核心API DataFrame,DataFrame内部分为RDD基础分布式 数据集和Schema元信息。DataFrame的SQL代码在执行之前会经过Catalyst优化,变成高效的处理代码。接着 我们介绍了通过 spark-shell 和 java api 两种客户端窗口操作DataFrame。 2. 创建DataFrame有两种方式: 1. 通过 rdd.toDF 直接将rdd转换成DataFrame。 2. 通过  spark.read 直接读取各种格式的数据。 3. 查看DataFrame的内容有两种: 1. 通过  df.printSchema 查看数据结构。 2. 通过  df.show 查看数据内容。 4. df提供了DSL和SQL两种风格的来操作数据。对于DSL风格,常见的方法有  select() filter() 等。 5. 本文在后半部分主要介绍SparkSQL如何与mysql进行交互,除此之外,还支持Parquet,ORC,JSON,Hive, JDBC , avro协议文件等交互,可以通过 官方网站学习:http://spark.apache.org/docs/latest/sql-data-source s.html。

推荐了解热门学科

java培训 Python人工智能 Web前端培训 PHP培训
区块链培训 影视制作培训 C++培训 产品经理培训
UI设计培训 新媒体培训 软件测试培训 Linux运维
大数据培训 智能机器人软件开发




传智播客是一家致力于培养高素质软件开发人才的科技公司“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。

传智播客从未停止思考

传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”

中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。

何为中高级程序员课程?

传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。




黑马程序员热门视频教程

Python入门教程完整版(懂中文就能学会) 零起点打开Java世界的大门
C++| 匠心之作 从0到1入门学编程 PHP|零基础入门开发者编程核心技术
Web前端入门教程_Web前端html+css+JavaScript 软件测试入门到精通


分享到:
在线咨询 我要报名
和我们在线交谈!