pyspark操作MongoDB

栏目: 服务器 · 发布时间: 5年前

内容简介:pyspark对mongo数据库的基本操作 (๑• . •๑)有几点需要注意的:

pyspark对mongo数据库的基本操作 (๑• . •๑)

pyspark操作MongoDB

这是崔斯特的第八十一篇原创文章

有几点需要注意的:

  1. 不要安装最新的pyspark版本,请安装 pip3 install pyspark==2.3.2
  2. spark-connector 与平常的 MongoDB 写法不同,格式是: mongodb://127.0.0.1:database.collection
  3. 如果计算数据量比较大,你的电脑可能会比较卡,^_^
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: zhangslob
@file: spark_count.py 
@time: 2019/01/03
@desc:
    不要安装最新的pyspark版本
    `pip3 install pyspark==2.3.2`
    更多pyspark操作MongoDB请看https://docs.mongodb.com/spark-connector/master/python-api/
"""

import os
from pyspark.sql import SparkSession

# set PYSPARK_PYTHON to python36
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'

# load mongodb data
# 格式是:"mongodb://127.0.0.1:database.collection"
input_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
output_uri = "mongodb://127.0.0.1:27017/spark.spark_test"

# 创建spark,默认使用本地环境,或者"spark://master:7077"
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", input_uri) \
    .config("spark.mongodb.output.uri", output_uri) \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
    .getOrCreate()


def except_id(collection_1, collection_2, output_collection, pipeline):
    """
    计算表1与表2中不同的数据
    :param collection_1: 导入表1
    :param collection_2: 导入表2
    :param output_collection: 保存的表
    :param pipeline: MongoDB查询语句 str
    :return:
    """
    # 可以在这里指定想要导入的数据库,将会覆盖上面配置中的input_uri。下面保存数据也一样
    # .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test")
    # .option("database", "people").option("collection", "contacts")

    df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \
        .option("pipeline", pipeline).load()

    df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \
        .option("pipeline", pipeline).load()

    # df_1有但是不在 df_2,同理可以计算df_2有,df_1没有
    df = df_1.subtract(df_2)
    df.show()

    # mode 参数可选范围
    # * `append`: Append contents of this :class:`DataFrame` to existing data.
    # * `overwrite`: Overwrite existing data.
    # * `error` or `errorifexists`: Throw an exception if data already exists.
    # * `ignore`: Silently ignore this operation if data already exists.

    df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save()
    spark.stop()


if __name__ == '__main__':
    # mongodb query, MongoDB查询语句,可以减少导入数据量
    pipeline = "[{'$project': {'uid': 1, '_id': 0}}]"

    collection_1 = "spark_1"
    collection_2 = "spark_2"
    output_collection = 'diff_uid'
    except_id(collection_1, collection_2, output_collection, pipeline)
    print('success')

完整代码地址: spark_count_diff_uid.py


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Intersectional Internet

The Intersectional Internet

Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016

From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码