Почему карта потока Java уменьшает количество результатов в два раза?

У меня есть этот код:

        ComparisonResults comparisonResults = requestsList
                .stream()
                .map(item -> getResponse(item))
                .map(item -> compareToBl(item))
                .reduce(new ComparisonResults(), (result1, result2) ->
                {
                     result1.addSingleResult(result2);
                 //   return result1;
                    return new ComparisonResults(result1);
                });

и этот код б:

        ComparisonResults comparisonResults = requestsList
                .parallelStream()
                .map(item -> getResponse(item))
                .map(item -> compareToBl(item))
                .reduce(new ComparisonResults(), (result1, result2) ->
                {
                     result1.addSingleResult(result2);
                 //   return result1;
                    return new ComparisonResults(result1);
                });

Все, что я делаю, это создаю объекты ответа, затем преобразовываю их в объекты comaprisonResult и сокращаю их до одного объекта comaprisonResult.

код a показывает член класса int comparisonResults.num_of_sub_responses==5, который является правильным

code b показывает член класса int comparisonResults.num_of_sub_responses==10, который удваивает правильный результат.

java 8 reduce должен быть потокобезопасным, верно?

я что-то пропустил?

getResponse и compareToBl являются потокобезопасными


person Elad Benda2    schedule 27.10.2015    source источник
comment
Покажите код для ComparisonResult.   -  person RealSkeptic    schedule 27.10.2015


Ответы (2)


Вы мутируете входящий объект в reduce. Это не правильно. Не помогает то, что вы создаете новый объект после изменения входящего объекта.

Что вы хотите сделать, это

.collect(ComparisonResults::new, ComparisonResults::addSingleResult,
         (a,b)->/* code to merge two ComparisonResults instances*/);

Если результатом .map(item -> compareToBl(item)) является ComparisonResults или, другими словами, addSingleResult объединяет два экземпляра ComparisonResults, вы можете использовать ComparisonResults::addSingleResult в качестве функции слияния, хотя тогда ее название немного вводит в заблуждение.

Вам следует внимательно прочитать "Сокращение » главы документации и ее продолжения «Изменчивая редукция».

person Holger    schedule 27.10.2015

Операция сокращения reduce(identity, operator) основывается на двух важных предположениях относительно аргументов, которые вы ей передаете.

  1. Первый параметр — это идентификатор. То есть, когда вы используете оператор сокращения между любым элементом и заданным identity, вы получаете исходный элемент.
  2. Оператор ассоциативен.

Эти два допущения очень важны при работе с параллельным потоком, потому что в целом он делает следующее:

  • Дайте каждому потоку часть потока
  • Каждый поток начинается с идентификатора и накапливает результат, используя элементы в потоке:

    result = identity op element1 op element2 op element3 ...
    
  • Затем результаты из разных потоков объединяются вместе с помощью оператора:

    grand result = result1 op result2 op result3
    

Итак, предположим, вы суммируете числа, это разбивает операцию вроде 5 + 4 + 3 + 20 на ( 0 + 5 + 4 ) + ( 0 + 3 + 20 ). Который работает тогда и только тогда, когда вышеуказанные предположения верны. (0 тождество сложения, сложение ассоциативно).

Но изменяя первый операнд внутри вашего оператора, вы фактически изменяете объект identity. Так что это больше не может считаться идентификацией. То есть op(identity,result) не дает вам того же значения, что и result.

Если оператор не ассоциативен, проблемы возникнут на этапе «большой результат».

person RealSkeptic    schedule 27.10.2015