[关闭]
@liyuj 2017-12-05T21:06:47.000000Z 字数 7056 阅读 4770
标签:Apache-Ignite-2.3.0-中文开发手册

9.消息和事件

9.1.基于主题的消息

9.1.1.摘要

Ignite分布式消息可以在集群内的所有节点间进行基于主题的通信,带有特定消息主题的消息可以分布到订阅了该主题的所有节点或者节点的子集。
Ignite消息基于发布-订阅范式,发布者和订阅者通过一个通用的主题连接在一起。当一个节点针对主题T发布了一个消息A,他会被分布到所有订阅了主题T的节点。

任意加入集群的新节点会自动地订阅集群内(或者集群组内)其他节点订阅的所有的主题。

9.1.2.IgniteMessaging

Ignite中的分布式消息功能是通过IgniteMessaging接口提供的,可以像下面这样获得一个IgniteMessaging的实例:

  1. Ignite ignite = Ignition.ignite();
  2. // Messaging instance over this cluster.
  3. IgniteMessaging msg = ignite.message();
  4. // Messaging instance over given cluster group (in this case, remote nodes).
  5. IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());

9.1.3.发布消息

send方法可以将一个带有特定消息主题的消息发送/发布到所有的节点,消息可以以有序也可以以无序的方式发送。
有序消息
sendOrdered(...)可以用于希望按照发送消息的顺序接收消息的场合,可以传递一个timeout参数来指定一个消息可以在队列中保持多长时间来等待发送之前的消息。如果达到了超时时间,那么还没有到达该节点上指定主题的所有消息都会被忽略。
无序消息
send(...)方法不保证消息的顺序,这意味着,当顺序地发送消息A和消息B,不能保证目标节点先收到A后收到B。

9.1.4.订阅消息

listen方法可以监听/订阅消息。当这些方法被调用时,带有指定消息主题的监听器就会被注册到所有的(或者集群组)节点来监听新的消息。对于listen方法,可以传入一个返回boolean值的谓词,他会告诉监听器是继续还是停止监听新的消息。
本地监听
localListen(...)方法只在本地节点注册了一个带有指定主题的消息监听器然后监听来自集群内任意节点的消息。
远程监听
remoteListen(...)方法在集群内的所有节点上注册了一个带有指定主题的监听器然后监听来自集群内任意节点的消息。

9.1.5.示例

下面的示例显示了在远程节点间的消息交换:
Java8:有序消息:

  1. Ignite ignite = Ignition.ignite();
  2. IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());
  3. // Add listener for unordered messages on all remote nodes.
  4. rmtMsg.remoteListen("MyOrderedTopic", (nodeId, msg) -> {
  5. System.out.println("Received ordered message [msg=" + msg + ", from=" + nodeId + ']');
  6. return true; // Return true to continue listening.
  7. });
  8. // Send ordered messages to remote nodes.
  9. for (int i = 0; i < 10; i++)
  10. rmtMsg.sendOrdered("MyOrderedTopic", Integer.toString(i));

Java8:无序消息:

  1. Ignite ignite = Ignition.ignite();
  2. IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forRemotes());
  3. // Add listener for unordered messages on all remote nodes.
  4. rmtMsg.remoteListen("MyUnOrderedTopic", (nodeId, msg) -> {
  5. System.out.println("Received unordered message [msg=" + msg + ", from=" + nodeId + ']');
  6. return true; // Return true to continue listening.
  7. });
  8. // Send unordered messages to remote nodes.
  9. for (int i = 0; i < 10; i++)
  10. rmtMsg.send("MyUnOrderedTopic", Integer.toString(i));

Java7:有序消息:

  1. Ignite ignite = Ignition.ignite();
  2. // Get cluster group of remote nodes.
  3. ClusterGroup rmtPrj = ignite.cluster().forRemotes();
  4. // Get messaging instance over remote nodes.
  5. IgniteMessaging msg = ignite.message(rmtPrj);
  6. // Add message listener for specified topic on all remote nodes.
  7. msg.remoteListen("myOrderedTopic", new IgniteBiPredicate<UUID, String>() {
  8. @Override public boolean apply(UUID nodeId, String msg) {
  9. System.out.println("Received ordered message [msg=" + msg + ", from=" + nodeId + ']');
  10. return true; // Return true to continue listening.
  11. }
  12. });
  13. // Send ordered messages to all remote nodes.
  14. for (int i = 0; i < 10; i++)
  15. msg.sendOrdered("myOrderedTopic", Integer.toString(i), 0);

9.2.本地和远程事件

9.2.1.摘要

Ignite分布式事件功能使得在分布式集群环境下发生各种各样事件时应用可以接收到通知。可以自动获得比如任务执行、发生在本地或者远程节点上的读写或者查询操作的通知。

9.2.2.IgniteEvents

分布式事件功能是通过IgniteEvents接口提供的,可以通过如下方式从Ignite中获得IgniteEvents的实例:

  1. Ignite ignite = Ignition.ignite();
  2. IgniteEvents evts = ignite.events();

9.2.3.订阅事件

