Я использую Google Cloud Dataflow для Python SDK для чтения более 200 000 сущностей из хранилища данных с помощью функции ReadFromDatastore()
в запросе без каких-либо фильтров.
def make_example_entity_query():
"""
make an unfiltered query on the `ExampleEntity` entity
"""
query = query_pb2.Query()
query.kind.add().name = "ExampleEntity"
return query
Затем я делаю некоторую работу в конвейере с этим запросом
p = beam.Pipeline(options=PipelineOptions.from_dictionary(pipeline_options))
(
p
| 'read in the new donations from Datastore'
>> ReadFromDatastore(project, query, None)
|'protobuf2entity transformation'
>> beam.Map(entity_from_protobuf)
| 'do some work or something'
>> beam.Map(lambda item: item[0] + item[1])
)
return p.run()
это отлично работает локально, используя данные тестирования порядка нескольких тысяч записей, но когда я развертываю его в облаке и запускаю в нашей производственной базе данных с более чем 200 000 элементов, оно просто отключается через час или около того без какого-либо прогресса. Кажется, он полностью застрял на части чтения.
также показывает, что было прочитано ноль элементов
и похоже, что только один рабочий когда-либо был раскручен
Так что я не совсем уверен, что здесь происходит. Мои вопросы
- Существует ли какое-то разумное ограничение на объем данных, которые можно считывать из хранилища данных в качестве входных данных для конвейера?
- почему кажется, что данные вообще не попадают в конвейер? Если я запускаю это локально, я вижу, как данные проходят, хотя и довольно медленно.
- почему раскручивается только один рабочий? Я знаю, что если у вас есть фильтры для операции чтения, это приводит к тому, что чтение выполняется с одного узла, но это делается без фильтров неравенства при чтении из хранилища данных.