Замыкания зависали в версии 2.0 при попытке добавить элемент в очередь

У нас есть вариант использования, как показано ниже

1-) Начать 2 запускать экземпляр как узлы данных и вставлять данные в кеш.

2-) Создайте очередь и зарегистрируйте удаленного прослушивателя с помощью remoteListen, как показано ниже.

//Queue creation         
CollectionConfiguration colCfg = new CollectionConfiguration(); 
colCfg.setCacheMode(PARTITIONED); 
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);



//Remote Listener Closure 
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() { 
                        @Override public boolean apply(CacheEvent evt) { 
                                System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']'); 
                Ignite ignite = Ignition.ignite(); 
                IgniteQueue<String> queue = ignite.queue(queueName, 0, null); 
                String key = evt.key(); 
                BinaryObject profile = (BinaryObject) evt.newValue(); 
                System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + 
                        ", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString()); 

                if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end 
                        && ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){ 
                    queue.add(profile.field("number")); 
                } 
                                return false; 
                        } 
                };       

Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr, 
                                EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); 

3-) Сделайте несколько обновлений в экземплярах кеша, как показано ниже, чтобы получить обновления в удаленном прослушивателе.

void updateAnyProfile(Double newUsage){         
SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10"); 
    List<CacheEntryImpl<String, profile>> res = profileCache.query(qry).getAll(); 
    Profile profile = res.iterator().next().getValue(); 
    profile.setUsage(newUsage); 
    profileCache.put(profile.getCtn(), profile); 
    profile.setUsage(newUsage+1); 
    profileCache.put(profile.getCtn(), profile); 

} 

4-) Возьмите элементы из очереди.

 public void readFromQueue (String queueName) { 
    // Initialize new FIFO queue. 
    IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null); 
    while (true) { 
        String profile = queue.take(); 
        System.out.println("Profile from queue: " + profile.toString()); 
    } 
} 

Шаги 2,3,4 выполняются из разных экземпляров JVM с TRUE клиентского узла. Проблема в том, что приложение зависает, чтобы выполнить любую операцию после выполнения вышеуказанного сценария. Не могли бы вы помочь нам? Мы были бы очень признательны, если бы вы сказали нам, что мы делаем неправильно?

Ниже приведен дамп потока зависшего узла данных, и тот же узел данных зависает в приведенном ниже коде.

 IgniteQueue<String> queue = ignite.queue(queueName, 0, null);

Иногда вы можете успешно обновить записи, и после следующего обновления он начинает зависать или даже не может выполнить операцию помещения в кеш.

"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000] 
   java.lang.Thread.State: WAITING (parking) 
        at sun.misc.Unsafe.park(Native Method) 
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:952) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:303) 
        at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:297) 
        at org.apache.ignite.internal.GridEventConsumeHandler$2.onEvent(GridEventConsumeHandler.java:170) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806) 
        - locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:127) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:282) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:277) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$000(GridCacheIoManager.java:100) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:253) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114) 
        at org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802) 
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483) 
        at java.lang.Thread.run(Thread.java:748) 

person fatih tekin    schedule 10.05.2017    source источник
comment
Вы вызываете метод readFromQueue в другом потоке?   -  person Evgenii Zhuravlev    schedule 10.05.2017
comment
да хоть от другого jvm   -  person fatih tekin    schedule 10.05.2017
comment
Итак, у вас всего 2 узла? Тогда, что вы имели в виду здесь. Шаги 2,3,4 находятся в разных jvms. ? Есть ли какие-либо ожидающие транзакции, блокировки и т. д. в журналах? Где потоки застряли на других узлах? Не могли бы вы поделиться дампами и логами потоков?   -  person Evgenii Zhuravlev    schedule 10.05.2017
comment
Для данных у меня есть 2 узла. Я запускаю другие шаги в разных jvms. Один из узлов данных застревает, вы можете легко попробовать то же самое поведение самостоятельно.   -  person fatih tekin    schedule 10.05.2017


Ответы (1)


Не разрешается вызывать методы ignite.queue и ignite.affinity в EventListener, так как это может привести к взаимоблокировке.

Все операции кеша, включая EventListeners, выполняются в системном пуле, поэтому не рекомендуется вызывать внутри операций EventListener, которые также используют системный пул.

Вы можете прочитать больше здесь, в разделе «Выполнение замыканий и пулы потоков»: https://apacheignite.readme.io/docs/async-support#section-listeners-and-chaining-futures

А здесь https://apacheignite.readme.io/docs/thread-pools#section-system-pool

person Evgenii Zhuravlev    schedule 11.05.2017
comment
Можете ли вы показать в документации по воспламенению, где упоминается ваше объяснение. - person fatih tekin; 11.05.2017
comment
добавил ссылки на ответ - person Evgenii Zhuravlev; 11.05.2017