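Summary: Submitting jobs to Spark
1. Scenario
With the Hadoop + Spark environment set up, the next step is to submit a simple job to Spark for computation and print the result. For the setup process, see: http://www.linuxidc.com/Linux/2017-06/144926.htm
I am most familiar with Java, so I use a Java WordCount as the example to walk through the whole process. The goal is to count how many times each word appears in a given text.
2. Environment test
Before going through the example, I want to test the environment set up earlier.
2.1 Testing the Hadoop environment
First create a file wordcount.txt with the following content: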
Hello hadoop hello spark hello bigdata yellow banana red apple
Then run the following commands:
hadoop fs -mkdir -p /Hadoop/Input (create the directory in HDFS)
hadoop fs -put wordcount.txt /Hadoop/Input (upload wordcount.txt to HDFS)
hadoop fs -ls /Hadoop/Input (list the uploaded file)
hadoop fs -text /Hadoop/Input/wordcount.txt (print the file content)
2.2 Testing the Spark environment
I use spark-shell to run a simple WordCount test, reusing the wordcount.txt file that was uploaded to HDFS in the Hadoop test above.
First start spark-shell:
spark-shell
Then enter the following Scala statements directly:
val file=sc.textFile("hdfs://Master:9000/Hadoop/Input/wordcount.txt")
val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
rdd.collect()
rdd.foreach(println)
To exit, use the following command:
:quit
That completes the environment test.
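For readers who think in Java rather than Scala, the spark-shell pipeline above (split each line on spaces, pair each word with 1, sum per word) computes roughly what the following plain-Java sketch does. It runs locally without Spark; the class name LocalWordCount is made up for illustration, and the input is the sample line from wordcount.txt.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the Spark job: a local, single-JVM word count.
final class LocalWordCount {

    // Splits the text on single spaces and counts each token,
    // keeping the words in the order they first appear.
    static Map<String, Long> count(String text) {
        return Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        LinkedHashMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        String line = "Hello hadoop hello spark hello bigdata yellow banana red apple";
        count(line).forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```

Because the comparison is case-sensitive, "Hello" and "hello" are counted as different words here (1 and 2 respectively), and the same holds for the Spark versions in this article.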
3. Word count in Java
package com.example.spark;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public final class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // The input path (for example an hdfs:// URL) is the first program argument.
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: WordCount <input-path>");
        }
        SparkConf conf = new SparkConf().setAppName("kevin's first spark app");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the input file as an RDD of lines.
        JavaRDD<String> lines = sc.textFile(args[0]).cache();

        // Split every line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        // Pair each word with an initial count of 1.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Bring the results back to the driver and print them.
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        sc.close();
    }
}
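4. Submitting the job
Package the Java word count above into a jar, spark-example-0.0.1-SNAPSHOT.jar, and upload it to the Master node; I put it in the /opt directory. This article submits the job to Spark in two ways: first with the spark-submit command, then from a Java web application.
4.1 Submitting with the spark-submit command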
spark-submit --master spark://114.55.246.88:7077 --class com.example.spark.WordCount /opt/spark-example-0.0.1-SNAPSHOT.jar hdfs://Master:9000/Hadoop/Input/wordcount.txt
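4.2 Submitting from a Java web application
I built the Java web application with Spring Boot. The implementation is as follows:
1) Create a new Maven project named spark-submit
2) Contents of pom.xml. Note that the Spark dependency artifacts must match your Scala version: in spark-core_2.11, for example, the trailing 2.11 is the Scala version you have installed.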
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
</parent>
<artifactId>spark-submit</artifactId>
<description>spark-submit</description>
<properties>
<start-class>com.example.spark.SparkSubmitApplication</start-class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<commons.version>3.4</commons.version>
<org.apache.spark-version>2.1.0</org.apache.spark-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-boot-starter-tomcat</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>apache-jsp</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-solr</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${org.apache.spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${org.apache.spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${org.apache.spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${org.apache.spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${org.apache.spark-version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>
<packaging>war</packaging>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>maven2</id>
<url>http://repo1.maven.org/maven2/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<build>
<plugins>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<configuration>
<warSourceDirectory>src/main/webapp</warSourceDirectory>
</configuration>
</plugin>
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<configuration>
<systemProperties>
<systemProperty>
<name>spring.profiles.active</name>
<value>development</value>
</systemProperty>
<systemProperty>
<name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
<!-- -1 means no limit -->
<value>600000</value>
</systemProperty>
</systemProperties>
<useTestClasspath>true</useTestClasspath>
<webAppConfig>
<contextPath>/</contextPath>
</webAppConfig>
<connectors>
<connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
<port>7080</port>
</connector>
</connectors>
</configuration>
</plugin>
</plugins>
</build>
</project>
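The naming rule mentioned before the POM (the suffix of a Spark artifact is the Scala version) can be sketched as a tiny helper. This is only an illustration under two assumptions: a Scala 2.x version string of the form major.minor.patch, and the example version 2.11.8, which is not taken from this setup. The class and method names are hypothetical.

```java
// Hypothetical helper showing how the "_2.11" suffix in spark-core_2.11 is derived.
final class ScalaArtifactNames {

    // Returns the Scala binary version, e.g. "2.11" for the full version "2.11.8".
    static String binaryVersion(String scalaVersion) {
        String[] parts = scalaVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch]: " + scalaVersion);
        }
        return parts[0] + "." + parts[1];
    }

    // Builds the Maven artifactId for a Spark module and a Scala version.
    static String artifactId(String module, String scalaVersion) {
        return module + "_" + binaryVersion(scalaVersion);
    }

    public static void main(String[] args) {
        // Assumed example version; check yours with `scala -version` or the spark-shell banner.
        System.out.println(artifactId("spark-core", "2.11.8"));
    }
}
```

All Spark artifacts in one POM should end up with the same suffix; mixing suffixes is a common source of binary-incompatibility errors at runtime.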
3) SubmitJobToSpark.java
package com.example.spark;
import org.apache.spark.deploy.SparkSubmit;
/**
* @author kevin
*
*/
public class SubmitJobToSpark {
    public static void submitJob() {
        // Same arguments as the spark-submit command line in section 4.1, plus an application name.
        String[] args = new String[] {
            "--master", "spark://114.55.246.88:7077",
            "--name", "test java submit job to spark",
            "--class", "com.example.spark.WordCount",
            "/opt/spark-example-0.0.1-SNAPSHOT.jar",
            "hdfs://Master:9000/Hadoop/Input/wordcount.txt"
        };
        SparkSubmit.main(args);
    }
}
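An alternative to calling SparkSubmit.main inside the web application's JVM is to run the spark-submit script as a child process, which keeps Spark's own startup and exit handling out of the web server. The sketch below uses only the JDK's ProcessBuilder; the class SparkSubmitProcess and its methods are hypothetical names, and it assumes spark-submit is on the PATH of the user running the web application. Spark also ships a public launcher API (org.apache.spark.launcher.SparkLauncher) for the same purpose, which is not shown here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds and runs a spark-submit command as a child process.
final class SparkSubmitProcess {

    // spark-submit options come first, then the application jar, then the application arguments.
    static List<String> buildCommand(String master, String appName, String mainClass,
                                     String jar, String... appArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(
                "spark-submit",
                "--master", master,
                "--name", appName,
                "--class", mainClass,
                jar));
        cmd.addAll(Arrays.asList(appArgs));
        return cmd;
    }

    // Starts the process, echoes its combined stdout/stderr, and returns the exit code.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = buildCommand(
                "spark://114.55.246.88:7077",
                "test java submit job to spark",
                "com.example.spark.WordCount",
                "/opt/spark-example-0.0.1-SNAPSHOT.jar",
                "hdfs://Master:9000/Hadoop/Input/wordcount.txt");
        System.out.println(String.join(" ", cmd));
        // On a machine where spark-submit is installed, run it with:
        // int exitCode = run(cmd);
    }
}
```

The main method only prints the command so that the sketch can be tried on a machine without Spark; calling run(cmd) performs the actual submission.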
4) SparkController.java
package com.example.spark.web.controller;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.example.spark.SubmitJobToSpark;
@Controller
@RequestMapping("spark")
public class SparkController {
    private Logger logger = LoggerFactory.getLogger(SparkController.class);

    // Handles GET or POST requests to /spark/sparkSubmit by submitting the WordCount job.
    @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
    @ResponseBody
    public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
        logger.info("start submitting spark task...");
        SubmitJobToSpark.submitJob();
        return "hello";
    }
}
5) Package the spark-submit project as a war, deploy it to Tomcat on the Master node, and request the following URL:
http://114.55.246.88:9090/spark-submit/spark/sparkSubmit
The computed result appears in the Tomcat log.
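The request can be sent from a browser, or from code. The following is a minimal sketch of a Java client for the endpoint using the JDK's HttpURLConnection; the class TriggerSparkSubmit, its method names, and the timeout values are assumptions for illustration. Since the controller calls submitJob() before returning, the response may arrive only after the job has finished, so the read timeout is set generously.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the sparkSubmit endpoint of the deployed web application.
final class TriggerSparkSubmit {

    // Builds the URL from host, port, and the context path the war is deployed under.
    static String endpoint(String host, int port, String context) {
        return "http://" + host + ":" + port + "/" + context + "/spark/sparkSubmit";
    }

    // Sends a GET request and returns the response body as a string.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5_000);   // assumed value: 5 seconds to connect
        conn.setReadTimeout(300_000);    // assumed value: wait up to 5 minutes for the job
        try (InputStream in = conn.getInputStream();
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toString(StandardCharsets.UTF_8.name());
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        String url = endpoint("114.55.246.88", 9090, "spark-submit");
        System.out.println(url);
        // With the web application running, trigger the job with:
        // System.out.println(get(url));   // the controller above returns "hello"
    }
}
```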
Permanent link to this article: http://www.linuxidc.com/Linux/2017-06/144928.htm