Най-добра практика за запитване до голям брой ndb обекти от хранилището на данни

Попаднах на интересен лимит с хранилището за данни на App Engine. Създавам манипулатор, който да ни помогне да анализираме някои данни за използването на един от нашите производствени сървъри. За да извърша анализа, трябва да направя заявка и да обобщя 10 000+ обекта, извлечени от хранилището за данни. Изчислението не е трудно, то е просто хистограма на елементи, които преминават през специфичен филтър на пробите за използване. Проблемът, който срещнах, е, че не мога да върна данните от хранилището за данни достатъчно бързо, за да извърша обработка, преди да достигна крайния срок за заявка.

Опитах всичко, за което мога да се сетя, за да разделя заявката на паралелни RPC извиквания, за да подобря производителността, но според appstats изглежда не мога да накарам заявките да се изпълняват действително паралелно. Без значение какъв метод опитам (вижте по-долу), винаги изглежда, че RPC се връща към водопад от последователни следващи заявки.

Забележка: кодът за заявка и анализ работи, просто работи бавно, защото не мога да получа достатъчно бързо данни от хранилището за данни.

Заден план

Нямам версия на живо, която мога да споделя, но ето основния модел за частта от системата, за която говоря:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

Можете да мислите за примерите като моменти, когато потребител използва възможност на дадено име. (напр.: 'systemA.feature_x'). Етикетите се основават на данни за клиента, системна информация и функция. напр.: ['winxp', '2.5.1', ​​'systemA', 'feature_x', 'premium_account']). Така етикетите образуват денормализиран набор от токени, които могат да се използват за намиране на мостри, представляващи интерес.

Анализът, който се опитвам да направя, се състои от вземане на период от време и запитване колко пъти е била използвана функция от набор от функции (може би всички функции) на ден (или на час) за клиентски акаунт (компания, не за потребител).

Така че входът към манипулатора е нещо като:

  • Начална дата
  • Крайна дата
  • Таг(ове)

Резултатът ще бъде:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Общ код за заявки

Ето някакъв общ код за всички заявки. Общата структура на манипулатора е прост манипулатор за получаване, използващ webapp2, който настройва параметрите на заявката, изпълнява заявката, обработва резултатите, създава данни за връщане.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Изпробвани методи

Опитах различни методи, за да се опитам да изтегля данни от хранилището възможно най-бързо и паралелно. Методите, които съм пробвал досега, включват:

A. Единична итерация

Това е по-скоро прост основен случай за сравнение с другите методи. Просто създавам заявката и итерирам всички елементи, позволявайки на ndb да направи това, което прави, за да ги изтегли един след друг.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Голямо извличане

Идеята тук беше да видя дали мога да направя едно много голямо извличане.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Асинхронно извличане във времеви диапазон

Идеята тук е да разбера, че пробите са доста добре разпределени във времето, така че мога да създам набор от независими заявки, които разделят общия времеви регион на части и да се опитам да изпълня всяка от тях паралелно, използвайки async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Асинхронно картографиране

Опитах този метод, защото документацията звучи така, сякаш ndb може да използва някакъв паралелизъм автоматично, когато използва метода Query.map_async.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Резултат

Тествах една примерна заявка, за да събера общо време за отговор и следи на appstats. Резултатите са:

A. Единична итерация

реално: 15.645s

Този преминава последователно през извличане на партиди една след друга и след това извлича всяка сесия от memcache.

Статистика на приложението за метод A

B. Голямо извличане

реално: 12.12s

Ефективно същото като опция А, но малко по-бързо по някаква причина.

Метод B appstats

C. Асинхронно извличане във времеви диапазон

реално: 15.251s

Изглежда, че предоставя повече паралелизъм в началото, но изглежда се забавя от поредица от извиквания към next по време на итерация на резултатите. Също така не изглежда да може да припокрива търсенията в memcache на сесията с чакащите заявки.

Метод C appstats

D. Асинхронно картографиране

реално: 13.752s

Това ми е най-трудно за разбиране. Изглежда, че има доста голямо припокриване, но всичко изглежда се простира във водопад, вместо успоредно.

