内容简介:既然要统计单词我们就需要一个包含一定数量的文本,我们这里选择了英文原著《GoneWithTheWind》(《飘》)的文本来做一个数据统计,看看文章中各个单词出现频次如何。为了便于大家下载文本。可以到首先我们要读取该文件,就要用到SparkContext中的textFile的方法,我们尝试先读取第一行。scala实现
既然要统计单词我们就需要一个包含一定数量的文本,我们这里选择了英文原著《GoneWithTheWind》(《飘》)的文本来做一个数据统计,看看文章中各个单词出现频次如何。为了便于大家下载文本。可以到 GitHub 上下载文本以及对应的代码。我将文本放在项目的目录下。
首先我们要读取该文件,就要用到SparkContext中的textFile的方法,我们尝试先读取第一行。
scala实现
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) println(sc.textFile("./GoneWithTheWind").first()) } } 复制代码
java实现
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; public class WordCountJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); System.out.println(sc.textFile("./GoneWithTheWind").first()); } } 复制代码
python实现
from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) print(sc.textFile("./GoneWithTheWind").first()) 复制代码
得到输出
Chapter 1 复制代码
以scala为例,其余两种语言也差不多。第一步我们创建了一个SparkConf
val conf = new SparkConf().setMaster("local").setAppName("WordCount") 复制代码
这里我们设置Master为local,该程序名称为WordCount,当然程序名称可以任意取,和类名不同也无妨。但是这个Master则不能乱写,当我们在集群上运行,用spark-submit的时候,则要注意。我们现在只讨论本地的写法,因此,这里只写local。
接着一句我们创建了一个SparkContext,这是spark的核心,我们将conf配置传入初始化
val sc = new SparkContext(conf) 复制代码
最后我们将文本路径告诉SparkContext,然后输出第一行内容
println(sc.textFile("./GoneWithTheWind").first()) 复制代码
开始统计
接着我们就可以开始统计文本的单词数了,因为单词是以空格划分,所以我们可以把空格作为单词的标记。
scala实现
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) //设置数据路径 val text = sc.textFile("./GoneWithTheWind") //将文本数据按行处理,每行按空格拆成一个数组 // flatMap会将各个数组中元素合成一个大的集合 val textSplit = text.flatMap(line =>line.split(" ")) //处理合并后的集合中的元素,每个元素的值为1,返回一个元组(key,value) //其中key为单词,value这里是1,即该单词出现一次 val textSplitFlag = textSplit.map(word => (word,1)) //reduceByKey会将textSplitFlag中的key相同的放在一起处理 //传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1 val countWord = textSplitFlag.reduceByKey((x,y)=>x+y) //将计算后的结果存在项目目录下的result目录中 countWord.saveAsTextFile("./result") } } 复制代码
java实现
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class WordCountJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); //设置数据的路径 JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind"); //将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合 //这里需要注意的是FlatMapFunction中<String, String>,第一个表示输入,第二个表示输出 //与Hadoop中的map-reduce非常相似 JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //处理合并后的集合中的元素,每个元素的值为1,返回一个Tuple2,Tuple2表示两个元素的元组 //值得注意的是上面是JavaRDD,这里是JavaPairRDD,在返回的是元组时需要注意这个区别 //PairFunction中<String, String, Integer>,第一个String是输入值类型 //第二第三个,String, Integer是返回值类型 //这里返回的是一个word和一个数值1,表示这个单词出现一次 JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }); //reduceByKey会将splitFlagRDD中的key相同的放在一起处理 //传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1 JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); //将计算后的结果存在项目目录下的result目录中 countRDD.saveAsTextFile("./resultJava"); } } 复制代码
python实现
from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) # 设置数据的路径 textData = sc.textFile("./GoneWithTheWind") # 将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合 splitData = textData.flatMap(lambda line:line.split(" ")) # 处理合并后的集合中的元素,每个元素的值为1,返回一个元组(key,value) # 其中key为单词,value这里是1,即该单词出现一次 flagData = splitData.map(lambda word:(word,1)) # reduceByKey会将textSplitFlag中的key相同的放在一起处理 # 传入的(x,y)中,x是上一次统计后的value,y是本次单词中的value,即每一次是x+1 countData = flagData.reduceByKey(lambda x,y:x+y) #输出文件 countData.saveAsTextFile("./result") 复制代码
运行后在住目录下得到一个名为result的目录,该目录如下图,SUCCESS表示生成文件成功,文件内容存储在part-00000中
我们可以查看文件的部分内容:
('Chapter', 1) ('1', 1) ('SCARLETT', 1) ('O’HARA', 1) ('was', 74) ('not', 33) ('beautiful,', 1) ('but', 32) ('men', 4) ('seldom', 3) ('realized', 2) ('it', 37) ('when', 19) ('caught', 1) ('by', 20) ('her', 65) ('charmas', 1) ('the', 336) ('Tarleton', 7) ('twins', 16) ('were.', 1) ('In', 1) ('face', 6) ('were', 49) ... ... ... ... 复制代码
这样就完成了一个spark的真正HelloWorld程序--单词计数。对比三个语言版本的程序,发现一个事实那就是,用scala和 python 写的代码非常简洁而且易懂,而 Java 实现的则相对复杂,难懂。当然这个易懂和难懂是相对而言的。如果你只会Java无论如何你都应该从中能看懂java的程序,而简洁的scala和python对你来说根本看不懂。这也无妨,语言只是工具,重点看你怎么用。况且,我们使用java8的特性也可以写出简洁的代码。
java8实现
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class WordCountJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); countJava8(sc); } public static void countJava8(JavaSparkContext sc){ sc.textFile("./GoneWithTheWind") .flatMap(s->Arrays.asList(s.split(" ")).iterator()) .mapToPair(s->new Tuple2<>(s,1)) .reduceByKey((x,y)->x+y) .saveAsTextFile("./resultJava8"); } } 复制代码
spark的优越性在这个小小的程序中已经有所体现,计算一本书的每个单词出现的次数,spark在单机上运行(读取文件、生成临时文件、将结果写到硬盘),加载-运行-结束只花费了2秒时间。
对程序进行优化
程序是否还能再简单高效呢?当然是可以的,我们可以用countByValue这个函数,这个函数正是常用的计数的方法。
scala实现
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) //设置数据路径 val text = sc.textFile("./GoneWithTheWind") //将文本数据按行处理,每行按空格拆成一个数组 // flatMap会将各个数组中元素合成一个大的集合 val textSplit = text.flatMap(line =>line.split(" ")) println(textSplit.countByValue()) } } 复制代码
运行得到结果
Map(Heknew -> 1, “Ashley -> 1, “Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,....... 复制代码
java实现
public class WordCountJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); countJava(sc); } public static void countJava(JavaSparkContext sc){ //设置数据的路径 JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind"); //将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合 //这里需要注意的是FlatMapFunction中<String, String>,第一个表示输入,第二个表示输出 //与Hadoop中的map-reduce非常相似 JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); System.out.println(splitRDD.countByValue()); } } 复制代码
运行得到结果
{Heknew=1, “Ashley=1, “Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1, 复制代码
python实现
from pyspark import SparkConf,SparkContext conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) # 设置数据的路径 textData = sc.textFile("./GoneWithTheWind") # 将文本数据按行处理,每行按空格拆成一个数组,flatMap会将各个数组中元素合成一个大的集合 splitData = textData.flatMap(lambda line:line.split(" ")) print(splitData.countByValue()) 复制代码
运行得到结果:
defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4, 复制代码spark的优越性在这个小小的程序中已经有所体现,计算一本书的每个单词出现的次数,spark在单机上运行(读取文件、生成临时文件、将结果写到硬盘),加载-运行-结束只花费了2秒时间。如果想要获取源代码以及数据内容,可以前往我的 github
下载。
以上所述就是小编给大家介绍的《Spark入门(三)--Spark经典的单词统计》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 算法 – 给出一个单词,打印其索引,可以相应地增加单词
- Pocketsphinx – 添加单词和提高准确性
- 每日一道算法题--leetcode 290--单词规则--python
- go基础库之将字符串分解为单词
- python将每个单词按空格分开并保存到文件中
- LeetCode每日一题: 最后一个单词的长度(No.58)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。