Как Storm обрабатывает группировку полей при добавлении дополнительных узлов?

Просто прочитал более подробную информацию о storm и наткнулся на его способность группировать поля, например, если вы подсчитываете твиты для каждого пользователя и у вас есть две задачи с группировкой полей идентификатора пользователя, одни и те же идентификаторы пользователей будут отправлены в одни и те же задачи. .

Таким образом, задача 1 может иметь следующие значения в памяти bob: 10 alice: 5

задача 2 может иметь следующие значения в памяти jill:10 joe: 4

Если я добавлю новую машину в кластер для увеличения емкости и запущу перебалансировку, что произойдет с моими счетчиками в памяти? Вы начнете получать пользователей с разным количеством?


person James    schedule 07.12.2013    source источник


Ответы (4)


Используя группировку полей, мы можем направить конкретное поле для перехода к конкретным задачам.

Группировка полей. Поток разделен по полям, указанным в группировке. Например, если поток сгруппирован по полю «user-id», кортежи с одним и тем же «user-id» всегда будут выполнять одну и ту же задачу, а кортежи с разными «user-id» могут выполнять разные задачи. .

эти задачи всегда статичны в жизненном цикле шторма, что вы можете изменить, используя rebalance, это количество исполнителей (потоков). в случае добавления нового узла в кластер позволяет перенастроить количество исполнителей для запуска без отключения топологии, но независимо от того, что количество задач остается прежним. просто добавление нового узла дает вам преимущество в повышении производительности за счет настройки параллелизма storm.

person user2720864    schedule 07.12.2013
comment
попался, поэтому, как только Боб переходит к одной задаче, он всегда будет переходить к одной и той же, пока топология не будет остановлена. Когда я добавляю новые узлы, эта задача просто получает больше потоков для выполнения параллельных операций, поэтому все ваши структуры данных должны быть потокобезопасными в ваших болтах? - person James; 09.12.2013
comment
правда, подробнее об этом читайте здесь - person user2720864; 09.12.2013

Чтобы каждый раз отправлять сообщение одной и той же задаче, storm будет модифицировать хэш-код значения с количеством задач (хэш-код (значения)% #tasks). Если вы увеличите свои задачи, ваши подсчеты будут неточными, поскольку они могут не перейти к той же задаче/работнику после перебалансировки.

https://groups.google.com/forum/#!msg/storm-user/lCKnl8AmSVE/rVCH3uuUAcMJ
person Naresh    schedule 08.12.2013
comment
задачи не могут быть изменены во время выполнения - person user2720864; 12.12.2013

Чтобы полностью понять это, вы должны увидеть код:

Группировка полей зависит от строки поля, а не от того, какой носик ее выдал. Так что ребаланс на это не повлияет. Это функция: https://github.com/apache/storm/blob/3b1ab3d8a7da7ed35adc448d24f1f1ccb6c5ff27/storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java#L157-L161

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}

TupleUtils.listHashCode приводит к

public static <T> int listHashCode(List<T> alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());
  }
}
person Nav    schedule 01.09.2016

На основе полей одного или нескольких кортежей группировка полей позволяет управлять кортежами, отправляемыми в болты. Это гарантирует, что заданный набор значений для комбинации полей всегда отправляется в один и тот же болт.

person VIJ    schedule 02.09.2017