Я столкнулся с интересным ограничением в хранилище данных App Engine. Я создаю обработчик, чтобы помочь нам проанализировать некоторые данные об использовании на одном из наших производственных серверов. Для выполнения анализа мне нужно запросить и суммировать более 10 000 объектов, извлеченных из хранилища данных. Расчет несложный, это просто гистограмма элементов, прошедших определенный фильтр образцов использования. Проблема, с которой я столкнулся, заключается в том, что я не могу получить данные из хранилища данных достаточно быстро, чтобы выполнить какую-либо обработку до истечения крайнего срока запроса.
Я пробовал все, что мог придумать, чтобы разбить запрос на параллельные вызовы RPC для повышения производительности, но, согласно статистике приложений, я не могу заставить запросы фактически выполняться параллельно. Независимо от того, какой метод я пробую (см. Ниже), всегда кажется, что RPC возвращается к водопаду последовательных следующих запросов.
Примечание: код запроса и анализа действительно работает, он просто работает медленно, потому что я не могу достаточно быстро получить данные из хранилища данных.
Фон
У меня нет живой версии, которой я мог бы поделиться, но вот базовая модель той части системы, о которой я говорю:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
Вы можете думать о примерах как о случаях, когда пользователь использует возможность данного имени. (например, 'systemA.feature_x'). Теги основаны на деталях клиента, системной информации и функции. пример: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). Таким образом, теги образуют денормализованный набор токенов, которые можно использовать для поиска интересующих образцов.
Анализ, который я пытаюсь сделать, состоит в том, чтобы взять диапазон дат и спросить, сколько раз функция набора функций (возможно, всех функций) использовалась в день (или в час) для каждой учетной записи клиента (компании, а не для каждого пользователя).
Таким образом, ввод в обработчик будет примерно таким:
- Дата начала
- Дата окончания
- Тег (и)
Результат будет:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
Общий код для запросов
Вот общий код для всех запросов. Общая структура обработчика представляет собой простой обработчик получения с использованием webapp2, который устанавливает параметры запроса, выполняет запрос, обрабатывает результаты, создает данные для возврата.
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
Методы испробованные
Я пробовал различные методы, чтобы попытаться извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:
A. Одиночная итерация
Это более простой базовый вариант для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb вытягивать их один за другим.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
Б. Большой выбор
Идея заключалась в том, чтобы посмотреть, смогу ли я выполнить одну очень большую выборку.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C. Асинхронная выборка во временном диапазоне
Идея здесь состоит в том, чтобы признать, что образцы довольно хорошо разнесены по времени, поэтому я могу создать набор независимых запросов, которые разбивают общий временной интервал на фрагменты и пытаются запустить каждый из них параллельно с использованием async:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. Асинхронное отображение
Я попробовал этот метод, потому что в документации говорилось, что ndb может автоматически использовать некоторый параллелизм при использовании метода Query.map_async.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Исход
Я протестировал один пример запроса для сбора общего времени ответа и трассировки статистики приложений. Результаты следующие:
A. Одиночная итерация
реальный: 15,645 с
Этот идет последовательно через выборку пакетов один за другим, а затем извлекает каждый сеанс из кэша памяти.
Б. Большой выбор
реальный: 12,12 с
Фактически то же самое, что и вариант A, но по какой-то причине немного быстрее.
C. Асинхронная выборка во временном диапазоне
реальный: 15,251 с
Кажется, что вначале обеспечивает больший параллелизм, но, кажется, замедляется из-за последовательности вызовов next во время итерации результатов. Также, похоже, не может перекрывать поиск в кэше памяти сеанса с ожидающими запросами.
D. Асинхронное отображение
реальный: 13,752 с
Мне это труднее всего понять. Похоже, что у него много перекрытий, но кажется, что все растягивается водопадом, а не параллельно.
Рекомендации
Исходя из всего этого, что мне не хватает? Я просто исчерпал лимит App Engine или есть лучший способ параллельного удаления большого количества объектов?
Я не знаю, что попробовать дальше. Я думал о том, чтобы переписать клиента, чтобы параллельно выполнять несколько запросов к движку приложения, но это кажется довольно грубой силой. Я действительно ожидал, что движок приложения сможет справиться с этим вариантом использования, поэтому я предполагаю, что мне что-то не хватает.
Обновлять
В конце концов, я обнаружил, что вариант C лучше всего подходит для моего случая. Я смог оптимизировать его до завершения за 6,1 секунды. Все еще не идеально, но намного лучше.
Получив советы от нескольких человек, я обнаружил, что следующие моменты являются ключевыми для понимания и запоминания:
- Несколько запросов могут выполняться параллельно
- Одновременно в полете могут быть только 10 RPC.
- Попробуйте денормализовать до такой степени, чтобы не было вторичных запросов
- Этот тип задач лучше оставить для сопоставления сокращений и очередей задач, а не запросов в реальном времени.
Итак, что я сделал, чтобы сделать это быстрее:
- Я разделил пространство запросов с самого начала по времени. (примечание: чем более равными будут разделы с точки зрения возвращаемых сущностей, тем лучше)
- Я денормализовал данные, чтобы исключить необходимость во вторичном запросе сеанса.
- Я использовал асинхронные операции ndb и wait_any (), чтобы перекрыть запросы с обработкой
Я все еще не получаю ту производительность, которую я ожидал или хотел бы, но на данный момент это работоспособно. Я просто хотел бы, чтобы это был лучший способ быстро вытащить большое количество последовательных объектов в память в обработчиках.