SparkDemo(Serializable)
Preface
This article walks through a common serialization problem in Spark development: what happens when a Task is shipped from the Driver to the Executors.
Setting Up the Development Environment
In this experiment Spark runs locally on Windows. To develop Spark applications, the local machine needs JDK 1.8 and Maven installed.
To confirm that the environment is configured correctly, press Win+R, type cmd, and check the versions with java -version and mvn -v.
The environment is as follows:
We use IDEA as the development tool.
Creating the Maven Project
The pom.xml is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>SparkDemo</groupId>
  <artifactId>SparkDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.1</scala.version>
  </properties>
  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
Writing the Spark Program
The directory structure is as follows:
Create Serializable.scala:
First, we need to understand function passing in RDDs. In real development we often define our own operations on RDDs. The key point is that initialization happens on the Driver, while the program actually runs on the Executors. This involves cross-process communication, so everything referenced by the operation must be serializable.
If we do not make our custom class serializable:
package SparkDemo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: luomo
 * @CreateTime: 2019/10/29
 * @Description: Serializable from Driver to Executor
 */
object Serializable {
  def main(args: Array[String]): Unit = {
    // create the Spark configuration
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
    // create the Spark context
    val sc = new SparkContext(config)
    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "Flink"))
    val search = new Search("h")
    val match1: RDD[String] = search.getMatch1(rdd)
    match1.collect().foreach(println)
    sc.stop()
  }

  class Search(query: String) {
    // check whether a string contains the query
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // filter the RDD down to matching strings (method reference)
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // filter the RDD down to matching strings (anonymous function)
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}
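The failure can be reproduced without Spark at all, because Spark relies on plain Java serialization when it ships a task closure to an Executor. The following sketch (class names are illustrative, not from the article) checks which objects Java serialization accepts:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Like the Search class above: no Serializable marker.
class PlainSearch(val query: String)

// Like the fixed version shown later: marked java.io.Serializable.
class MarkedSearch(val query: String) extends java.io.Serializable

object SerializationCheck {
  // Attempt plain Java serialization -- the same requirement Spark
  // imposes on everything captured by a task closure.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new PlainSearch("h")))  // false
    println(canSerialize(new MarkedSearch("h"))) // true
  }
}
```

Because `rdd.filter(isMatch)` captures the whole Search instance, the same check fails inside Spark and the job aborts before any task runs.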
Running this program throws org.apache.spark.SparkException: Task not serializable.
As we can see, Spark cannot serialize an ordinary user-defined class directly.
The fix is to have our custom class extend java.io.Serializable:
package SparkDemo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: luomo
 * @CreateTime: 2019/10/29
 * @Description: Serializable from Driver to Executor
 */
object Serializable {
  def main(args: Array[String]): Unit = {
    // create the Spark configuration
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Serializable")
    // create the Spark context
    val sc = new SparkContext(config)
    val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "Flink"))
    val search = new Search("h")
    val match1: RDD[String] = search.getMatch1(rdd)
    match1.collect().foreach(println)
    sc.stop()
  }

  // custom class, now serializable
  class Search(query: String) extends java.io.Serializable {
    // check whether a string contains the query
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }

    // filter the RDD down to matching strings (method reference)
    def getMatch1(rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }

    // filter the RDD down to matching strings (anonymous function)
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
  }
}
Running the Program
This time the program runs and filters out the strings containing the character h, printing hadoop and hive.
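Besides marking the class Serializable, another common fix is to copy the needed field into a local variable inside the method, so the closure captures only that value rather than the whole instance (`this`). A plain-JVM sketch of the idea (names are illustrative), reusing Java serialization as the acceptance check:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately NOT Serializable, like the first Search class.
class LocalCopySearch(query: String) {
  // Referencing the field makes the closure capture `this`
  // (the whole LocalCopySearch instance), which cannot serialize.
  def matcherViaField: String => Boolean = s => s.contains(query)

  // Copying the field into a local val first means the closure
  // captures only a String, which serializes fine.
  def matcherViaLocal: String => Boolean = {
    val q = query
    s => s.contains(q)
  }
}

object LocalCopyDemo {
  // Attempt plain Java serialization, as Spark does for task closures.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val search = new LocalCopySearch("h")
    println(canSerialize(search.matcherViaField))
    println(canSerialize(search.matcherViaLocal))
  }
}
```

In the Spark code above, the equivalent change would be writing getMatch2 as `val q = query; rdd.filter(x => x.contains(q))`, which works even without extending java.io.Serializable.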