[关闭]
@liyuj 2017-02-04T19:32:59.000000Z 字数 15571 阅读 4609

Apache-Ignite-1.8.0-中文开发手册

7.流计算集成

7.1.摘要

Ignite可以和各种著名的流处理技术和产品进行集成,比如Kafka、Camel或者JMS,可以轻易且高效地将数据流数据注入Ignite。

7.2.Kafka流处理器

7.2.1.摘要

Apache Ignite的Kafka流处理器模块提供了从Kafka到Ignite缓存的流处理功能。
下面两个方法中的任何一个都可以用于获得这样的流处理功能:

7.2.2.通过Kafka连接器的数据流

通过从Kafka的主题拉取数据然后将其写入特定的Ignite缓存,IgniteSinkConnector可以用于将数据从Kafka导入Ignite缓存。
连接器位于optional/ignite-kafka,它和它的依赖需要位于一个Kafka运行实例的类路径中,下面会详细描述。
关于Kafka连接器的更多信息,可以参考Kafka文档
设置和运行

  1. ignite-kafka-x.x.x.jar <-- with IgniteSinkConnector
  2. ignite-core-x.x.x.jar
  3. cache-api-1.0.0.jar
  4. ignite-spring-1.5.0-SNAPSHOT.jar
  5. spring-aop-4.1.0.RELEASE.jar
  6. spring-beans-4.1.0.RELEASE.jar
  7. spring-context-4.1.0.RELEASE.jar
  8. spring-core-4.1.0.RELEASE.jar
  9. spring-expression-4.1.0.RELEASE.jar
  10. commons-logging-1.1.1.jar
  1. bootstrap.servers=localhost:9092
  2. key.converter=org.apache.kafka.connect.storage.StringConverter
  3. value.converter=org.apache.kafka.connect.storage.StringConverter
  4. key.converter.schemas.enable=false
  5. value.converter.schemas.enable=false
  6. internal.key.converter=org.apache.kafka.connect.storage.StringConverter
  7. internal.value.converter=org.apache.kafka.connect.storage.StringConverter
  8. internal.key.converter.schemas.enable=false
  9. internal.value.converter.schemas.enable=false
  10. offset.storage.file.filename=/tmp/connect.offsets
  11. offset.flush.interval.ms=10000
  1. # connector
  2. name=my-ignite-connector
  3. connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
  4. tasks.max=2
  5. topics=someTopic1,someTopic2
  6. # cache
  7. cacheName=myCache
  8. cacheAllowOverwrite=true
  9. igniteCfg=/some-path/ignite.xml

这里cacheName等于some-path/ignite.xml中指定的缓存名,之后someTopic1,someTopic2主题的数据就会被拉取和存储。如果希望开启覆盖缓存中的已有值,可以将cacheAllowOverwrite设置为true
还可以设置cachePerNodeDataSizecachePerNodeParOps,用于调整每个节点的缓冲区以及每个节点中并行流操作的最大值。
可以将test中的example-ignite.xml文件作为一个简单缓存配置文件的示例。

  1. bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties

流程检查
要执行一个非常基本的功能检查,可以这样做:

  1. bin/zookeeper-server-start.sh config/zookeeper.properties
  1. bin/kafka-server-start.sh config/server.properties
  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
  2. k1,v1
  1. bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
  1. http://node1:8080/ignite?cmd=size&cacheName=cache1

7.2.3.使用Ignite的Kafka流处理器模块的数据流

如果使用Maven来管理项目的依赖,首先要像下面这样添加Kafka流处理器的模块依赖(将'${ignite.version}'替换为实际的版本号):

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-kafka</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>

假定有一个缓存,键和值都是String类型,可以像下面这样启动流处理器:

  1. KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
  2. try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
  3. // allow overwriting cache data
  4. stmr.allowOverwrite(true);
  5. kafkaStreamer.setIgnite(ignite);
  6. kafkaStreamer.setStreamer(stmr);
  7. // set the topic
  8. kafkaStreamer.setTopic(someKafkaTopic);
  9. // set the number of threads to process Kafka streams
  10. kafkaStreamer.setThreads(4);
  11. // set Kafka consumer configurations
  12. kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
  13. // set decoders
  14. kafkaStreamer.setKeyDecoder(strDecoder);
  15. kafkaStreamer.setValueDecoder(strDecoder);
  16. kafkaStreamer.start();
  17. }
  18. finally {
  19. kafkaStreamer.stop();
  20. }

