Я написал некоторый код на python с контекстом sql, т.е. pyspark для выполнения некоторых операций с csv путем преобразования их в фреймы данных pyspark (операции df, такие как предварительная обработка, переименование имен столбцов, создание нового столбца и добавление их в тот же фрейм данных и т. д.) . Я хочу написать для него модульные тесты. Я понятия не имею, как писать модульные тесты на фреймах данных. Может ли кто-нибудь помочь мне, как написать модульные тестовые примеры для фреймов данных в pyspark? Или дайте мне какие-то источники для тестов на фреймах данных?
Модульные тестовые примеры операций с кадрами данных Pyspark
Ответы (1)
Фреймы данных ничем не отличаются от всего остального в мире pyspark. Вы можете начать с просмотра раздела Python базы spark-testing-base. Есть несколько интересных проектов, в которых есть тесты фреймов данных, так что вы можете начать с изучения того, как они это делают: Sparkling Pandas — один, а вот другой пример. Существует также find-spark, который поможет найти контекст исполняемого файла spark. Но основная идея состоит в том, чтобы правильно настроить путь перед началом теста:
def add_pyspark_path():
"""
Add PySpark to the PYTHONPATH
Thanks go to this project: https://github.com/holdenk/sparklingpandas
"""
import sys
import os
try:
sys.path.append(os.path.join(os.environ['SPARK_HOME'], "python"))
sys.path.append(os.path.join(os.environ['SPARK_HOME'],
"python","lib","py4j-0.9-src.zip"))
except KeyError:
print "SPARK_HOME not set"
sys.exit(1)
add_pyspark_path() # Now we can import pyspark
И обычно у вас будет один базовый класс тестового примера:
import logging
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext, HiveContext
def quiet_py4j():
""" turn down spark logging for the test context """
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
class SparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
quiet_py4j()
# Setup a new spark context for each test
conf = SparkConf()
conf.set("spark.executor.memory","1g")
conf.set("spark.cores.max", "1")
#conf.set("spark.master", "spark://192.168.1.2:7077")
conf.set("spark.app.name", "nosetest")
cls.sc = SparkContext(conf=conf)
cls.sqlContext = HiveContext(cls.sc)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
person
Oleksiy
schedule
15.04.2016
не могли бы вы взглянуть на
https://stackoverflow.com/questions/49420660/unit-test-pyspark-code-using-python
- person User12345; 22.03.2018