Метод D appstats

Препоръки

Въз основа на всичко това, какво пропускам? Дали просто достигам лимит на App Engine или има по-добър начин да извадя паралелно голям брой обекти?

Не знам какво да опитам по-нататък. Мислех да пренапиша клиента, за да прави няколко заявки към машината за приложения паралелно, но това изглежда доста груба сила. Наистина бих очаквал този двигател на приложението да може да се справи с този случай на употреба, така че предполагам, че има нещо, което пропускам.

Актуализация

В крайна сметка открих, че вариант C е най-добрият за моя случай. Успях да го оптимизирам да завърши за 6,1 секунди. Все още не е перфектно, но много по-добре.

След като получих съвет от няколко души, открих, че следните елементи са ключови за разбиране и запомняне:

  • Множество заявки могат да се изпълняват паралелно
  • Само 10 RPC могат да летят едновременно
  • Опитайте се да денормализирате до такава степен, че да няма вторични заявки
  • Този тип задачи е по-добре да се оставят да съпоставят редуцирането и опашките със задачи, а не заявките в реално време

И така, какво направих, за да го направя по-бързо:

  • Разделих пространството за заявки от самото начало въз основа на времето. (забележка: колкото по-равни са дяловете по отношение на върнатите обекти, толкова по-добре)
  • Денормализирах данните допълнително, за да премахна необходимостта от заявка за вторична сесия
  • Използвах ndb async операции и wait_any(), за да припокрия заявките с обработката

Все още не получавам представянето, което бих очаквал или харесвам, но засега е работещо. Просто ми се иска да е по-добър начин за бързо изтегляне на голям брой последователни обекти в паметта в манипулатори.


person Allen    schedule 16.07.2012    source източник
comment
Постигнах известен напредък и накарах опция C да работи за малко под 9 секунди. Мисля, че мога да го оптимизирам допълнително. Това, което открих е, че ако разделя първоначалната заявка на 40 части и ако изпратя заявка за всички обекти на сесията едновременно, тогава по-голямата част от RPC времето може да се припокрие. Най-добрият ми опит в момента е да направя RPC общо време от 245 секунди в реално време от 9 секунди. Ще опитам още няколко опции и ще публикувам кое работи най-добре. Междувременно ме уведомете, ако някой има още идеи.   -  person Allen    schedule 17.07.2012
comment
Здравейте, осъзнавам, че този въпрос е стар, но по отношение на D. Async Mapping, вашият метод addCountForPeriod пише ли в хранилището за данни? Ако отговорът е да, тогава смятам, че това може да е причина за каскадирането, поради комбинацията от асинхронни операции на хранилище за данни и синхронни операции на хранилище за данни.   -  person Rob Curtis    schedule 22.02.2013
comment
Благодаря за отличната публикация. Попаднах на това след публикуване с подобен проблем тук: stackoverflow.com/questions/25796142/. Подобно на вас, аз съм разочарован, че не мога да подобря ефективността на асинхронните заявки. Най-малкото бих искал да разбера защо са толкова бавни.   -  person Michael Pedersen    schedule 15.09.2014
comment
Имам същите проблеми с производителността, опитвайки се да намеря по-общо решение тук #26759950   -  person thomasf1    schedule 05.11.2014
comment
Този въпрос трябва да бъде в раздела за общи въпроси и отговори на StackOverflow като пример за правилен въпрос stackoverflow.com/help/how-to- попитайте   -  person András Szepesházi    schedule 05.05.2017


Отговори (4)


Голяма обработка като тази не трябва да се извършва в потребителска заявка, която има времево ограничение от 60 секунди. Вместо това трябва да се направи в контекст, който поддържа дългосрочни заявки. Опашката със задачи поддържа заявки до 10 минути и (вярвам) нормални ограничения на паметта (Екземплярите на F1, по подразбиране, имат 128 MB памет). За още по-високи лимити (без време за изчакване на заявка, 1 GB+ памет) използвайте бекенд.

