AWS Glue Dynamic Filtering - фильтрация одного динамического кадра с использованием другого динамического кадра.

Я пытаюсь отфильтровать динамическую фильтрацию на основе данных, находящихся в другом динамическом фрейме, я работаю над пример соединения и отношения, в этом коде динамические кадры человека и членства объединяются по идентификатору, но я хотел бы фильтровать людей на основе идентификатора, присутствующего в членстве DF, ниже приведен код где я помещаю статические значения

    import sys
from awsglue.transforms import Join
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# catalog: database and table names
db_name = "legislators"
tbl_persons = "persons_json"
tbl_membership = "memberships_json"
tbl_organization = "organizations_json"



# Create dynamic frames from the source tables 
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)
memberships = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_membership)
orgs = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_organization)

persons = persons.drop_fields(['links', 'identifiers','other_names', 'images','contact_details'])


# Keep the fields we need and rename some.
orgs = orgs.drop_fields(['other_names', 'identifiers','links'])


fileredPersons = Filter.apply(frame = persons,
                              f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])
print "Filtered record count:  ", fileredPersons.count()

ниже логика фильтра

 fileredPersons = Filter.apply(frame = persons,
                                  f = lambda x: x["id"] in ["0005af3a-9471-4d1f-9299-737fff4b9b46", "00aca284-9323-4953-bb7a-1bf6f5eefe95"])

Я хотел бы передать столбец person_id, присутствующий в членстве DF, в условие функции фильтра, в основном фильтрует людей, имеющих членство, любая помощь будет оценена.


person sumedha    schedule 02.05.2020    source источник
comment
Если вы хотите использовать pyspark вместо клея, то это можно сделать с помощью клея, я не уверен   -  person Shubham Jain    schedule 02.05.2020
comment
не могли бы вы подробнее рассказать, как это сделать с помощью pyspark   -  person sumedha    schedule 02.05.2020


Ответы (1)


Вы можете просто выполнить внутреннее соединение вместо фильтрации, например

persons_filtered = persons.alias('persons').join(memberships, persons.id==memberships.id).select('persons.*')

Это даст вам только отфильтрованные значения. Если ваше членство в df невелико или похоже на поиск, вы даже можете транслировать его для более быстрых результатов

from pyspark.sql.functions import broadcast
persons_filtered = persons.alias('persons').join(broadcast(memberships), persons.id==memberships.id).select('persons.*')

Надеюсь, это поможет.

person Shubham Jain    schedule 02.05.2020
comment
я получаю недопустимый синтаксис для person_filtered = person.join (memberships, person.id == memberships.id) .select (person. *) есть ли проблема с этим - person sumedha; 04.05.2020
comment
Я преобразовал Glue Dyanmic Frame в Dataframe и применил предоставленный вами код - person sumedha; 05.05.2020