listen方法可以用于接收集群内发生的指定事件的通知,这些方法在本地或者远程节点上注册了一个指定事件的监听器,当在该节点上发生该事件时,会通知该监听器。
本地事件
localListen(...)方法只在本地节点上针对指定事件注册事件监听器。
远程事件
remoteListen(...)方法会在集群或者集群组内的所有节点上针对指定事件注册监听器。
下面是每个方法的示例:
Java8:本地监听:

  1. Ignite ignite = Ignition.ignite();
  2. // Local listener that listenes to local events.
  3. IgnitePredicate<CacheEvent> locLsnr = evt -> {
  4. System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
  5. ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue());
  6. return true; // Continue listening.
  7. };
  8. // Subscribe to specified cache events occuring on local node.
  9. ignite.events().localListen(locLsnr,
  10. EventType.EVT_CACHE_OBJECT_PUT,
  11. EventType.EVT_CACHE_OBJECT_READ,
  12. EventType.EVT_CACHE_OBJECT_REMOVED);
  13. // Get an instance of named cache.
  14. final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
  15. // Generate cache events.
  16. for (int i = 0; i < 20; i++)
  17. cache.put(i, Integer.toString(i));

Java8:远程监听:

  1. Ignite ignite = Ignition.ignite();
  2. // Get an instance of named cache.
  3. final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
  4. // Sample remote filter which only accepts events for keys
  5. // that are greater than or equal to 10.
  6. IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
  7. @Override public boolean apply(CacheEvent evt) {
  8. System.out.println("Cache event: " + evt);
  9. int key = evt.key();
  10. return key >= 10;
  11. }
  12. };
  13. // Subscribe to specified cache events occuring on
  14. // all nodes that have the specified cache running.
  15. ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, EVT_CACHE_OBJECT_REMOVED);
  16. // Generate cache events.
  17. for (int i = 0; i < 20; i++)
  18. cache.put(i, Integer.toString(i));

Java7:监听:

  1. Ignite ignite = Ignition.ignite();
  2. // Get an instance of named cache.
  3. final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
  4. // Sample remote filter which only accepts events for keys
  5. // that are greater than or equal to 10.
  6. IgnitePredicate<CacheEvent> rmtLsnr = evt -> evt.<Integer>key() >= 10;
  7. // Subscribe to specified cache events on all nodes that have cache running.
  8. ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr, EventType.EVT_CACHE_OBJECT_PUT,
  9. EventType.EVT_CACHE_OBJECT_READ,
  10. EventType.EVT_CACHE_OBJECT_REMOVED);
  11. // Generate cache events.
  12. for (int i = 0; i < 20; i++)
  13. cache.put(i, Integer.toString(i));

在上述示例中,EVT_CACHE_OBJECT_PUT,EVT_CACHE_OBJECT_READ,EVT_CACHE_OBJECT_REMOVED是在EventType接口中预定义的事件类型常量。

EventType接口定义了监听方法可用的各种事件类型常量,可以在相关的javadoc中看到这些事件类型的完整列表。

作为参数传入localListen(...)remoteListen(...)方法的事件类型还必须在IgniteConfiguration中进行配置,可以参照下面的11.2.5.配置章节。

9.2.4.事件的查询

系统生成的所有事件都会保持在本地节点的本地,IgniteEventsAPI提供了查询这些事件的方法。
本地事件
localQuery(...)方法通过传入的谓词过滤器在本地节点上进行事件的查询。如果满足了所有的条件,就会返回一个本地节点发生的所有事件的集合。
远程事件
remoteQuery(...)方法通过传入的谓词过滤器在远程节点上进行事件的异步查询。这个操作是分布式的,因此可能在通信层发生故障而且通常也会比本地事件通知花费更多的时间,注意这个方法是非阻塞的,然后附带future立即返回。

9.2.5.配置

要获得集群内发生的任意任务或者缓存事件的通知,IgniteConfigurationincludeEventTypes属性必须启用:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <!-- Enable cache events. -->
  4. <property name="includeEventTypes">
  5. <util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
  6. </property>
  7. ...
  8. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. // Enable cache events.
  3. cfg.setIncludeEventTypes(EVTS_CACHE);
  4. // Start Ignite node.
  5. Ignition.start(cfg);

默认的话,因为性能原因事件通知是关闭的。

因为每秒生成上千的事件,他会在系统中产生额外的负载,这会导致显著的性能下降。因此强烈建议只有在应用逻辑必需时才启用这些事件。

9.3.自动化批处理

Ignite会自动地对集群内发生的,作为缓存事件的结果生成的事件通知进行分组或者分批处理。
缓存内的每个事件都会导致一个事件通知被生成以及发送,对于缓存活动频繁的系统,获取每个事件的通知都将是网络密集的,可能导致集群内缓存操作的性能下降。
Ignite中,事件通知可以被分组然后分批地或者定时地发送,下面是一个如何实现这一点的示例:

  1. Ignite ignite = Ignition.ignite();
  2. // Get an instance of named cache.
  3. final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
  4. // Sample remote filter which only accepts events for keys
  5. // that are greater than or equal to 10.
  6. IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
  7. @Override public boolean apply(CacheEvent evt) {
  8. System.out.println("Cache event: " + evt);
  9. int key = evt.key();
  10. return key >= 10;
  11. }
  12. };
  13. // Subscribe to cache events occuring on all nodes
  14. // that have the specified cache running.
  15. // Send notifications in batches of 10.
  16. ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(
  17. 10 /*batch size*/, 0 /*time intervals*/, false, null, rmtLsnr, EVTS_CACHE);
  18. // Generate cache events.
  19. for (int i = 0; i < 20; i++)
  20. cache.put(i, Integer.toString(i));
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注