Ето нещо, което да опитате: настройте URL адрес, който при достъп да задейства задача от опашка със задачи. Той връща уеб страница, която анкетира на всеки ~5 s до друг URL адрес, който отговаря с true/false, ако задачата от опашката със задачи все още е завършена. Опашката със задачи обработва данните, което може да отнеме около 10 секунди секунди, и записва резултата в хранилището за данни или като изчислени данни, или като изобразена уеб страница. След като първоначалната страница установи, че е завършена, потребителят се пренасочва към страницата, която извлича вече изчислените резултати от хранилището на данни.

person mjibson    schedule 17.07.2012
comment
Мислех да използвам и бекенд. Все още се надявам да накарам заявката да работи в нормален срок, но ако това не проработи, ще се върна към използване на бекенд, за да я изпълня, както описвате. Тъй като едно от моите тесни места е зареждането на всички сесийни обекти в локалния кеш, може също да има начин да се увеличи производителността с помощта на бекенд, ако мога да запазя всички сесии в паметта по всяко време. - person Allen; 17.07.2012
comment
Това не отговаря на нищо. Въпросът беше специфичен за това как трябва да работи хранилището за данни, но не работи. Същият проблем се прилага за опашки със задачи и бекендове, когато трябва да извлечете 100 000 или 1 милион обекта. Куче бавно, скъпо хранилище за данни - person ZiglioUK; 22.01.2014
comment
Разгледайте MapReduce andwer от Мартин Берендс по-долу. Бекендовете са отхвърлени. Има хубаво ръководство, описващо процеса на мигриране: cloud.google.com/appengine/ docs/python/modules/converting - person Josep Valls; 19.04.2015

Новата експериментална функция за обработка на данни (API на AppEngine за MapReduce) изглежда много подходяща за решаване на този проблем. Той прави автоматично шардинг, за да изпълни множество паралелни работни процеси.

person Martin Berends    schedule 22.02.2014

Имам подобен проблем и след няколко седмици работа с поддръжката на Google мога да потвърдя, че няма магическо решение поне от декември 2017 г.

tl;dr: Може да се очаква пропускателна способност от 220 обекта/секунда за стандартен SDK, работещ на B1 екземпляр до 900 обекта/секунда за коригиран SDK, работещ на екземпляр B8.

Ограничението е свързано с процесора и промяната на инстанционния тип директно влияе върху производителността. Това се потвърждава от подобни резултати, получени на екземпляри B4 и B4_1G

Най-добрата производителност, която получих за Expando обект с около 30 полета е:

Стандартен GAE SDK

  • B1 инстанция: ~220 обекта/секунда
  • B2 инстанция: ~250 обекта/секунда
  • B4 инстанция: ~560 обекта/секунда
  • B4_1G екземпляр: ~560 обекта/секунда
  • B8 инстанция: ~650 обекта/секунда

Пачиран GAE SDK

  • B1 инстанция: ~420 обекта/секунда
  • B8 инстанция: ~900 обекта/секунда

За стандартен GAE SDK опитах различни подходи, включително многопоточност, но най-добрият се оказа fetch_async с wait_any. Текущата NDB библиотека вече върши страхотна работа с използването на async и фючърси под капака, така че всеки опит да се прокара това използване на нишки само влошава положението.

Намерих два интересни подхода за оптимизиране на това:

Мат Фаус обяснява проблема много добре:

GAE SDK предоставя API за четене и писане на обекти, извлечени от вашите класове, в хранилището за данни. Това ви спестява скучната работа по валидиране на необработени данни, върнати от хранилището за данни, и препакетирането им в лесен за използване обект. По-специално, GAE използва протоколни буфери за предаване на необработени данни от магазина към предната машина, която се нуждае от тях. След това SDK е отговорен за декодирането на този формат и връщането на чист обект към вашия код. Тази помощна програма е страхотна, но понякога върши малко повече работа, отколкото бихте искали. [...] С помощта на нашия инструмент за профилиране открих, че цели 50% от времето, прекарано в извличане на тези обекти, е по време на фазата на декодиране на protobuf-към-python-обект. Това означава, че процесорът на предния сървър е бил тясно място в тези четения на хранилище за данни!

GAE-data-access-web-request

