内容简介:本系列共两篇文章,主要探讨如何将Ignite和Spark进行集成。下面简要地回顾一下在第一篇文章中所谈到的内容。Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。
本系列共两篇文章,主要探讨如何将Ignite和Spark进行集成。
下面简要地回顾一下在第一篇文章中所谈到的内容。
Ignite是一个分布式的内存数据库、缓存和处理平台,为事务型、分析型和流式负载而设计,在保证扩展性的前提下提供了内存级的性能。
Spark是一个流式数据和计算引擎,通常从HDFS或者其他存储中获取数据,一直以来,他都倾向于OLAP型业务,并且聚焦于MapReduce类型负载。
因此,这两种技术是可以互补的。
将Ignite与Spark整合
整合这两种技术会为Spark用户带来若干明显的好处:
- 通过避免大量的数据移动,获得真正可扩展的内存级性能;
- 提高RDD、DataFrame和 SQL 的性能;
- 在Spark作业之间更方便地共享状态和数据。
下图中显示了如何整合这两种技术,并且标注了显著的优势:
在第一篇文章中,主要聚焦于IgniteRDD,而本文会聚焦于IgniteDataFrames。
IgniteDataframes
Spark的DataFrame API为描述数据引入了模式的概念,Spark通过表格的形式进行模式的管理和数据的组织。
DataFrame是一个组织为命名列形式的分布式数据集,从概念上讲,DataFrame等同于关系数据库中的表,并允许Spark使用Catalyst查询优化器来生成高效的查询执行计划。而RDD只是跨集群节点分区化的元素集合。
Ignite扩展了DataFrames,简化了开发,改进了将Ignite作为Spark的内存存储时的数据访问时间,好处包括:
- 通过Ignite读写DataFrames时,可以在Spark作业之间共享数据和状态;
- 通过优化Spark的查询执行计划加快SparkSQL查询,这些主要是通过IgniteSQL引擎的高级索引以及避免了Ignite和Spark之间的网络数据移动实现的。
IgniteDataframes示例
下面通过一些代码以及搭建几个小程序的方式,了解Ignite DataFrames如何使用,如果想实际运行这些代码,可以从 GitHub 上下载。
一共会写两个 Java 的小应用,然后在IDE中运行,还会在这些Java应用中执行一些SQL。
一个Java应用会从JSON文件中读取一些数据,然后创建一个存储于Ignite的DataFrame,这个JSON文件Ignite的发行版中已经提供,另一个Java应用会从Ignite的DataFrame中读取数据然后使用SQL进行查询。
下面是写应用的代码:
public class DFWriter { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession .builder() .appName("DFWriter") .master("local") .config("spark.executor.instances", "2") .getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); Dataset<Row> peopleDF = spark.read().json( resolveIgnitePath("resources/people.json").getAbsolutePath()); System.out.println("JSON file contents:"); peopleDF.show(); System.out.println("Writing DataFrame to Ignite."); peopleDF.write() .format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) .option(IgniteDataFrameSettings.OPTION_TABLE(), "people") .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id") .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=replicated") .save(); System.out.println("Done!"); Ignition.stop(false); } }
在 DFWriter
中,首先创建了 SparkSession
,它包含了应用名,之后会使用 spark.read().json()
读取JSON文件并且输出文件内容,下一步是将数据写入Ignite存储。下面是 DFReader
的代码:
public class DFReader { private static final String CONFIG = "config/example-ignite.xml"; public static void main(String args[]) { Ignite ignite = Ignition.start(CONFIG); SparkSession spark = SparkSession .builder() .appName("DFReader") .master("local") .config("spark.executor.instances", "2") .getOrCreate(); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); System.out.println("Reading data from Ignite table."); Dataset<Row> peopleDF = spark.read() .format(IgniteDataFrameSettings.FORMAT_IGNITE()) .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), CONFIG) .option(IgniteDataFrameSettings.OPTION_TABLE(), "people") .load(); peopleDF.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people WHERE id > 0 AND id < 6"); sqlDF.show(); System.out.println("Done!"); Ignition.stop(false); } }
在 DFReader
中,初始化和配置与 DFWriter
相同,这个应用会执行一些过滤,需求是查找所有的id > 0 以及 < 6的人,然后输出结果。
在IDE中,通过下面的代码可以启动一个Ignite节点:
public class ExampleNodeStartup { public static void main(String[] args) throws IgniteException { Ignition.start("config/example-ignite.xml"); } }
到此,就可以对代码进行测试了。
运行应用
首先在IDE中启动一个Ignite节点,然后运行 DFWriter
应用,输出如下:
JSON file contents: +-------------------+---+------------------+ | department| id| name| +-------------------+---+------------------+ |Executive Committee| 1| Ivan Ivanov| |Executive Committee| 2| Petr Petrov| | Production| 3| John Doe| | Production| 4| Ann Smith| | Accounting| 5| Sergey Smirnov| | Accounting| 6|Alexandra Sergeeva| | IT| 7| Adam West| | Head Office| 8| Beverley Chase| | Head Office| 9| Igor Rozhkov| | IT| 10|Anastasia Borisova| +-------------------+---+------------------+ Writing DataFrame to Ignite. Done!
如果将上面的结果与JSON文件的内容进行对比,会显示两者是一致的,这也是期望的结果。
下一步会运行 DFReader
,输出如下:
Reading data from Ignite table. +-------------------+--------------+---+ | DEPARTMENT| NAME| ID| +-------------------+--------------+---+ |Executive Committee| Ivan Ivanov| 1| |Executive Committee| Petr Petrov| 2| | Production| John Doe| 3| | Production| Ann Smith| 4| | Accounting|Sergey Smirnov| 5| +-------------------+--------------+---+ Done!
这也是期望的输出。
总结
通过本文,会发现使用Ignite DataFrames是如何简单,这样就可以通过Ignite DataFrame进行数据的读写了。
未来, 这些代码示例 也会作为Ignite发行版的一部分进行发布。
关于Ignite和Spark的集成,内容就是这些了。
以上所述就是小编给大家介绍的《原 荐 Ignite集成Spark之IgniteDataFrames》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 持续集成:数据库集成及快速构建
- ShareSDK集成及集成后遇到的一些问题【原创】
- 持续集成与持续部署宝典Part 3:创建集成环境
- 持续集成与持续部署宝典Part 2:创建持续集成流水线
- 禅道 12.3.stable 版本发布,全面集成八种单元测试框架,打通持续集成闭环
- 持续集成将死
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Head First Rails
David Griffiths / O'Reilly Media / 2008-12-30 / USD 49.99
Figure its about time that you hop on the Ruby on Rails bandwagon? You've heard that it'll increase your productivity exponentially, and allow you to created full fledged web applications with minimal......一起来看看 《Head First Rails》 这本书的介绍吧!