Лучшая практика для запроса большого количества сущностей ndb из хранилища данных

Я столкнулся с интересным ограничением в хранилище данных App Engine. Я создаю обработчик, чтобы помочь нам проанализировать некоторые данные об использовании на одном из наших производственных серверов. Для выполнения анализа мне нужно запросить и суммировать более 10 000 объектов, извлеченных из хранилища данных. Расчет несложный, это просто гистограмма элементов, прошедших определенный фильтр образцов использования. Проблема, с которой я столкнулся, заключается в том, что я не могу получить данные из хранилища данных достаточно быстро, чтобы выполнить какую-либо обработку до истечения крайнего срока запроса.

Я пробовал все, что мог придумать, чтобы разбить запрос на параллельные вызовы RPC для повышения производительности, но, согласно статистике приложений, я не могу заставить запросы фактически выполняться параллельно. Независимо от того, какой метод я пробую (см. Ниже), всегда кажется, что RPC возвращается к водопаду последовательных следующих запросов.

Примечание: код запроса и анализа действительно работает, он просто работает медленно, потому что я не могу достаточно быстро получить данные из хранилища данных.

Фон

У меня нет живой версии, которой я мог бы поделиться, но вот базовая модель той части системы, о которой я говорю:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

Вы можете думать о примерах как о случаях, когда пользователь использует возможность данного имени. (например, 'systemA.feature_x'). Теги основаны на деталях клиента, системной информации и функции. пример: ['winxp', '2.5.1', ​​'systemA', 'feature_x', 'premium_account']). Таким образом, теги образуют денормализованный набор токенов, которые можно использовать для поиска интересующих образцов.

Анализ, который я пытаюсь сделать, состоит в том, чтобы взять диапазон дат и спросить, сколько раз функция набора функций (возможно, всех функций) использовалась в день (или в час) для каждой учетной записи клиента (компании, а не для каждого пользователя).

Таким образом, ввод в обработчик будет примерно таким:

  • Дата начала
  • Дата окончания
  • Тег (и)

Результат будет:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Общий код для запросов

Вот общий код для всех запросов. Общая структура обработчика представляет собой простой обработчик получения с использованием webapp2, который устанавливает параметры запроса, выполняет запрос, обрабатывает результаты, создает данные для возврата.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Методы испробованные

Я пробовал различные методы, чтобы попытаться извлечь данные из хранилища данных как можно быстрее и параллельно. Методы, которые я пробовал до сих пор, включают:

A. Одиночная итерация

Это более простой базовый вариант для сравнения с другими методами. Я просто создаю запрос и перебираю все элементы, позволяя ndb вытягивать их один за другим.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

Б. Большой выбор

Идея заключалась в том, чтобы посмотреть, смогу ли я выполнить одну очень большую выборку.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Асинхронная выборка во временном диапазоне

Идея здесь состоит в том, чтобы признать, что образцы довольно хорошо разнесены по времени, поэтому я могу создать набор независимых запросов, которые разбивают общий временной интервал на фрагменты и пытаются запустить каждый из них параллельно с использованием async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Асинхронное отображение

Я попробовал этот метод, потому что в документации говорилось, что ndb может автоматически использовать некоторый параллелизм при использовании метода Query.map_async.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Исход

Я протестировал один пример запроса для сбора общего времени ответа и трассировки статистики приложений. Результаты следующие:

A. Одиночная итерация

реальный: 15,645 с

Этот идет последовательно через выборку пакетов один за другим, а затем извлекает каждый сеанс из кэша памяти.

Метод A appstats

Б. Большой выбор

реальный: 12,12 с

Фактически то же самое, что и вариант A, но по какой-то причине немного быстрее.

Метод B appstats

C. Асинхронная выборка во временном диапазоне

реальный: 15,251 с

Кажется, что вначале обеспечивает больший параллелизм, но, кажется, замедляется из-за последовательности вызовов next во время итерации результатов. Также, похоже, не может перекрывать поиск в кэше памяти сеанса с ожидающими запросами.

Метод C appstats

D. Асинхронное отображение

реальный: 13,752 с

Мне это труднее всего понять. Похоже, что у него много перекрытий, но кажется, что все растягивается водопадом, а не параллельно.

