[关闭]
@liyuj 2016-09-03T14:01:11.000000Z 字数 24202 阅读 7015

Apache-Ignite-1.7.0-中文开发手册

5.流计算和CEP

5.1.流计算和CEP

Ignite流计算可以以可扩展以及容错的方式处理持续不断的数据流。在一个中等规模的集群中,数据注入Ignite的速度可以很高,甚至轻易地达到每秒处理百万级的事件。
工作方式

  1. 客户端节点通过Ignite数据流处理器向Ignite缓存中注入有限的或者持续的数据流;
  2. 数据在Ignite数据节点间自动分区,每个节点持有均等的数据量;
  3. 数据流可以在Ignite数据节点上以并置的方式直接并行处理;
  4. 客户端也可以在数据流上执行并发的SQL查询。


数据流处理器
数据流处理器是通过IgniteDataStreamerAPI定义的,他可以往Ignite数据流缓存中注入大量的持续不断的数据流,数据流处理器对于所有流入Ignite的数据以可扩展和容错的方式提供了至少一次保证
滑动窗口
Ignite流计算功能也可以在数据的滑动窗口中进行查询。因为数据流是持续不断的,所以很少希望查询最初的所有数据集,反而更感兴趣于问像“最近两小时最受欢迎的十个产品是什么?”或者“过去一天特定种类产品的平均价格是多少?”这样的问题。要获得这些数据,需要能够在滑动数据窗口中进行查询。

滑动窗口可以配置Ignite缓存的退出策略,可以基于时间、基于大小或者基于批次。可以一个缓存配置一个滑动窗口,然而如果希望同样的数据有不同的滑动窗口也可以容易地定义多个缓存。
查询数据
可以和Ignite的SQL、TEXT以及基于谓词的缓存查询一起使用Ignite数据索引能力的全部功能来在数据流中进行查询。
单词计数示例
在该示例中会将文本注入Ignite然后对每个独立的单词进行计数,还会周期性的在流中进行SQL查询来查询10个最频繁的单词。
JMS数据流处理器
Ignite提供了一个JMS数据流处理器来从JMS代理中消费消息,将他们转换为缓存元组然后将他们注入缓存。
Flume数据流处理器
IgniteSink是一个Flume sink,他会从一个相关联的Flume管道中提取事件然后注入一个Ignite缓存,当前支持Flume的1.6.0版本。

5.2.数据流处理器

5.2.1.摘要

数据流处理器是通过IgniteDataStreamerAPI定义的,用于将大量的持续数据流注入Ignite缓存。数据流处理器以可扩展以及容错的方式,为将所有的数据流注入Ignite提供了至少一次保证

5.2.2.IgniteDataStreamer

快速地将大量的数据流注入Ignite的主要抽象是IgniteDataStreamer,在内部他会适当地将数据整合成批次然后将这些批次与缓存这些数据的节点并置在一起。
高速加载是通过如下技术获得的:

要将数据加入数据流处理器,调用IgniteDataStreamer.addData(...)方法即可。

  1. // Get the data streamer reference and stream data.
  2. try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
  3. // Stream entries.
  4. for (int i = 0; i < 100000; i++)
  5. stmr.addData(i, Integer.toString(i));
  6. }

允许覆写
默认的话,数据流处理器不会覆写已有的数据,这意味着如果遇到一个缓存内已有的条目,他会忽略这个条目。这是一个最有效的以及高性能的模式,因为数据流处理器不需要在后台考虑数据的版本。
如果预想到数据可能在数据流缓存中可能存在以及希望覆写它,设置IgniteDataStreamer.allowOverwrite(true)即可。

5.2.3.StreamReceiver

对于希望执行一些自定义的逻辑而不仅仅是添加新数据的情况,可以利用一下StreamReceiverAPI。
流接收器可以以并置的方式直接在缓存该数据条目的节点上对数据流做出反应,可以在数据进入缓存之前修改数据或者在数据上添加任何的预处理逻辑。

注意StreamReceiver不会自动地将数据加入缓存,需要显式地调用任意的cache.put(...)方法。

5.2.4.StreamTransformer

StreamTransformer是一个StreamReceiver的简单实现,他会基于之前的值来修改数据流缓存中的数据。更新是并置的,即他会明确地在数据缓存的集群节点上发生。
在下面的例子中,通过StreamTransformer在文本流中为每个发现的确切的单词增加一个计数。

