Как создать фрейм данных Spark, используя соединение снежинки в Python?

Я новичок в Spark и Python, и у меня есть sql, который хранится в переменной в python, и мы используем базу данных SnowFlake. Как создать искровые данные с помощью SQL со снежинкой?

    import sf_connectivity (we have a code for establishing connection with Snowflake database)
    emp = 'Select * From Employee'
    snowflake_connection = sf_connectivity.collector() (It is a method to establish snowflake conenction)
    requirement 1: Create Spark Dataframe (sf_df) using 'emp' and 'snowflake_connection '
    requirement 2: sf_df.createOrReplaceTempView(Temp_Employee)

Какие пакеты или библиотеки ему требуются? Как я могу заставить это работать?


comment
Отвечает ли это на ваш вопрос? Как создать фрейм данных Spark из фрейма данных Pandas, используя Snow Flake и Python?   -  person Mike Walton    schedule 04.06.2020


Ответы (2)


Документация, которая помогла мне в этом разобраться, находится здесь: https://docs.databricks.com/data/data-sources/snowflake.html.

Мне потребовалось некоторое время, чтобы понять, как заставить его работать! После множества вопросов я попросил ИТ-отдел моей компании настроить учетную запись пользователя в виде снежинки с аутентификацией с закрытым / открытым ключом, и они настроили этот идентификатор, чтобы он был доступен в нашей корпоративной учетной записи Databricks.

После настройки следующий код представляет собой пример того, как передать команду sql в качестве переменной в Spark и заставить Spark преобразовать ее в фрейм данных.

optionsSource = dict(sfUrl="mycompany.east-us-2.azure.snowflakecomputing.com", # Snowflake Account Name
                          sfUser="my_service_acct",
                          pem_private_key=dbutils.secrets.get("my_scope", "my_secret"),
                   sfDatabase="mydatabase", # Snowflake Database
                   sfSchema="myschema", # Snowflake Schema
                   sfWarehouse="mywarehouse",
                   sfRole="myrole"
                        )   

    sqlcmd = '''
    select current_date;
    '''

    df = spark.read.format("snowflake").options(**optionsSource).option("query", sqlcmd).load()
    display(df)
person Nathan T Alexander    schedule 05.06.2020

С помощью открытого / закрытого ключа вам необходимо создать сертификат https://community.snowflake.com/s/article/How-to-connect-snowflake-with-Spark-connector-using-Public-Private-Key и тогда вы можете использовать приведенный ниже код.

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

with open("<path/to/>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "<ACCOUNTNAME>",
    "sfUser": "<USER_NAME",
    "pem_private_key": pkb,
    # "sfPassword": "xxxxxxxxx",
    "sfDatabase": "<DBNAME>",
    "sfSchema": "<SCHEMA_NAME>",
    "sfWarehouse": "<WH_NAME>",
    "sfRole": "<ROLENAME>",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "<TABLENAME>") \
    .load()

df.show()
person Ankur Srivastava    schedule 06.06.2020