Попаднах на интересен лимит с хранилището за данни на App Engine. Създавам манипулатор, който да ни помогне да анализираме някои данни за използването на един от нашите производствени сървъри. За да извърша анализа, трябва да направя заявка и да обобщя 10 000+ обекта, извлечени от хранилището за данни. Изчислението не е трудно, то е просто хистограма на елементи, които преминават през специфичен филтър на пробите за използване. Проблемът, който срещнах, е, че не мога да върна данните от хранилището за данни достатъчно бързо, за да извърша обработка, преди да достигна крайния срок за заявка.
Опитах всичко, за което мога да се сетя, за да разделя заявката на паралелни RPC извиквания, за да подобря производителността, но според appstats изглежда не мога да накарам заявките да се изпълняват действително паралелно. Без значение какъв метод опитам (вижте по-долу), винаги изглежда, че RPC се връща към водопад от последователни следващи заявки.
Забележка: кодът за заявка и анализ работи, просто работи бавно, защото не мога да получа достатъчно бързо данни от хранилището за данни.
Заден план
Нямам версия на живо, която мога да споделя, но ето основния модел за частта от системата, за която говоря:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
Можете да мислите за примерите като моменти, когато потребител използва възможност на дадено име. (напр.: 'systemA.feature_x'). Етикетите се основават на данни за клиента, системна информация и функция. напр.: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). Така етикетите образуват денормализиран набор от токени, които могат да се използват за намиране на мостри, представляващи интерес.
Анализът, който се опитвам да направя, се състои от вземане на период от време и запитване колко пъти е била използвана функция от набор от функции (може би всички функции) на ден (или на час) за клиентски акаунт (компания, не за потребител).
Така че входът към манипулатора е нещо като:
- Начална дата
- Крайна дата
- Таг(ове)
Резултатът ще бъде:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
Общ код за заявки
Ето някакъв общ код за всички заявки. Общата структура на манипулатора е прост манипулатор за получаване, използващ webapp2, който настройва параметрите на заявката, изпълнява заявката, обработва резултатите, създава данни за връщане.
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
Изпробвани методи
Опитах различни методи, за да се опитам да изтегля данни от хранилището възможно най-бързо и паралелно. Методите, които съм пробвал досега, включват:
A. Единична итерация
Това е по-скоро прост основен случай за сравнение с другите методи. Просто създавам заявката и итерирам всички елементи, позволявайки на ndb да направи това, което прави, за да ги изтегли един след друг.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B. Голямо извличане
Идеята тук беше да видя дали мога да направя едно много голямо извличане.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C. Асинхронно извличане във времеви диапазон
Идеята тук е да разбера, че пробите са доста добре разпределени във времето, така че мога да създам набор от независими заявки, които разделят общия времеви регион на части и да се опитам да изпълня всяка от тях паралелно, използвайки async:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. Асинхронно картографиране
Опитах този метод, защото документацията звучи така, сякаш ndb може да използва някакъв паралелизъм автоматично, когато използва метода Query.map_async.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Резултат
Тествах една примерна заявка, за да събера общо време за отговор и следи на appstats. Резултатите са:
A. Единична итерация
реално: 15.645s
Този преминава последователно през извличане на партиди една след друга и след това извлича всяка сесия от memcache.
B. Голямо извличане
реално: 12.12s
Ефективно същото като опция А, но малко по-бързо по някаква причина.
C. Асинхронно извличане във времеви диапазон
реално: 15.251s
Изглежда, че предоставя повече паралелизъм в началото, но изглежда се забавя от поредица от извиквания към next по време на итерация на резултатите. Също така не изглежда да може да припокрива търсенията в memcache на сесията с чакащите заявки.
D. Асинхронно картографиране
реално: 13.752s
Това ми е най-трудно за разбиране. Изглежда, че има доста голямо припокриване, но всичко изглежда се простира във водопад, вместо успоредно.
Препоръки
Въз основа на всичко това, какво пропускам? Дали просто достигам лимит на App Engine или има по-добър начин да извадя паралелно голям брой обекти?
Не знам какво да опитам по-нататък. Мислех да пренапиша клиента, за да прави няколко заявки към машината за приложения паралелно, но това изглежда доста груба сила. Наистина бих очаквал този двигател на приложението да може да се справи с този случай на употреба, така че предполагам, че има нещо, което пропускам.
Актуализация
В крайна сметка открих, че вариант C е най-добрият за моя случай. Успях да го оптимизирам да завърши за 6,1 секунди. Все още не е перфектно, но много по-добре.
След като получих съвет от няколко души, открих, че следните елементи са ключови за разбиране и запомняне:
- Множество заявки могат да се изпълняват паралелно
- Само 10 RPC могат да летят едновременно
- Опитайте се да денормализирате до такава степен, че да няма вторични заявки
- Този тип задачи е по-добре да се оставят да съпоставят редуцирането и опашките със задачи, а не заявките в реално време
И така, какво направих, за да го направя по-бързо:
- Разделих пространството за заявки от самото начало въз основа на времето. (забележка: колкото по-равни са дяловете по отношение на върнатите обекти, толкова по-добре)
- Денормализирах данните допълнително, за да премахна необходимостта от заявка за вторична сесия
- Използвах ndb async операции и wait_any(), за да припокрия заявките с обработката
Все още не получавам представянето, което бих очаквал или харесвам, но засега е работещо. Просто ми се иска да е по-добър начин за бързо изтегляне на голям брой последователни обекти в паметта в манипулатори.