Java8:

  1. CacheConfiguration cfg = new CacheConfiguration("wordCountCache");
  2. IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);
  3. try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  4. // Allow data updates.
  5. stmr.allowOverwrite(true);
  6. // Configure data transformation to count instances of the same word.
  7. stmr.receiver(StreamTransformer.from((e, arg) -> {
  8. // Get current count.
  9. Long val = e.getValue();
  10. // Increment count by 1.
  11. e.setValue(val == null ? 1L : val + 1);
  12. return null;
  13. }));
  14. // Stream words into the streamer cache.
  15. for (String word : text)
  16. stmr.addData(word, 1L);
  17. }

Java7:

  1. CacheConfiguration cfg = new CacheConfiguration("wordCountCache");
  2. IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg);
  3. try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  4. // Allow data updates.
  5. stmr.allowOverwrite(true);
  6. // Configure data transformation to count instances of the same word.
  7. stmr.receiver(new StreamTransformer<String, Long>() {
  8. @Override public Object process(MutableEntry<String, Long> e, Object... args) {
  9. // Get current count.
  10. Long val = e.getValue();
  11. // Increment count by 1.
  12. e.setValue(val == null ? 1L : val + 1);
  13. return null;
  14. }
  15. });
  16. // Stream words into the streamer cache.
  17. for (String word : text)
  18. stmr.addData(word, 1L);

5.2.5.StreamVisitor

StreamVisitor也是StreamReceiver的一个方便实现,他会访问流中的每个键值组。注意,访问器不会更新缓存。如果键值组需要存储在缓存内,那么需要显式地调用任意的cache.put(...)方法。
在下面的示例中,有两个缓存:marketDatainstruments,收到market数据的瞬间就会将他们放入marketData缓存的流处理器,映射到特定market数据的集群节点上的marketData的流处理器的StreamVisitor就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument缓存。
注意,根本不会更新marketData缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。

  1. CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
  2. CacheConfiguration<String, Double> instCfg = new CacheConfiguration<>("instruments");
  3. // Cache for market data ticks streamed into the system.
  4. IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);
  5. // Cache for financial instruments.
  6. IgniteCache<String, Double> insts = ignite.getOrCreateCache(instCfg);
  7. try (IgniteDataStreamer<String, Integer> mktStmr = ignite.dataStreamer("marketData")) {
  8. // Note that we do not populate 'marketData' cache (it remains empty).
  9. // Instead we update the 'instruments' cache based on the latest market price.
  10. mktStmr.receiver(StreamVisitor.from((cache, e) -> {
  11. String symbol = e.getKey();
  12. Double tick = e.getValue();
  13. Instrument inst = instCache.get(symbol);
  14. if (inst == null)
  15. inst = new Instrument(symbol);
  16. // Update instrument price based on the latest market tick.
  17. inst.setHigh(Math.max(inst.getLatest(), tick);
  18. inst.setLow(Math.min(inst.getLatest(), tick);
  19. inst.setLatest(tick);
  20. // Update instrument cache.
  21. instCache.put(symbol, inst);
  22. }));
  23. // Stream market data into Ignite.
  24. for (Map.Entry<String, Double> tick : marketData)
  25. mktStmr.addData(tick);
  26. }

5.3.滑动窗口

5.3.1.摘要

滑动窗口是配置作为Ignite缓存的退出策略的,可以基于时间、基于大小或者基于批量。可以一个缓存配置一个滑动窗口,如果需要不同的滑动窗口,也可以容易地为同样的数据定义多于一个缓存。

5.3.2.基于时间的滑动窗口

基于时间的滑动窗口可以用JCache标准的ExpiryPolicy进行配置,可以收到基于创建时间最后访问时间更新时间的流化过期事件。
下面是在Ignite中如何配置基于创建时间的5秒钟滑动窗口:

  1. CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
  2. // Sliding window of 5 seconds based on creation time.
  3. cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
  4. new CreatedExpiryPolicy(new Duration(SECONDS, 5))));

5.3.3.FIFO的滑动窗口

FIFO(先进先出)滑动窗口可以用FifoEvictionPolicy进行配置,这个策略是基于大小的,数据流会被插入窗口直到缓存的大小达到上限,那么最老的数据就会被自动踢出。
下面是如何配置一个FIFO的滑动窗口,他持有了100万的流化数据:

  1. CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
  2. // FIFO window holding 1,000,000 entries.
  3. cfg.setEvictionPolicyFactory(new FifoEvictionPolicy(1_000_000));

