Spark Demo(Serializable)

前言

在本文中将介绍spark中Task执行序列化的开发问题

开发环境准备

本实验Spark运行在Windows上,为了开发Spark应用程序,在本地机器上需要有Jdk1.8和Maven环境。
确保我们的环境配置正常,我们可以使用快捷键 Win+R 输入cmd:
环境如下:
图片说明
程序开发工具我们使用IDEA,创建maven项目,添加pom依赖

编写Spark程序

目录结构如下:

图片说明

创建Serializable.scala:

首先我们需要了解RDD中的函数传递:
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。
如果我们对我们自定义的类不进行序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package SparkDemo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* @Author: luomo
* @CreateTime: 2019/10/29
* @Description: Serializable from Driver to Executor
*/
object Serializable {

def main(args: Array[String]): Unit = {

//创建Spark上下文对象
val config:SparkConf =new SparkConf().setMaster("local[*]").setAppName("Serializable")

//创建Spark上下文对象
val sc = new SparkContext(config)

val rdd:RDD[String] = sc.parallelize(Array("hadoop","spark","hive","Flink"))

val search = new Search("h")

val match1:RDD[String] =search.getMatch1(rdd)
match1.collect().foreach(println)
sc.stop()
}
class Search(query:String){
//过滤出包含字符串的数据
def isMatch(s:String):Boolean ={
s.contains(query)
}
//过滤出包含字符串的RDD
def getMatch1(rdd:RDD[String]) :RDD[String] = {
rdd.filter(isMatch)
}
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] ={
rdd.filter(x=> x.contains(query))
}
}
}

如图:
图片说明

可见,对于自己定义的普通类,Spark是无法直接将其序列化的。
需要我们自定义的类继承java.io.Serializable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package SparkDemo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* @Author: luomo
* @CreateTime: 2019/10/29
* @Description: Serializable from Driver to Executor
*/
object Serializable {

def main(args: Array[String]): Unit = {

//创建Spark上下文对象
val config:SparkConf =new SparkConf().setMaster("local[*]").setAppName("Serializable")

//创建Spark上下文对象
val sc = new SparkContext(config)

val rdd:RDD[String] = sc.parallelize(Array("hadoop","spark","hive","Flink"))

val search = new Search("h")

val match1:RDD[String] =search.getMatch1(rdd)
match1.collect().foreach(println)
sc.stop()
}
//自定义类
class Search(query:String) extends java.io.Serializable {
//过滤出包含字符串的数据
def isMatch(s:String):Boolean ={
s.contains(query)
}
//过滤出包含字符串的RDD
def getMatch1(rdd:RDD[String]) :RDD[String] = {
rdd.filter(isMatch)
}
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] ={
rdd.filter(x=> x.contains(query))
}
}
}

运行程序

如图我们过滤出包含字符h的字符串:
图片说明

文章目录
  1. 1. 前言
  2. 2. 开发环境准备
  3. 3. 编写Spark程序
    1. 3.1. 目录结构如下:
    2. 3.2. 创建Serializable.scala:
  4. 4. 运行程序
,