У нас есть вариант использования, как показано ниже
1-) Начать 2 запускать экземпляр как узлы данных и вставлять данные в кеш.
2-) Создайте очередь и зарегистрируйте удаленного прослушивателя с помощью remoteListen, как показано ниже.
//Queue creation
CollectionConfiguration colCfg = new CollectionConfiguration();
colCfg.setCacheMode(PARTITIONED);
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);
//Remote Listener Closure
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
@Override public boolean apply(CacheEvent evt) {
System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
Ignite ignite = Ignition.ignite();
IgniteQueue<String> queue = ignite.queue(queueName, 0, null);
String key = evt.key();
BinaryObject profile = (BinaryObject) evt.newValue();
System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString());
if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end
&& ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){
queue.add(profile.field("number"));
}
return false;
}
};
Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr,
EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
3-) Сделайте несколько обновлений в экземплярах кеша, как показано ниже, чтобы получить обновления в удаленном прослушивателе.
void updateAnyProfile(Double newUsage){
SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10");
List<CacheEntryImpl<String, profile>> res = profileCache.query(qry).getAll();
Profile profile = res.iterator().next().getValue();
profile.setUsage(newUsage);
profileCache.put(profile.getCtn(), profile);
profile.setUsage(newUsage+1);
profileCache.put(profile.getCtn(), profile);
}
4-) Возьмите элементы из очереди.
public void readFromQueue (String queueName) {
// Initialize new FIFO queue.
IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null);
while (true) {
String profile = queue.take();
System.out.println("Profile from queue: " + profile.toString());
}
}
Шаги 2,3,4 выполняются из разных экземпляров JVM с TRUE клиентского узла. Проблема в том, что приложение зависает, чтобы выполнить любую операцию после выполнения вышеуказанного сценария. Не могли бы вы помочь нам? Мы были бы очень признательны, если бы вы сказали нам, что мы делаем неправильно?
Ниже приведен дамп потока зависшего узла данных, и тот же узел данных зависает в приведенном ниже коде.
IgniteQueue<String> queue = ignite.queue(queueName, 0, null);
Иногда вы можете успешно обновить записи, и после следующего обновления он начинает зависать или даже не может выполнить операцию помещения в кеш.
"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176)
at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405)
at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270)
at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:952)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor$12.applyx(DataStructuresProcessor.java:950)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950)
at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560)
at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:303)
at com.ignite.trial.roaming.ProfileService$4.apply(ProfileService.java:297)
at org.apache.ignite.internal.GridEventConsumeHandler$2.onEvent(GridEventConsumeHandler.java:170)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297)
at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806)
- locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:127)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:282)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$6.apply(GridDhtAtomicCache.java:277)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access$000(GridCacheIoManager.java:100)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:253)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885)
at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:114)
at org.apache.ignite.internal.managers.communication.GridIoManager$7.run(GridIoManager.java:802)
at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483)
at java.lang.Thread.run(Thread.java:748)