5.3.4.LRU滑动窗口

LRU(最近最少使用)滑动窗口可以用LruEvictionPolicy进行配置,这个策略是基于大小的,数据流会被插入窗口直到缓存的大小达到上限,那么最近最少使用的数据就会被自动踢出。
下面是如何配置一个LRU的滑动窗口,他持有了100万的流化数据:

  1. CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
  2. // LRU window holding 1,000,000 entries.
  3. cfg.setEvictionPolicyFactory(new LruEvictionPolicy(1_000_000));

5.3.5.查询滑动窗口

滑动窗口可以和任何其他Ignite缓存以同样的方式进行查询,可以使用基于谓词的,基于SQL的和基于文本的查询。
下面的代码是一个在金融工具流进入缓存后的滑动窗口中执行SQL查询的例子。
首先要在要查询的字段上创建索引:

  1. CacheConfiguration<String, Instrument> cfg = new CacheConfiguration<>("instCache");
  2. // Index some fields for querying portfolio positions.
  3. cfg.setIndexedTypes(String.class, Instrument.class);
  4. // Get a handle on the cache (create it if necessary).
  5. IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(cfg);

然后要查询三个性能最好的金融工具,可以通过(latest - open) 价格进行排序然后选择靠前的三个:

  1. // Select top 3 best performing instruments.
  2. SqlFieldsQuery top3qry = new SqlFieldsQuery(
  3. "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");
  4. // List of rows. Every row is represented as a List as well.
  5. List<List<?>> top3 = instCache.query(top3qry).getAll();

要查询所有金融工具的总利润,可以通过将所有的 (latest - open)值相加得到:

  1. // Select total profit across all financial instruments.
  2. SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument");
  3. List<List<?>> profit = instCache.query(profitQry).getAll();
  4. System.out.printf("Total profit: %.2f%n", row.get(0));

5.4.单词计数示例

5.4.1.摘要

在这个例子中希望将文本流注入Ignite然后统计每个单独单词的总数,还会定期发布通过SQL在流中查询得到的十个最流行的单词。
示例会按照如下步骤进行工作:

  1. 设置缓存来持有单词和计数;
  2. 设置一个5秒的滑动窗口来保持最近5秒的单词计数;
  3. StreamWords程序会将文本流数据注入Ignite;
  4. QueryWords程序会从流中查询最高的十个单词;

5.4.2.缓存配置

定义了一个CacheConfig类,他提供了两个程序(StreamWordsQueryWords)都会用到的配置信息,这个缓存会使用单词作为键,单词的计数作为值。
注意这个示例中在缓存中用了一个5秒的滑动窗口,这意味着单词从他第一次进入缓存开始5秒后就会从缓存中消失。

  1. public class CacheConfig {
  2. public static CacheConfiguration<String, Long> wordCache() {
  3. CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("words");
  4. // Index the words and their counts,
  5. // so we can use them for fast SQL querying.
  6. cfg.setIndexedTypes(String.class, Long.class);
  7. // Sliding window of 5 seconds.
  8. cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
  9. new CreatedExpiryPolicy(new Duration(SECONDS, 5))));
  10. return cfg;
  11. }
  12. }

5.4.3.单词流

StreamWords类负责从一个文本文件中持续地读入单词(例子中使用了alice-in-wonderland.txt)然后以流的方式注入Ignite“words”缓存。
流处理器配置

  1. 设置allowOverwrite属性为true来确保已有的计数可以被更新;
  2. 配置了一个StreamTransformer,他会获取一个单词的当前计数然后加1。
  1. public class StreamWords {
  2. public static void main(String[] args) throws Exception {
  3. // Mark this cluster member as client.
  4. Ignition.setClientMode(true);
  5. try (Ignite ignite = Ignition.start()) {
  6. IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
  7. // Create a streamer to stream words into the cache.
  8. try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
  9. // Allow data updates.
  10. stmr.allowOverwrite(true);
  11. // Configure data transformation to count instances of the same word.
  12. stmr.receiver(StreamTransformer.from((e, arg) -> {
  13. // Get current count.
  14. Long val = e.getValue();
  15. // Increment current count by 1.
  16. e.setValue(val == null ? 1L : val + 1);
  17. return null;
  18. }));
  19. // Stream words from "alice-in-wonderland" book.
  20. while (true) {
  21. Path path = Paths.get(StreamWords.class.getResource("alice-in-wonderland.txt").toURI());
  22. // Read words from a text file.
  23. try (Stream<String> lines = Files.lines(path)) {
  24. lines.forEach(line -> {
  25. Stream<String> words = Stream.of(line.split(" "));
  26. // Stream words into Ignite streamer.
  27. words.forEach(word -> {
  28. if (!word.trim().isEmpty())
  29. stmr.addData(word, 1L);
  30. });
  31. });
  32. }
  33. }
  34. }
  35. }
  36. }
  37. }