要了解有关Kafka消费者属性的详细信息,可以参照Kafka文档

7.3.Camel流处理器

7.3.1.摘要

本章节聚焦于Apache Camel流处理器,它也可以被视为一个统一的流处理器,因为他可以从Camel支持的任何技术或者协议中消费消息然后注入一个Ignite缓存。

Apache Camel是什么?
如果不知道Apache Camel是什么,本章节的后面会做一个简介。

使用这个流处理器,基于如下技术可以将数据条目注入一个Ignite缓存:

这个流处理器支持两种摄取模式,直接摄取间接摄取

一个Ignite Camel组件
还有一个camel-ignite组件,通过该组件,可以与Ignite缓存、计算、事件、消息等进行交互。

Ignite Camel流处理器架构视图

7.3.2.直接摄取

直接摄取使得通过一个提取器元组的帮助可以从任意Camel端点获得消息然后直接进入Ignite,这个被称为直接摄取
下面是一个代码示例:

  1. // Start Apache Ignite.
  2. Ignite ignite = Ignition.start();
  3. // Create an streamer pipe which ingests into the 'mycache' cache.
  4. IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");
  5. // Create a Camel streamer and connect it.
  6. CamelStreamer<String, String> streamer = new CamelStreamer<>();
  7. streamer.setIgnite(ignite);
  8. streamer.setStreamer(pipe);
  9. // This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
  10. streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");
  11. // This is the tuple extractor. We'll assume each message contains only one tuple.
  12. // If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
  13. // The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
  14. streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {
  15. @Override public Map.Entry<String, String> extract(Exchange exchange) {
  16. String stationId = exchange.getIn().getHeader("X-StationId", String.class);
  17. String temperature = exchange.getIn().getBody(String.class);
  18. return new GridMapEntry<>(stationId, temperature);
  19. }
  20. });
  21. // Start the streamer.
  22. streamer.start();

7.3.3.间接摄取

多于更多的复杂场景,也可以创建一个Camel route在输入的消息上执行复杂的处理,比如转换、验证、拆分、聚合、幂等、重新排序、富集等,然后只是将结果注入Ignite缓存,这个被称为间接摄取。

  1. // Create a CamelContext with a custom route that will:
  2. // (1) consume from our Jetty endpoint.
  3. // (2) transform incoming JSON into a Java object with Jackson.
  4. // (3) uses JSR 303 Bean Validation to validate the object.
  5. // (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
  6. CamelContext context = new DefaultCamelContext();
  7. context.addRoutes(new RouteBuilder() {
  8. @Override
  9. public void configure() throws Exception {
  10. from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
  11. .unmarshal().json(JsonLibrary.Jackson)
  12. .to("bean-validator:validate")
  13. .to("direct:ignite.ingest");
  14. }
  15. });
  16. // Remember our Streamer is now consuming from the Direct endpoint above.
  17. streamer.setEndpointUri("direct:ignite.ingest");

7.3.4.设置一个响应

响应默认只是简单地将一个原来的请求的副本反馈给调用者(如果是一个同步端点)。如果希望定制这个响应,需要设置一个Camel的Processor作为一个responseProcessor

  1. streamer.setResponseProcessor(new Processor() {
  2. @Override public void process(Exchange exchange) throws Exception {
  3. exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
  4. exchange.getOut().setBody("OK");
  5. }
  6. });

7.3.5.Maven依赖

要使用ignite-camel流处理器,需要添加如下的依赖:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-camel</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

也可以加入camel-core作为一个过度依赖。

不要忘记添加Camel组件依赖
还要确保添加流处理器中要用到的Camel组件的依赖。

7.3.6.Apache Camel

Apache Camel是一个企业级集成框架,围绕Gregor Hohpe和Bobby Woolf推广的企业集成模式思想,比如通道管道过滤器拆分器聚合器路由器重新排序器等等,他可以像一个乐高玩具一样连接彼此来创建一个将系统连接在一起的集成路径。
到目前为止,Camel有超过200个组件,很多都是针对不同技术的适配器,比如JMSSOAPHTTP文件FTPPOP3SMTPSSH;包括云服务,比如AWSGCESalesforce;社交网络,比如TwitterFacebook;甚至包括新一代的数据库,比如MongoDBCassandra;以及数据处理技术,比如Hadoop(HDFS,HBase)以及Spark
Camel可以运行在各种环境中,同时也被Ignite支持:独立的Java程序、OSGi、Servlet容器、Spring Boot、JEE应用服务器等等。他是完全模块化的,因此只需要部署实际需要的组件,其他都不需要。
要了解更多的信息,可以参照Camel是什么?