Метод D appstats

Рекомендации

Исходя из всего этого, что мне не хватает? Я просто исчерпал лимит App Engine или есть лучший способ параллельного удаления большого количества объектов?

Я не знаю, что попробовать дальше. Я думал о том, чтобы переписать клиента, чтобы параллельно выполнять несколько запросов к движку приложения, но это кажется довольно грубой силой. Я действительно ожидал, что движок приложения сможет справиться с этим вариантом использования, поэтому я предполагаю, что мне что-то не хватает.

Обновлять

В конце концов, я обнаружил, что вариант C лучше всего подходит для моего случая. Я смог оптимизировать его до завершения за 6,1 секунды. Все еще не идеально, но намного лучше.

Получив советы от нескольких человек, я обнаружил, что следующие моменты являются ключевыми для понимания и запоминания:

  • Несколько запросов могут выполняться параллельно
  • Одновременно в полете могут быть только 10 RPC.
  • Попробуйте денормализовать до такой степени, чтобы не было вторичных запросов
  • Этот тип задач лучше оставить для сопоставления сокращений и очередей задач, а не запросов в реальном времени.

Итак, что я сделал, чтобы сделать это быстрее:

  • Я разделил пространство запросов с самого начала по времени. (примечание: чем более равными будут разделы с точки зрения возвращаемых сущностей, тем лучше)
  • Я денормализовал данные, чтобы исключить необходимость во вторичном запросе сеанса.
  • Я использовал асинхронные операции ndb и wait_any (), чтобы перекрыть запросы с обработкой

Я все еще не получаю ту производительность, которую я ожидал или хотел бы, но на данный момент это работоспособно. Я просто хотел бы, чтобы это был лучший способ быстро вытащить большое количество последовательных объектов в память в обработчиках.


person Allen    schedule 16.07.2012    source источник
comment
Я добился некоторого прогресса, и вариант C работал чуть менее чем за 9 секунд. Я думаю, что смогу оптимизировать его дальше. Я обнаружил, что если я разбиваю исходный запрос на 40 частей и отправляю запрос для всех сущностей сеанса одновременно, то большая часть времени RPC может перекрываться. В настоящее время я стараюсь выполнить RPC за 245 секунд в реальном времени за 9 секунд. Я попробую еще несколько вариантов и напишу о том, что работает лучше всего. А пока дайте мне знать, есть ли у кого-нибудь еще идеи.   -  person Allen    schedule 17.07.2012
comment
Привет, я понимаю, что это старый вопрос, но относительно D. Async Mapping, записывает ли ваш метод addCountForPeriod в хранилище данных? Если да, то я думаю, что это может быть причиной каскадирования из-за сочетания асинхронных операций с хранилищем данных и синхронных операций с хранилищем данных.   -  person Rob Curtis    schedule 22.02.2013
comment
Спасибо за отличный пост. Я столкнулся с этим после публикации с аналогичной проблемой здесь: stackoverflow.com/questions/25796142/. Как и вы, я разочарован тем, что не могу улучшить производительность асинхронных запросов. Хотелось бы хотя бы понять, почему они такие медленные.   -  person Michael Pedersen    schedule 15.09.2014
comment
У меня те же проблемы с производительностью, я пытаюсь найти более общее решение здесь # 26759950   -  person thomasf1    schedule 05.11.2014
comment
Этот вопрос должен быть в общем разделе вопросов и ответов StackOverflow как пример правильного вопроса stackoverflow.com/help/how-to- спросить   -  person András Szepesházi    schedule 05.05.2017


Ответы (4)


Подобная большая обработка не должна выполняться в пользовательском запросе, который имеет ограничение по времени в 60 секунд. Вместо этого это следует делать в контексте, поддерживающем длительные запросы. очередь задач поддерживает запросы до 10 минут и (я считаю) нормальные ограничения памяти (Экземпляры F1 по умолчанию имеют 128 МБ памяти). Для еще более высоких ограничений (без тайм-аута запроса, более 1 ГБ памяти) используйте серверные ВМ.