5.4.4.查询单词

QueryWords类用于定期从缓存中查询单词计数。
SQL查询

  1. 使用标准SQL查询计数;
  2. Ignite SQL会将Java类视为SQL表,因为计数是以简单的Long类型存储的,因此下面的SQL查询会查询Long表;
  3. Ignite通常以_key_val字段名来保存键和值,因此在下面的SQL中使用这个语法。
  1. public class QueryWords {
  2. public static void main(String[] args) throws Exception {
  3. // Mark this cluster member as client.
  4. Ignition.setClientMode(true);
  5. try (Ignite ignite = Ignition.start()) {
  6. IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
  7. // Select top 10 words.
  8. SqlFieldsQuery top10Qry = new SqlFieldsQuery(
  9. "select _key, _val from Long order by _val desc limit 10");
  10. // Query top 10 popular words every 5 seconds.
  11. while (true) {
  12. // Execute queries.
  13. List<List<?>> top10 = stmCache.query(top10Qry).getAll();
  14. // Print top 10 words.
  15. ExamplesUtils.printQueryResults(top10);
  16. Thread.sleep(5000);
  17. }
  18. }
  19. }
  20. }

5.4.5.启动服务端节点

为了运行这个示例,需要启动数据节点,在Ignite中,数据节点称为服务端节点。可以根据需要启动尽可能多的服务端节点,但是至少要有一个节点来运行这个示例。
服务端节点可以通过如下的命令行来启动:

  1. bin/ignite.sh

也可以像下面这样通过编程的方式来启动:

  1. public class ExampleNodeStartup {
  2. public static void main(String[] args) throws IgniteException {
  3. Ignition.start();
  4. }
  5. }

5.5.JMS数据流处理器

5.5.1.摘要

Ignite提供了一个JMS数据流处理器,他会从JMS代理中消费消息,将消息转换为缓存数据格式然后插入Ignite缓存。

5.5.2.特性

这个数据流处理器支持如下的特性:

本实现已经在Apache ActiveMQ中进行了测试,但是只要客户端库实现了JMS 1.1 规范的所有JMS代理都是支持的。

5.5.3.实例化一个JMS流处理器

实例化JMS流处理器时,需要具体化下面的泛型:

要配置JMS流处理器,还需要提供如下的必要属性:

5.5.4.示例

下面的示例通过String类型的键和String类型的值来填充一个缓存,要消费的TextMessage格式如下:

  1. raulk,Raul Kripalani
  2. dsetrakyan,Dmitriy Setrakyan
  3. sv,Sergi Vladykin
  4. gm,Gianfranco Murador

下面是代码:

  1. // create a data streamer
  2. IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"));
  3. dataStreamer.allowOverwrite(true);
  4. // create a JMS streamer and plug the data streamer into it
  5. JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
  6. jmsStreamer.setIgnite(ignite);
  7. jmsStreamer.setStreamer(dataStreamer);
  8. jmsStreamer.setConnectionFactory(connectionFactory);
  9. jmsStreamer.setDestination(destination);
  10. jmsStreamer.setTransacted(true);
  11. jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
  12. @Override
  13. public Map<String, String> apply(TextMessage message) {
  14. final Map<String, String> answer = new HashMap<>();
  15. String text;
  16. try {
  17. text = message.getText();
  18. }
  19. catch (JMSException e) {
  20. LOG.warn("Could not parse message.", e);
  21. return Collections.emptyMap();
  22. }
  23. for (String s : text.split("\n")) {
  24. String[] tokens = s.split(",");
  25. answer.put(tokens[0], tokens[1]);
  26. }
  27. return answer;
  28. }
  29. });
  30. jmsStreamer.start();
  31. // on application shutdown
  32. jmsStreamer.stop();
  33. dataStreamer.close();

要使用这个组件,必须通过构建系统(Maven, Ivy, Gradle,sbt等)导入如下的模块:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-jms11</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