7.4.JMS流处理器

7.4.1.摘要

Ignite提供了一个JMS数据流处理器,他会从JMS代理中消费消息,将消息转换为缓存数据格式然后插入Ignite缓存。

7.4.2.特性

这个数据流处理器支持如下的特性:

本实现已经在Apache ActiveMQ中进行了测试,但是只要客户端库实现了JMS 1.1 规范的所有JMS代理都是支持的。

7.4.3.实例化JMS流处理器

实例化JMS流处理器时,需要具体化下面的泛型:

要配置JMS流处理器,还需要提供如下的必要属性:

7.4.4.示例

下面的示例通过String类型的键和String类型的值来填充一个缓存,要消费的TextMessage格式如下:

  1. raulk,Raul Kripalani
  2. dsetrakyan,Dmitriy Setrakyan
  3. sv,Sergi Vladykin
  4. gm,Gianfranco Murador

下面是代码:

  1. // create a data streamer
  2. IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"));
  3. dataStreamer.allowOverwrite(true);
  4. // create a JMS streamer and plug the data streamer into it
  5. JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
  6. jmsStreamer.setIgnite(ignite);
  7. jmsStreamer.setStreamer(dataStreamer);
  8. jmsStreamer.setConnectionFactory(connectionFactory);
  9. jmsStreamer.setDestination(destination);
  10. jmsStreamer.setTransacted(true);
  11. jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
  12. @Override
  13. public Map<String, String> apply(TextMessage message) {
  14. final Map<String, String> answer = new HashMap<>();
  15. String text;
  16. try {
  17. text = message.getText();
  18. }
  19. catch (JMSException e) {
  20. LOG.warn("Could not parse message.", e);
  21. return Collections.emptyMap();
  22. }
  23. for (String s : text.split("\n")) {
  24. String[] tokens = s.split(",");
  25. answer.put(tokens[0], tokens[1]);
  26. }
  27. return answer;
  28. }
  29. });
  30. jmsStreamer.start();
  31. // on application shutdown
  32. jmsStreamer.stop();
  33. dataStreamer.close();

要使用这个组件,必须通过构建系统(Maven, Ivy, Gradle,sbt等)导入如下的模块:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-jms11</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

7.5.MQTT流处理器

7.5.1.摘要

该流处理器使用Eclipse Paho作为MQTT客户端,从一个MQTT主题消费消息,然后将键值对提供给IgniteDataStreamer实例。
必须提供一个流的元组提取器(不管是单条目的,还是多条目的提取器)来处理传入的消息,然后提取元组以插入缓存。

7.5.2.特性

这个流处理器支持:

7.5.3.示例

下面的代码显示了如何使用这个流处理器:

  1. // Start Ignite.
  2. Ignite ignite = Ignition.start();
  3. // Get a data streamer reference.
  4. IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");
  5. // Create an MQTT data streamer
  6. MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
  7. streamer.setIgnite(ignite);
  8. streamer.setStreamer(dataStreamer);
  9. streamer.setBrokerUrl(brokerUrl);
  10. streamer.setBlockUntilConnected(true);
  11. // Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
  12. // (using Guava here).
  13. streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
  14. @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
  15. List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
  16. return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
  17. }
  18. });
  19. // Consume from multiple topics at once.
  20. streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
  21. // Start the MQTT Streamer.
  22. streamer.start();

要了解有关选项的更多信息,可以参考ignite-mqtt模块的javadoc。

7.6.Storm流处理器

Apache Ignite的Storm流处理器模块提供了从Storm到Ignite缓存的流处理功能。
通过如下步骤可以将数据注入Ignite缓存:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-storm</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>
  1. storm jar ignite-storm-streaming-jar-with-dependencies.jar my.company.ignite.MyStormTopology

7.7.Flink流处理器