И двата подхода се опитват да намалят времето, прекарано в декодиране на protobuf към Python, като намалят броя на декодираните полета.

Опитах и ​​двата подхода, но успях само с този на Мат. Вътрешните елементи на SDK се промениха, откакто Evan публикува своето решение. Трябваше да променя малко кода, публикуван от Мат тук, но беше доста лесно - ако има интерес мога да публикувам окончателния код.

За обикновен Expando обект с около 30 полета използвах решението на Matt за декодиране само на няколко полета и получих значително подобрение.

В заключение трябва да планирате съответно и не очаквайте да можете да обработите много повече от няколкостотин обекта в GAE заявка в „реално време“.

person FelixEnescu    schedule 07.12.2017

Операциите с големи данни в App Engine се изпълняват най-добре с помощта на някаква операция mapreduce.

Ето видео, описващо процеса, но включващо BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

Не звучи като че ли имате нужда от BigQuery, но вероятно искате да използвате както Map, така и Reduce частите на конвейера.

Основната разлика между това, което правите, и ситуацията с mapreduce е, че стартирате един екземпляр и итерирате през заявките, където при mapreduce ще имате отделен екземпляр, работещ паралелно за всяка заявка. Ще ви трябва операция за намаляване, за да „сумирате“ всички данни и да запишете резултата някъде.

Другият проблем, който имате, е, че трябва да използвате курсори за повторение. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Ако итераторът използва отместване на заявка, това ще бъде неефективно, тъй като отместването издава същата заявка, пропуска редица резултати и ви дава следващия набор, докато курсорът прескача направо към следващия набор.

person dragonx    schedule 16.07.2012
comment
можете ли да покажете прост пример за това как да използвате вашия подход, за да получите паралелни обекти? Мислех, че tasklet ще се погрижи за това, но не изглежда така. - person aschmid00; 16.07.2012
comment
Не използвам курсори, защото никоя от заявките не се рестартира по средата по-късно. Всички те грабват всички обекти веднага без компенсиране. Що се отнася до намаляването на картата, помислих за това, но това не е офлайн анализ, то е предназначено да бъде жива заявка, която вътрешните потребители ще променят динамично, докато изследват данните. Моето разбиране за намаляване на картата е, че не отговаря на този случай на интерактивна употреба в реално време. - person Allen; 16.07.2012
comment
Може да съм направил лошо предположение, мислех си, че datastore_v3. Следващите извиквания в C се дължат на използването на някакъв итератор, базиран на отместване. Според моя опит Mapreduce не е идеален за интерактивна употреба, защото а) не можете да предвидите колко време ще отнеме операцията и б) обикновено трябва да запишете резултатите си в хранилището за данни, вместо да получите лесен резултат, който можете да поставите на шаблон. Става малко грозно от страна на клиента, мисля, че имате нужда от начин за анкета, за да видите дали резултатът е готов. Въпреки това, поради паралелния характер, той има тенденция да бъде по-бърз от сериализирането на заявки. - person dragonx; 16.07.2012
comment
Съгласен съм, че намаляването на картата може да паралелизира. Просто се надявах, че операциите ndb и async също могат да паралелизират достатъчно за моя случай на употреба. Не е необходимо да паралелизирам изчислението, просто извличането на данни. Също така обмислях да използвам urlfetch, за да напиша многостепенен манипулатор, който да генерира заявки към субхандлерите за получаване на данните и след това да ги събира и обработва в родителския манипулатор. Просто изглежда, че трябва да има по-лесен начин. - person Allen; 16.07.2012
comment
Не мисля, че ще можете надеждно да направите това като заявка на живо, особено ако вашият набор от данни (върнатите резултати стават много по-големи). - person Tim Hoffman; 17.07.2012
comment
Съгласен съм, че MapReduce е правилният начин, в моя ограничен опит съм го виждал да се представя много по-добре от собствените ми заявки, но не знам защо. Жалко, че Google не поддържа собствен пакет за MR. Чудя се дали изобщо има работа за подобряване на хранилището за данни и неговите ужасни показатели и разходи, цялата работа изглежда отива за GCE и облачното хранилище - person ZiglioUK; 22.01.2014