5.6.Flume Sink

5.6.1.摘要

Apache Flume是一个高效的收集、汇总以及移动大量的日志数据的分布式的、高可靠和高可用的服务(https://github.com/apache/flume)。
IgniteSink是一个Flume池,他会从相对应的Flume通道中提取事件然后将数据注入Ignite缓存,目前支持Flume的1.6.0版本。
在启动Flume代理之前,就像下面章节描述的,IgniteSink及其依赖需要包含在代理的类路径中。

5.6.2.设置

  1. plugins.d/
  2. `-- ignite
  3. |-- lib
  4. | `-- ignite-flume-transformer-x.x.x.jar <-- your jar
  5. `-- libext
  6. |-- cache-api-1.0.0.jar
  7. |-- ignite-core-x.x.x.jar
  8. |-- ignite-flume-x.x.x.jar <-- IgniteSink
  9. |-- ignite-spring-x.x.x.jar
  10. |-- spring-aop-4.1.0.RELEASE.jar
  11. |-- spring-beans-4.1.0.RELEASE.jar
  12. |-- spring-context-4.1.0.RELEASE.jar
  13. |-- spring-core-4.1.0.RELEASE.jar
  14. `-- spring-expression-4.1.0.RELEASE.jar
属性名称 默认值 描述
channel -
type 组件类型名,应该为org.apache.ignite.stream.flume.IgniteSink -
igniteCfg Ignite的XML配置文件 -
cacheName 缓存名,与igniteCfg中的一致 -
eventTransformer org.apache.ignite.stream.flume.EventTransformer的实现类名 -
batchSize 每事务要写入的事件数 100

名字为a1的Sink代理配置片段如下所示:

  1. a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
  2. a1.sinks.k1.igniteCfg = /some-path/ignite.xml
  3. a1.sinks.k1.cacheName = testCache
  4. a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
  5. a1.sinks.k1.batchSize = 100

指定代码和配置后(可以参照Flume的文档),就可以运行Flume的代理了。

5.7.MQTT流处理器

5.7.1.摘要

该流处理器使用Eclipse Paho作为MQTT客户端,从一个MQTT主题消费消息,然后将键值对提供给IgniteDataStreamer实例。
必须提供一个流的元组提取器(不管是单条目的,还是多条目的提取器)来处理传入的消息,然后提取元组以插入缓存。

5.7.2.特性

这个流处理器支持:

5.7.3.示例

下面的代码显示了如何使用这个流处理器:

  1. // Start Ignite.
  2. Ignite ignite = Ignition.start();
  3. // Get a data streamer reference.
  4. IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");
  5. // Create an MQTT data streamer
  6. MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
  7. streamer.setIgnite(ignite);
  8. streamer.setStreamer(dataStreamer);
  9. streamer.setBrokerUrl(brokerUrl);
  10. streamer.setBlockUntilConnected(true);
  11. // Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
  12. // (using Guava here).
  13. streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
  14. @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
  15. List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
  16. return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
  17. }
  18. });
  19. // Consume from multiple topics at once.
  20. streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
  21. // Start the MQTT Streamer.
  22. streamer.start();

要了解有关选项的更多信息,可以参考ignite-mqtt模块的javadoc。

5.8.Apache Camel流处理器

5.8.1.摘要

本章节聚焦于Apache Camel流处理器,它也可以被视为一个统一的流处理器,因为他可以从Camel支持的任何技术或者协议中消费消息然后注入一个Ignite缓存。

Apache Camel是什么?
如果不知道Apache Camel是什么,本章节的后面会做一个简介。

使用这个流处理器,基于如下技术可以将数据条目注入一个Ignite缓存:

这个流处理器支持两种摄取模式,直接摄取间接摄取

一个Ignite Camel组件
还有一个camel-ignite组件,通过该组件,可以与Ignite缓存、计算、事件、消息等进行交互。

Ignite Camel流处理器架构视图

5.8.2.直接摄取