Apache Ignite Flink Sink模块是一个流处理连接器,他可以将Flink数据注入Ignite缓存,该Sink会将输入的数据注入Ignite缓存。每当创建一个Sink,都需要提供一个Ignite缓存名和Ignite网格配置文件。
通过如下步骤,可以开启到Ignite缓存的数据注入:

  1. <project xmlns="http://maven.apache.org/POM/4.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  4. http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. ...
  6. <dependencies>
  7. ...
  8. <dependency>
  9. <groupId>org.apache.ignite</groupId>
  10. <artifactId>ignite-flink</artifactId>
  11. <version>${ignite.version}</version>
  12. </dependency>
  13. ...
  14. </dependencies>
  15. ...
  16. </project>
  1. IgniteSink igniteSink = new IgniteSink("myCache", "ignite.xml");
  2. igniteSink.setAllowOverwrite(true);
  3. igniteSink.setAutoFlushFrequency(10);
  4. igniteSink.start();
  5. DataStream<Map> stream = ...;
  6. // Sink data into the grid.
  7. stream.addSink(igniteSink);
  8. try {
  9. env.execute();
  10. } catch (Exception e){
  11. // Exception handling.
  12. }
  13. finally {
  14. igniteSink.stop();
  15. }

可以参考ignite-flink模块的javadoc来了解可用选项的详细信息。

7.8.Twitter流处理器

Ignite的Twitter流处理器模块会从Twitter消费微博然后将转换后的键值对注入Ignite缓存。
要将来自Twitter的数据流注入Ignite缓存,需要:

  1. <dependency>
  2. <groupId>org.apache.ignite</groupId>
  3. <artifactId>ignite-twitter</artifactId>
  4. <version>${ignite.version}</version>
  5. </dependency>

${ignite.version}替换为实际使用的Ignite版本。

  1. IgniteDataStreamer dataStreamer = ignite.dataStreamer("myCache");
  2. dataStreamer.allowOverwrite(true);
  3. dataStreamer.autoFlushFrequency(10);
  4. OAuthSettings oAuthSettings = new OAuthSettings("setting1", "setting2", "setting3", "setting4");
  5. TwitterStreamer<Integer, String> streamer = new TwitterStreamer<>(oAuthSettings);
  6. streamer.setIgnite(ignite);
  7. streamer.setStreamer(dataStreamer);
  8. Map<String, String> params = new HashMap<>();
  9. params.put("track", "apache, twitter");
  10. params.put("follow", "3004445758");
  11. streamer.setApiParams(params);// Twitter Streaming API params.
  12. streamer.setEndpointUrl(endpointUrl);// Twitter streaming API endpoint.
  13. streamer.setThreadsCount(8);
  14. streamer.start();

可以参考Twitter流API文档来了解各种参数的详细信息。

7.9.Flume流处理器

7.9.1.摘要

Apache Flume是一个高效的收集、汇总以及移动大量的日志数据的分布式的、高可靠和高可用的服务(https://github.com/apache/flume)。
IgniteSink是一个Flume池,他会从相对应的Flume通道中提取事件然后将数据注入Ignite缓存,目前支持Flume的1.6.0版本。
在启动Flume代理之前,就像下面章节描述的,IgniteSink及其依赖需要包含在代理的类路径中。

7.9.2.设置

  1. plugins.d/
  2. `-- ignite
  3. |-- lib
  4. | `-- ignite-flume-transformer-x.x.x.jar <-- your jar
  5. `-- libext
  6. |-- cache-api-1.0.0.jar
  7. |-- ignite-core-x.x.x.jar
  8. |-- ignite-flume-x.x.x.jar <-- IgniteSink
  9. |-- ignite-spring-x.x.x.jar
  10. |-- spring-aop-4.1.0.RELEASE.jar
  11. |-- spring-beans-4.1.0.RELEASE.jar
  12. |-- spring-context-4.1.0.RELEASE.jar
  13. |-- spring-core-4.1.0.RELEASE.jar
  14. `-- spring-expression-4.1.0.RELEASE.jar
属性名称 默认值 描述
channel -
type 组件类型名,应该为org.apache.ignite.stream.flume.IgniteSink -
igniteCfg Ignite的XML配置文件 -
cacheName 缓存名,与igniteCfg中的一致 -
eventTransformer org.apache.ignite.stream.flume.EventTransformer的实现类名 -
batchSize 每事务要写入的事件数 100

名字为a1的Sink代理配置片段如下所示:

  1. a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
  2. a1.sinks.k1.igniteCfg = /some-path/ignite.xml
  3. a1.sinks.k1.cacheName = testCache
  4. a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
  5. a1.sinks.k1.batchSize = 100

指定代码和配置后(可以参照Flume的文档),就可以运行Flume的代理了。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注