Вот что можно попробовать: настроить URL-адрес, при доступе к которому запускается задача очереди задач. Он возвращает веб-страницу, которая каждые ~ 5 секунд опрашивает другой URL-адрес, который отвечает истиной / ложью, если задача очереди задач еще не завершена. Очередь задач обрабатывает данные, что может занять около 10 секунд, и сохраняет результат в хранилище данных либо в виде вычисленных данных, либо в виде визуализированной веб-страницы. Как только начальная страница обнаруживает, что она завершена, пользователь перенаправляется на страницу, которая извлекает теперь вычисленные результаты из хранилища данных.

person mjibson    schedule 17.07.2012
comment
Я тоже думал об использовании серверной части. Я все еще надеюсь, что запрос будет работать в нормальный срок, но если это не сработает, я вернусь к использованию серверной части для его выполнения, как вы описываете. Поскольку одним из моих узких мест является загрузка всех объектов сеанса в локальный кеш, также может быть способ повысить производительность с помощью бэкэндов, если я могу постоянно держать все сеансы в памяти. - person Allen; 17.07.2012
comment
Это ничего не отвечает. Вопрос был конкретно о том, как должно работать хранилище данных, а это не так. Та же проблема применима к очередям задач и бэкэндам, когда нужно получить 100 000 или 1 млн объектов. Собака медленный, дорогой хранилище данных - person ZiglioUK; 22.01.2014
comment
Взгляните на MapReduce andwer от Мартина Берендса ниже. Серверные ВМ устарели. Есть хорошее руководство, описывающее процесс миграции: cloud.google.com/appengine/ документы / python / modules / преобразование - person Josep Valls; 19.04.2015

Новая экспериментальная функция Обработка данных (API AppEngine для MapReduce) выглядит очень подходящей. для решения этой проблемы. Он выполняет автоматическое сегментирование для выполнения нескольких параллельных рабочих процессов.

person Martin Berends    schedule 22.02.2014

У меня похожая проблема, и после нескольких недель работы со службой поддержки Google я могу подтвердить, что волшебного решения не существует, по крайней мере, по состоянию на декабрь 2017 года.

tl; dr: можно ожидать пропускную способность от 220 объектов в секунду для стандартного SDK, работающего на экземпляре B1, до 900 объектов в секунду для исправленного SDK, работающий на экземпляре B8.

Ограничение связано с процессором, и изменение типа экземпляра напрямую влияет на производительность. Это подтверждается аналогичными результатами, полученными на экземплярах B4 и B4_1G.

Лучшая пропускная способность, которую я получил для объекта Expando с примерно 30 полями, составляет:

Стандартный пакет SDK для GAE

  • Экземпляр B1: ~ 220 объектов в секунду
  • Экземпляр B2: ~ 250 объектов в секунду
  • Экземпляр B4: ~ 560 объектов в секунду
  • Экземпляр B4_1G: ~ 560 объектов в секунду
  • Экземпляр B8: ~ 650 объектов в секунду

Исправленный GAE SDK

  • Экземпляр B1: ~ 420 объектов в секунду
  • Экземпляр B8: ~ 900 объектов в секунду

Для стандартного GAE SDK я пробовал различные подходы, включая многопоточность, но лучшим оказался _ 1_ с _2 _. Текущая библиотека NDB уже отлично справляется с использованием async и фьючерсов под капотом, поэтому любая попытка продвинуть это с помощью потоков только ухудшит ситуацию.

Я нашел два интересных подхода к оптимизации:

Мэтт Фаус очень хорошо объясняет проблему:

GAE SDK предоставляет API для чтения и записи объектов, производных от ваших классов, в хранилище данных. Это избавит вас от утомительной работы по проверке необработанных данных, возвращаемых из хранилища данных, и их переупаковке в простой в использовании объект. В частности, GAE использует буферы протокола для передачи необработанных данных из хранилища на интерфейсную машину, которая в них нуждается. Затем SDK отвечает за декодирование этого формата и возвращение чистого объекта в ваш код. Это отличная утилита, но иногда она делает немного больше, чем вам хотелось бы. [...] Используя наш инструмент профилирования, я обнаружил, что 50% времени, потраченного на выборку этих сущностей, приходилось на фазу декодирования protobuf-to-python-object. Это означает, что центральный процессор внешнего сервера был узким местом при чтении из хранилища данных!