直接摄取使得通过一个提取器元组的帮助可以从任意Camel端点获得消息然后直接进入Ignite,这个被称为直接摄取
下面是一个代码示例:

  1. // Start Apache Ignite.
  2. Ignite ignite = Ignition.start();
  3. // Create an streamer pipe which ingests into the 'mycache' cache.
  4. IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");
  5. // Create a Camel streamer and connect it.
  6. CamelStreamer<String, String> streamer = new CamelStreamer<>();
  7. streamer.setIgnite(ignite);
  8. streamer.setStreamer(pipe);
  9. // This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
  10. streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");
  11. // This is the tuple extractor. We'll assume each message contains only one tuple.
  12. // If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
  13. // The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
  14. streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {
  15. @Override public Map.Entry<String, String> extract(Exchange exchange) {
  16. String stationId = exchange.getIn().getHeader("X-StationId", String.class);
  17. String temperature = exchange.getIn().getBody(String.class);
  18. return new GridMapEntry<>(stationId, temperature);
  19. }
  20. });
  21. // Start the streamer.
  22. streamer.start();

5.8.3.间接摄取

多于更多的复杂场景,也可以创建一个Camel route在输入的消息上执行复杂的处理,比如转换、验证、拆分、聚合、幂等、重新排序、富集等,然后只是将结果注入Ignite缓存,这个被称为间接摄取。

  1. // Create a CamelContext with a custom route that will:
  2. // (1) consume from our Jetty endpoint.
  3. // (2) transform incoming JSON into a Java object with Jackson.
  4. // (3) uses JSR 303 Bean Validation to validate the object.
  5. // (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
  6. CamelContext context = new DefaultCamelContext();
  7. context.addRoutes(new RouteBuilder() {
  8. @Override
  9. public void configure() throws Exception {
  10. from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
  11. .unmarshal().json(JsonLibrary.Jackson)
  12. .to("bean-validator:validate")
  13. .to("direct:ignite.ingest");
  14. }
  15. });
  16. // Remember our Streamer is now consuming from the Direct endpoint above.
  17. streamer.setEndpointUri("direct:ignite.ingest");

5.8.4.设置一个响应

默认的话,响应只是简单地将一个原来的请求的副本反馈给调用者(如果是一个同步端点)。如果希望定制这个响应,需要设置一个Camel的Processor作为一个responseProcessor

  1. streamer.setResponseProcessor(new Processor() {
  2. @Override public void process(Exchange exchange) throws Exception {
  3. exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
  4. exchange.getOut().setBody("OK");
  5. }
  6. });

5.8.5.Maven依赖

要使用ignite-camel流处理器,需要添加如下的依赖:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-camel</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

也可以加入camel-core作为一个过度依赖。

不要忘记添加Camel组件依赖
还要确保添加流处理器中要用到的Camel组件的依赖。

5.8.6.Apache Camel

Apache Camel是一个企业级集成框架,围绕Gregor Hohpe和Bobby Woolf推广的企业集成模式思想,比如通道管道过滤器拆分器聚合器路由器重新排序器等等,他可以像一个乐高玩具一样连接彼此来创建一个将系统连接在一起的集成路径。
到目前为止,Camel有超过200个组件,很多都是针对不同技术的适配器,比如JMSSOAPHTTP文件FTPPOP3SMTPSSH;包括云服务,比如AWSGCESalesforce;社交网络,比如TwitterFacebook;甚至包括新一代的数据库,比如MongoDBCassandra;以及数据处理技术,比如Hadoop(HDFS,HBase)以及Spark
Camel可以运行在各种环境中,同时也被Ignite支持:独立的Java程序、OSGi、Servlet容器、Spring Boot、JEE应用服务器等等。他是完全模块化的,因此只需要部署实际需要的组件,其他都不需要。
要了解更多的信息,可以参照Camel是什么?

5.9.Kafka流处理器

5.9.1.摘要

Apache Ignite的Kafka流处理器模块提供了从Kafka到Ignite缓存的流处理功能。
下面两个方法中的任何一个都可以用于获得这样的流处理功能:

5.9.2.通过Kafka连接器的数据流

