Модульные тестовые примеры операций с кадрами данных Pyspark

Я написал некоторый код на python с контекстом sql, т.е. pyspark для выполнения некоторых операций с csv путем преобразования их в фреймы данных pyspark (операции df, такие как предварительная обработка, переименование имен столбцов, создание нового столбца и добавление их в тот же фрейм данных и т. д.) . Я хочу написать для него модульные тесты. Я понятия не имею, как писать модульные тесты на фреймах данных. Может ли кто-нибудь помочь мне, как написать модульные тестовые примеры для фреймов данных в pyspark? Или дайте мне какие-то источники для тестов на фреймах данных?


person Bheemineti Shobhana    schedule 14.04.2016    source источник


Ответы (1)


Фреймы данных ничем не отличаются от всего остального в мире pyspark. Вы можете начать с просмотра раздела Python базы spark-testing-base. Есть несколько интересных проектов, в которых есть тесты фреймов данных, так что вы можете начать с изучения того, как они это делают: Sparkling Pandas — один, а вот другой пример. Существует также find-spark, который поможет найти контекст исполняемого файла spark. Но основная идея состоит в том, чтобы правильно настроить путь перед началом теста:

def add_pyspark_path():
    """
    Add PySpark to the PYTHONPATH
    Thanks go to this project: https://github.com/holdenk/sparklingpandas
    """
    import sys
    import os
    try:
        sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
        sys.path.append(os.path.join(os.environ['SPARK_HOME'],
            "python","lib","py4j-0.9-src.zip"))
    except KeyError:
        print "SPARK_HOME not set"
        sys.exit(1)

add_pyspark_path() # Now we can import pyspark

И обычно у вас будет один базовый класс тестового примера:

import logging

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext, HiveContext

def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

class SparkTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        quiet_py4j()

        # Setup a new spark context for each test
        conf = SparkConf()
        conf.set("spark.executor.memory","1g")
        conf.set("spark.cores.max", "1")
        #conf.set("spark.master", "spark://192.168.1.2:7077")
        conf.set("spark.app.name", "nosetest")
        cls.sc = SparkContext(conf=conf)
        cls.sqlContext = HiveContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()
person Oleksiy    schedule 15.04.2016
comment
не могли бы вы взглянуть на https://stackoverflow.com/questions/49420660/unit-test-pyspark-code-using-python - person User12345; 22.03.2018