GAE-data-access-web-request

Оба подхода пытаются сократить время, затрачиваемое на декодирование protobuf для Python, за счет уменьшения количества декодируемых полей.

Я пробовал оба подхода, но добился успеха только с Мэттом. Внутреннее устройство SDK изменилось с тех пор, как Эван опубликовал свое решение. Мне пришлось немного изменить код, опубликованный Мэттом здесь, но это было довольно просто - если есть интерес я могу опубликовать окончательный код.

Для обычного объекта Expando с примерно 30 полями я использовал решение Мэтта для декодирования только пары полей и получил значительное улучшение.

В заключение, нужно соответствующим образом планировать и не ожидать, что можно будет обрабатывать намного больше, чем несколько сотен объектов в запросе GAE «в реальном времени».

person FelixEnescu    schedule 07.12.2017

Операции с большими данными в App Engine лучше всего реализовать с помощью какой-либо операции mapreduce.

Вот видео, описывающее процесс, но с использованием BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

Не похоже, что вам нужен BigQuery, но вы, вероятно, захотите использовать части конвейера как Map, так и Reduce.

Основное различие между тем, что вы делаете, и ситуацией mapreduce заключается в том, что вы запускаете один экземпляр и повторяете запросы, тогда как на mapreduce у вас будет отдельный экземпляр, выполняющийся параллельно для каждого запроса. Вам понадобится операция сокращения, чтобы «суммировать» все данные и записать результат куда-нибудь.

Другая проблема заключается в том, что вы должны использовать курсоры для итерации. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Если итератор использует смещение запроса, это будет неэффективно, поскольку смещение выдает тот же запрос, пропускает ряд результатов и дает вам следующий набор, в то время как курсор переходит прямо к следующему набору.

person dragonx    schedule 16.07.2012
comment
не могли бы вы показать простой пример того, как использовать ваш подход для параллельного получения сущностей? Я думал, что тасклет позаботится об этом, но похоже, что это не так. - person aschmid00; 16.07.2012
comment
Я не использую курсоры, потому что ни один из запросов не перезапускается в середине позже. Все они сразу захватывают все объекты без смещения. Что касается уменьшения карты, я подумал об этом, но это не автономный анализ, это должен быть живой запрос, который внутренние пользователи будут динамически изменять по мере изучения данных. Насколько я понимаю, map reduce не подходит для этого интерактивного сценария использования в реальном времени. - person Allen; 16.07.2012
comment
Я мог сделать неверное предположение, я думал, что вызовы datastore_v3.Next в C были вызваны использованием какого-то итератора на основе смещения. По моему опыту, Mapreduce не идеален для интерактивного использования, потому что а) вы не можете предсказать, сколько времени займет операция, и б) вам обычно приходится записывать свои результаты в хранилище данных, а не получать простой результат, который можно поместить по шаблону. На стороне клиента это становится немного уродливым, я думаю, вам нужен способ опроса, чтобы увидеть, готов ли результат. Однако из-за параллельной природы он, как правило, быстрее, чем сериализация запросов. - person dragonx; 16.07.2012
comment
Согласились, что map reduce может распараллеливать. Я просто надеялся, что операции ndb и async также могут быть достаточно распараллелены для моего варианта использования. Мне не нужно распараллеливать вычисления, просто получение данных. Я также рассматривал возможность использования urlfetch для написания многоуровневого обработчика, который будет порождать запросы к субобработчикам для получения данных, а затем собирать и обрабатывать их в родительском обработчике. Просто кажется, что должен быть более простой способ. - person Allen; 16.07.2012
comment
Я не думаю, что вы сможете надежно сделать это как живой запрос, особенно если ваш набор данных (возвращаемые результаты становятся намного больше). - person Tim Hoffman; 17.07.2012
comment
Согласитесь, что MapReduce - это правильный путь, по моему ограниченному опыту я видел, что он работает намного лучше, чем мои собственные запросы, не знаю почему. Жаль, что Google не поддерживает собственный пакет MR. Интересно, есть ли вообще работа по улучшению хранилища данных, его ужасной производительности и затрат, вся работа, похоже, идет на GCE и облачное хранилище - person ZiglioUK; 22.01.2014