通过从Kafka的主题拉取数据然后将其写入特定的Ignite缓存,IgniteSinkConnector可以用于将数据从Kafka导入Ignite缓存。
连接器位于optional/ignite-kafka,它和它的依赖需要位于一个Kafka运行实例的类路径中,下面会详细描述。
关于Kafka连接器的更多信息,可以参考Kafka文档
设置和运行

  1. ignite-kafka-x.x.x.jar <-- with IgniteSinkConnector
  2. ignite-core-x.x.x.jar
  3. cache-api-1.0.0.jar
  4. ignite-spring-1.5.0-SNAPSHOT.jar
  5. spring-aop-4.1.0.RELEASE.jar
  6. spring-beans-4.1.0.RELEASE.jar
  7. spring-context-4.1.0.RELEASE.jar
  8. spring-core-4.1.0.RELEASE.jar
  9. spring-expression-4.1.0.RELEASE.jar
  10. commons-logging-1.1.1.jar
  1. bootstrap.servers=localhost:9092
  2. key.converter=org.apache.kafka.connect.storage.StringConverter
  3. value.converter=org.apache.kafka.connect.storage.StringConverter
  4. key.converter.schemas.enable=false
  5. value.converter.schemas.enable=false
  6. internal.key.converter=org.apache.kafka.connect.storage.StringConverter
  7. internal.value.converter=org.apache.kafka.connect.storage.StringConverter
  8. internal.key.converter.schemas.enable=false
  9. internal.value.converter.schemas.enable=false
  10. offset.storage.file.filename=/tmp/connect.offsets
  11. offset.flush.interval.ms=10000
  1. # connector
  2. name=my-ignite-connector
  3. connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
  4. tasks.max=2
  5. topics=someTopic1,someTopic2
  6. # cache
  7. cacheName=myCache
  8. cacheAllowOverwrite=true
  9. igniteCfg=/some-path/ignite.xml

这里cacheName等于some-path/ignite.xml中指定的缓存名,之后someTopic1,someTopic2主题的数据就会被拉取和存储。如果希望开启覆盖缓存中的已有值,可以将cacheAllowOverwrite设置为true
还可以设置cachePerNodeDataSizecachePerNodeParOps,用于调整每个节点的缓冲区以及每个节点中并行流操作的最大值。
可以将test中的example-ignite.xml文件作为一个简单缓存配置文件的示例。

  1. bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties

流程检查
要执行一个非常基本的功能检查,可以这样做:

  1. bin/zookeeper-server-start.sh config/zookeeper.properties
  1. bin/kafka-server-start.sh config/server.properties
  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
  2. k1,v1
  1. bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
  1. http://node1:8080/ignite?cmd=size&cacheName=cache1

5.9.3.使用Ignite的Kafka流处理器模块的数据流

如果使用Maven来管理项目的依赖,首先要像下面这样添加Kafka流处理器的模块依赖(将'${ignite.version}'替换为实际的版本号):

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-kafka</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>

假定有一个缓存,键和值都是String类型,可以像下面这样启动流处理器:

  1. KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
  2. try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
  3. // allow overwriting cache data
  4. stmr.allowOverwrite(true);
  5. kafkaStreamer.setIgnite(ignite);
  6. kafkaStreamer.setStreamer(stmr);
  7. // set the topic
  8. kafkaStreamer.setTopic(someKafkaTopic);
  9. // set the number of threads to process Kafka streams
  10. kafkaStreamer.setThreads(4);
  11. // set Kafka consumer configurations
  12. kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
  13. // set decoders
  14. kafkaStreamer.setKeyDecoder(strDecoder);
  15. kafkaStreamer.setValueDecoder(strDecoder);
  16. kafkaStreamer.start();
  17. }
  18. finally {
  19. kafkaStreamer.stop();
  20. }

要了解有关Kafka消费者属性的详细信息,可以参照Kafka文档

5.10.Storm流处理器

Apache Ignite的Storm流处理器模块提供了从Storm到Ignite缓存的流处理功能。
通过如下步骤可以将数据注入Ignite缓存:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-storm</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>
  1. storm jar ignite-storm-streaming-jar-with-dependencies.jar my.company.ignite.MyStormTopology

5.11.Flink流处理器

Apache Ignite Flink Sink模块是一个流处理连接器,他可以将Flink数据注入Ignite缓存,该Sink会将输入的数据注入Ignite缓存。每当创建一个Sink,都需要提供一个Ignite缓存名和Ignite网格配置文件。
通过如下步骤,可以开启到Ignite缓存的数据注入:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-flink</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>
  1. IgniteSink igniteSink = new IgniteSink("myCache", "ignite.xml");
  2. igniteSink.setAllowOverwrite(true);
  3. igniteSink.setAutoFlushFrequency(10);
  4. igniteSink.start();
  5. DataStream<Map> stream = ...;
  6. // Sink data into the grid.
  7. stream.addSink(igniteSink);
  8. try {
  9. env.execute();
  10. } catch (Exception e){
  11. // Exception handling.
  12. }
  13. finally {
  14. igniteSink.stop();
  15. }

可以参考ignite-flink模块的javadoc来了解可用选项的详细信息。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注