前言
在本文中将介绍spark中Task执行序列化的开发问题
开发环境准备
本实验Spark运行在Windows上,为了开发Spark应用程序,在本地机器上需要有Jdk1.8和Maven环境。
确保我们的环境配置正常,我们可以使用快捷键 Win+R 输入cmd:
环境如下:
![图片标题 图片说明](https://uploadfiles.nowcoder.com/images/20191029/9094293_1572341256892_8D8973B8667179A2319B041328F690BD)
程序开发工具我们使用IDEA,创建maven项目,添加pom依赖
编写Spark程序
目录结构如下:
创建Serializable.scala:
首先我们需要了解RDD中的函数传递:
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
如果我们对我们自定义的类不进行序列化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package SparkDemo
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object Serializable {
def main(args: Array[String]): Unit = {
val config:SparkConf =new SparkConf().setMaster("local[*]").setAppName("Serializable")
val sc = new SparkContext(config)
val rdd:RDD[String] = sc.parallelize(Array("hadoop","spark","hive","Flink"))
val search = new Search("h")
val match1:RDD[String] =search.getMatch1(rdd) match1.collect().foreach(println) sc.stop() } class Search(query:String){ def isMatch(s:String):Boolean ={ s.contains(query) } def getMatch1(rdd:RDD[String]) :RDD[String] = { rdd.filter(isMatch) } def getMatche2(rdd: RDD[String]): RDD[String] ={ rdd.filter(x=> x.contains(query)) } } }
|
如图:
可见,对于自己定义的普通类,Spark是无法直接将其序列化的。
需要我们自定义的类继承java.io.Serializable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package SparkDemo
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object Serializable {
def main(args: Array[String]): Unit = {
val config:SparkConf =new SparkConf().setMaster("local[*]").setAppName("Serializable")
val sc = new SparkContext(config)
val rdd:RDD[String] = sc.parallelize(Array("hadoop","spark","hive","Flink"))
val search = new Search("h")
val match1:RDD[String] =search.getMatch1(rdd) match1.collect().foreach(println) sc.stop() } class Search(query:String) extends java.io.Serializable { def isMatch(s:String):Boolean ={ s.contains(query) } def getMatch1(rdd:RDD[String]) :RDD[String] = { rdd.filter(isMatch) } def getMatche2(rdd: RDD[String]): RDD[String] ={ rdd.filter(x=> x.contains(query)) } } }
|
运行程序
如图我们过滤出包含字符h的字符串: