@liyuj
2016-09-03T14:01:11.000000Z
字数 24202
阅读 7015
Apache-Ignite-1.7.0-中文开发手册
Ignite流计算可以以可扩展以及容错的方式处理持续不断的数据流。在一个中等规模的集群中,数据注入Ignite的速度可以很高,甚至轻易地达到每秒处理百万级的事件。
工作方式
数据流处理器
数据流处理器是通过IgniteDataStreamer
API定义的,他可以往Ignite数据流缓存中注入大量的持续不断的数据流,数据流处理器对于所有流入Ignite的数据以可扩展和容错的方式提供了至少一次保证。
滑动窗口
Ignite流计算功能也可以在数据的滑动窗口中进行查询。因为数据流是持续不断的,所以很少希望查询最初的所有数据集,反而更感兴趣于问像“最近两小时最受欢迎的十个产品是什么?”或者“过去一天特定种类产品的平均价格是多少?”这样的问题。要获得这些数据,需要能够在滑动数据窗口中进行查询。
滑动窗口可以配置Ignite缓存的退出策略,可以基于时间、基于大小或者基于批次。可以一个缓存配置一个滑动窗口,然而如果希望同样的数据有不同的滑动窗口也可以容易地定义多个缓存。
查询数据
可以和Ignite的SQL、TEXT以及基于谓词的缓存查询一起使用Ignite数据索引能力的全部功能来在数据流中进行查询。
单词计数示例
在该示例中会将文本注入Ignite然后对每个独立的单词进行计数,还会周期性的在流中进行SQL查询来查询10个最频繁的单词。
JMS数据流处理器
Ignite提供了一个JMS数据流处理器来从JMS代理中消费消息,将他们转换为缓存元组然后将他们注入缓存。
Flume数据流处理器
IgniteSink是一个Flume sink,他会从一个相关联的Flume管道中提取事件然后注入一个Ignite缓存,当前支持Flume的1.6.0版本。
数据流处理器是通过IgniteDataStreamer
API定义的,用于将大量的持续数据流注入Ignite缓存。数据流处理器以可扩展以及容错的方式,为将所有的数据流注入Ignite提供了至少一次保证。
快速地将大量的数据流注入Ignite的主要抽象是IgniteDataStreamer
,在内部他会适当地将数据整合成批次然后将这些批次与缓存这些数据的节点并置在一起。
高速加载是通过如下技术获得的:
要将数据加入数据流处理器,调用IgniteDataStreamer.addData(...)
方法即可。
// Get the data streamer reference and stream data.
try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("myStreamCache")) {
// Stream entries.
for (int i = 0; i < 100000; i++)
stmr.addData(i, Integer.toString(i));
}
允许覆写
默认的话,数据流处理器不会覆写已有的数据,这意味着如果遇到一个缓存内已有的条目,他会忽略这个条目。这是一个最有效的以及高性能的模式,因为数据流处理器不需要在后台考虑数据的版本。
如果预想到数据可能在数据流缓存中可能存在以及希望覆写它,设置IgniteDataStreamer.allowOverwrite(true)
即可。
对于希望执行一些自定义的逻辑而不仅仅是添加新数据的情况,可以利用一下StreamReceiver
API。
流接收器可以以并置的方式直接在缓存该数据条目的节点上对数据流做出反应,可以在数据进入缓存之前修改数据或者在数据上添加任何的预处理逻辑。
注意
StreamReceiver
不会自动地将数据加入缓存,需要显式地调用任意的cache.put(...)
方法。
StreamTransformer
是一个StreamReceiver
的简单实现,他会基于之前的值来修改数据流缓存中的数据。更新是并置的,即他会明确地在数据缓存的集群节点上发生。
在下面的例子中,通过StreamTransformer
在文本流中为每个发现的确切的单词增加一个计数。
Java8:
CacheConfiguration cfg = new CacheConfiguration("wordCountCache");
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(cfg);
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count instances of the same word.
stmr.receiver(StreamTransformer.from((e, arg) -> {
// Get current count.
Long val = e.getValue();
// Increment count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}));
// Stream words into the streamer cache.
for (String word : text)
stmr.addData(word, 1L);
}
Java7:
CacheConfiguration cfg = new CacheConfiguration("wordCountCache");
IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg);
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count instances of the same word.
stmr.receiver(new StreamTransformer<String, Long>() {
@Override public Object process(MutableEntry<String, Long> e, Object... args) {
// Get current count.
Long val = e.getValue();
// Increment count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}
});
// Stream words into the streamer cache.
for (String word : text)
stmr.addData(word, 1L);
StreamVisitor
也是StreamReceiver
的一个方便实现,他会访问流中的每个键值组。注意,访问器不会更新缓存。如果键值组需要存储在缓存内,那么需要显式地调用任意的cache.put(...)
方法。
在下面的示例中,有两个缓存:marketData
和instruments
,收到market数据的瞬间就会将他们放入marketData
缓存的流处理器,映射到特定market数据的集群节点上的marketData
的流处理器的StreamVisitor
就会被调用,在分别收到market数据后就会用最新的市场价格更新instrument
缓存。
注意,根本不会更新marketData
缓存,它一直是空的,只是直接在数据将要存储的集群节点上简单利用了market数据的并置处理能力。
CacheConfiguration<String, Double> mrktDataCfg = new CacheConfiguration<>("marketData");
CacheConfiguration<String, Double> instCfg = new CacheConfiguration<>("instruments");
// Cache for market data ticks streamed into the system.
IgniteCache<String, Double> mrktData = ignite.getOrCreateCache(mrktDataCfg);
// Cache for financial instruments.
IgniteCache<String, Double> insts = ignite.getOrCreateCache(instCfg);
try (IgniteDataStreamer<String, Integer> mktStmr = ignite.dataStreamer("marketData")) {
// Note that we do not populate 'marketData' cache (it remains empty).
// Instead we update the 'instruments' cache based on the latest market price.
mktStmr.receiver(StreamVisitor.from((cache, e) -> {
String symbol = e.getKey();
Double tick = e.getValue();
Instrument inst = instCache.get(symbol);
if (inst == null)
inst = new Instrument(symbol);
// Update instrument price based on the latest market tick.
inst.setHigh(Math.max(inst.getLatest(), tick);
inst.setLow(Math.min(inst.getLatest(), tick);
inst.setLatest(tick);
// Update instrument cache.
instCache.put(symbol, inst);
}));
// Stream market data into Ignite.
for (Map.Entry<String, Double> tick : marketData)
mktStmr.addData(tick);
}
滑动窗口是配置作为Ignite缓存的退出策略的,可以基于时间、基于大小或者基于批量。可以一个缓存配置一个滑动窗口,如果需要不同的滑动窗口,也可以容易地为同样的数据定义多于一个缓存。
基于时间的滑动窗口可以用JCache标准的ExpiryPolicy
进行配置,可以收到基于创建时间,最后访问时间,更新时间的流化过期事件。
下面是在Ignite中如何配置基于创建时间的5秒钟滑动窗口:
CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
// Sliding window of 5 seconds based on creation time.
cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
new CreatedExpiryPolicy(new Duration(SECONDS, 5))));
FIFO(先进先出)滑动窗口可以用FifoEvictionPolicy
进行配置,这个策略是基于大小的,数据流会被插入窗口直到缓存的大小达到上限,那么最老的数据就会被自动踢出。
下面是如何配置一个FIFO的滑动窗口,他持有了100万的流化数据:
CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
// FIFO window holding 1,000,000 entries.
cfg.setEvictionPolicyFactory(new FifoEvictionPolicy(1_000_000));
LRU(最近最少使用)滑动窗口可以用LruEvictionPolicy
进行配置,这个策略是基于大小的,数据流会被插入窗口直到缓存的大小达到上限,那么最近最少使用的数据就会被自动踢出。
下面是如何配置一个LRU的滑动窗口,他持有了100万的流化数据:
CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("myStreamCache");
// LRU window holding 1,000,000 entries.
cfg.setEvictionPolicyFactory(new LruEvictionPolicy(1_000_000));
滑动窗口可以和任何其他Ignite缓存以同样的方式进行查询,可以使用基于谓词的,基于SQL的和基于文本的查询。
下面的代码是一个在金融工具流进入缓存后的滑动窗口中执行SQL查询的例子。
首先要在要查询的字段上创建索引:
CacheConfiguration<String, Instrument> cfg = new CacheConfiguration<>("instCache");
// Index some fields for querying portfolio positions.
cfg.setIndexedTypes(String.class, Instrument.class);
// Get a handle on the cache (create it if necessary).
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(cfg);
然后要查询三个性能最好的金融工具,可以通过(latest - open) 价格进行排序然后选择靠前的三个:
// Select top 3 best performing instruments.
SqlFieldsQuery top3qry = new SqlFieldsQuery(
"select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3");
// List of rows. Every row is represented as a List as well.
List<List<?>> top3 = instCache.query(top3qry).getAll();
要查询所有金融工具的总利润,可以通过将所有的 (latest - open)值相加得到:
// Select total profit across all financial instruments.
SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument");
List<List<?>> profit = instCache.query(profitQry).getAll();
System.out.printf("Total profit: %.2f%n", row.get(0));
在这个例子中希望将文本流注入Ignite然后统计每个单独单词的总数,还会定期发布通过SQL在流中查询得到的十个最流行的单词。
示例会按照如下步骤进行工作:
StreamWords
程序会将文本流数据注入Ignite;QueryWords
程序会从流中查询最高的十个单词;定义了一个CacheConfig
类,他提供了两个程序(StreamWords
,QueryWords
)都会用到的配置信息,这个缓存会使用单词作为键,单词的计数作为值。
注意这个示例中在缓存中用了一个5秒的滑动窗口,这意味着单词从他第一次进入缓存开始5秒后就会从缓存中消失。
public class CacheConfig {
public static CacheConfiguration<String, Long> wordCache() {
CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("words");
// Index the words and their counts,
// so we can use them for fast SQL querying.
cfg.setIndexedTypes(String.class, Long.class);
// Sliding window of 5 seconds.
cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
new CreatedExpiryPolicy(new Duration(SECONDS, 5))));
return cfg;
}
}
StreamWords
类负责从一个文本文件中持续地读入单词(例子中使用了alice-in-wonderland.txt
)然后以流的方式注入Ignite“words”缓存。
流处理器配置
allowOverwrite
属性为true
来确保已有的计数可以被更新;StreamTransformer
,他会获取一个单词的当前计数然后加1。
public class StreamWords {
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start()) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
// Create a streamer to stream words into the cache.
try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count instances of the same word.
stmr.receiver(StreamTransformer.from((e, arg) -> {
// Get current count.
Long val = e.getValue();
// Increment current count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}));
// Stream words from "alice-in-wonderland" book.
while (true) {
Path path = Paths.get(StreamWords.class.getResource("alice-in-wonderland.txt").toURI());
// Read words from a text file.
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
Stream<String> words = Stream.of(line.split(" "));
// Stream words into Ignite streamer.
words.forEach(word -> {
if (!word.trim().isEmpty())
stmr.addData(word, 1L);
});
});
}
}
}
}
}
}
QueryWords
类用于定期从缓存中查询单词计数。
SQL查询
Long
类型存储的,因此下面的SQL查询会查询Long
表;_key
和_val
字段名来保存键和值,因此在下面的SQL中使用这个语法。
public class QueryWords {
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start()) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
// Select top 10 words.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
"select _key, _val from Long order by _val desc limit 10");
// Query top 10 popular words every 5 seconds.
while (true) {
// Execute queries.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
// Print top 10 words.
ExamplesUtils.printQueryResults(top10);
Thread.sleep(5000);
}
}
}
}
为了运行这个示例,需要启动数据节点,在Ignite中,数据节点称为服务端
节点。可以根据需要启动尽可能多的服务端节点,但是至少要有一个节点来运行这个示例。
服务端节点可以通过如下的命令行来启动:
bin/ignite.sh
也可以像下面这样通过编程的方式来启动:
public class ExampleNodeStartup {
public static void main(String[] args) throws IgniteException {
Ignition.start();
}
}
Ignite提供了一个JMS数据流处理器,他会从JMS代理中消费消息,将消息转换为缓存数据格式然后插入Ignite缓存。
这个数据流处理器支持如下的特性:
threads
参数支持并发的消费者; 会话
对象,每个都持有单独的MessageListener
实例,因此实现了自然的并发;transacted
参数支持事务级的会话;batched
参数支持批量的消费,他会对在一个本地JMS事务的范围内接受的消息进行分组(不需要支持XA)。依赖于代理,这个技术提供了一个很高的吞吐量,因为它减少了必要的消息往返确认的量,虽然存在复制消息的开销(特别是在事务的中间发生了一个事件)。 batchClosureMillis
时间或者会话收到了至少batchClosureSize
消息后批次会被提交;会话
;会话
(因为事务在JMS中是会话绑定的),因此当该会话
消费了那么多消息后就会被触发。Destination
对象或者名字来指定目的地。本实现已经在Apache ActiveMQ中进行了测试,但是只要客户端库实现了JMS 1.1 规范的所有JMS代理都是支持的。
实例化JMS流处理器时,需要具体化下面的泛型:
T extends Message
:流处理器会接收到的JMSMessage
的类型,如果他可以接收多个,可以使用通用的Message
类型;K
:缓存键的类型;V
:缓存值的类型;要配置JMS流处理器,还需要提供如下的必要属性:
connectionFactory
:ConnectionFactory
的实例,通过代理进行必要的配置,他也可以是一个ConnectionFactory
池;destination
或者(destinationName
和destinationType
):一个Destination
对象(通常是一个代理指定的JMSQueue
或者Topic
接口的实现),或者是目的地名字的组合(队列或者主题名)和到或者Queue
或者Topic
的Class
引用的类型, 在后一种情况下,流处理器通过Session.createQueue(String)
或者Session.createTopic(String)
来获得一个目的地;transformer
:一个MessageTransformer<T, K, V>
的实现,他会消化一个类型为T
的JMS消息然后产生一个要添加的缓存条目Map<K, V>
,他也可以返回null
或者空的Map
来忽略传入的消息。下面的示例通过String
类型的键和String
类型的值来填充一个缓存,要消费的TextMessage
格式如下:
raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador
下面是代码:
// create a data streamer
IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"));
dataStreamer.allowOverwrite(true);
// create a JMS streamer and plug the data streamer into it
JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setDestination(destination);
jmsStreamer.setTransacted(true);
jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
@Override
public Map<String, String> apply(TextMessage message) {
final Map<String, String> answer = new HashMap<>();
String text;
try {
text = message.getText();
}
catch (JMSException e) {
LOG.warn("Could not parse message.", e);
return Collections.emptyMap();
}
for (String s : text.split("\n")) {
String[] tokens = s.split(",");
answer.put(tokens[0], tokens[1]);
}
return answer;
}
});
jmsStreamer.start();
// on application shutdown
jmsStreamer.stop();
dataStreamer.close();
要使用这个组件,必须通过构建系统(Maven, Ivy, Gradle,sbt等)导入如下的模块:
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-jms11</artifactId>
<version>${ignite.version}</version>
</dependency>
Apache Flume是一个高效的收集、汇总以及移动大量的日志数据的分布式的、高可靠和高可用的服务(https://github.com/apache/flume)。
IgniteSink是一个Flume池,他会从相对应的Flume通道中提取事件然后将数据注入Ignite缓存,目前支持Flume的1.6.0版本。
在启动Flume代理之前,就像下面章节描述的,IgniteSink及其依赖需要包含在代理的类路径中。
ignite
子目录,如果plugins.d目录不存在,创建它;${FLUME_HOME}/plugins.d/ignite/lib
目录;${FLUME_HOME}/plugins.d/ignite/libext
,如下所示;
plugins.d/
`-- ignite
|-- lib
| `-- ignite-flume-transformer-x.x.x.jar <-- your jar
`-- libext
|-- cache-api-1.0.0.jar
|-- ignite-core-x.x.x.jar
|-- ignite-flume-x.x.x.jar <-- IgniteSink
|-- ignite-spring-x.x.x.jar
|-- spring-aop-4.1.0.RELEASE.jar
|-- spring-beans-4.1.0.RELEASE.jar
|-- spring-context-4.1.0.RELEASE.jar
|-- spring-core-4.1.0.RELEASE.jar
`-- spring-expression-4.1.0.RELEASE.jar
属性名称 | 默认值 | 描述 |
---|---|---|
channel | - | |
type | 组件类型名,应该为org.apache.ignite.stream.flume.IgniteSink |
- |
igniteCfg | Ignite的XML配置文件 | - |
cacheName | 缓存名,与igniteCfg中的一致 | - |
eventTransformer | org.apache.ignite.stream.flume.EventTransformer的实现类名 | - |
batchSize | 每事务要写入的事件数 | 100 |
名字为a1的Sink代理配置片段如下所示:
a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
a1.sinks.k1.igniteCfg = /some-path/ignite.xml
a1.sinks.k1.cacheName = testCache
a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
a1.sinks.k1.batchSize = 100
指定代码和配置后(可以参照Flume的文档),就可以运行Flume的代理了。
该流处理器使用Eclipse Paho作为MQTT客户端,从一个MQTT主题消费消息,然后将键值对提供给IgniteDataStreamer
实例。
必须提供一个流的元组提取器(不管是单条目的,还是多条目的提取器)来处理传入的消息,然后提取元组以插入缓存。
这个流处理器支持:
下面的代码显示了如何使用这个流处理器:
// Start Ignite.
Ignite ignite = Ignition.start();
// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer("mycache");
// Create an MQTT data streamer
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);
// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
}
});
// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
// Start the MQTT Streamer.
streamer.start();
要了解有关选项的更多信息,可以参考ignite-mqtt
模块的javadoc。
本章节聚焦于Apache Camel流处理器,它也可以被视为一个统一的流处理器,因为他可以从Camel支持的任何技术或者协议中消费消息然后注入一个Ignite缓存。
Apache Camel是什么?
如果不知道Apache Camel是什么,本章节的后面会做一个简介。
使用这个流处理器,基于如下技术可以将数据条目注入一个Ignite缓存:
这个流处理器支持两种摄取模式,直接摄取和间接摄取。
一个Ignite Camel组件
还有一个camel-ignite组件,通过该组件,可以与Ignite缓存、计算、事件、消息等进行交互。
直接摄取使得通过一个提取器元组的帮助可以从任意Camel端点获得消息然后直接进入Ignite,这个被称为直接摄取。
下面是一个代码示例:
// Start Apache Ignite.
Ignite ignite = Ignition.start();
// Create an streamer pipe which ingests into the 'mycache' cache.
IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");
// Create a Camel streamer and connect it.
CamelStreamer<String, String> streamer = new CamelStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(pipe);
// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");
// This is the tuple extractor. We'll assume each message contains only one tuple.
// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {
@Override public Map.Entry<String, String> extract(Exchange exchange) {
String stationId = exchange.getIn().getHeader("X-StationId", String.class);
String temperature = exchange.getIn().getBody(String.class);
return new GridMapEntry<>(stationId, temperature);
}
});
// Start the streamer.
streamer.start();
多于更多的复杂场景,也可以创建一个Camel route在输入的消息上执行复杂的处理,比如转换、验证、拆分、聚合、幂等、重新排序、富集等,然后只是将结果注入Ignite缓存,这个被称为间接摄取。
// Create a CamelContext with a custom route that will:
// (1) consume from our Jetty endpoint.
// (2) transform incoming JSON into a Java object with Jackson.
// (3) uses JSR 303 Bean Validation to validate the object.
// (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
.unmarshal().json(JsonLibrary.Jackson)
.to("bean-validator:validate")
.to("direct:ignite.ingest");
}
});
// Remember our Streamer is now consuming from the Direct endpoint above.
streamer.setEndpointUri("direct:ignite.ingest");
默认的话,响应只是简单地将一个原来的请求的副本反馈给调用者(如果是一个同步端点)。如果希望定制这个响应,需要设置一个Camel的Processor
作为一个responseProcessor
。
streamer.setResponseProcessor(new Processor() {
@Override public void process(Exchange exchange) throws Exception {
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
exchange.getOut().setBody("OK");
}
});
要使用ignite-camel
流处理器,需要添加如下的依赖:
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-camel</artifactId>
<version>${ignite.version}</version>
</dependency>
也可以加入camel-core
作为一个过度依赖。
不要忘记添加Camel组件依赖
还要确保添加流处理器中要用到的Camel组件的依赖。
Apache Camel是一个企业级集成框架,围绕Gregor Hohpe和Bobby Woolf推广的企业集成模式思想,比如通道、管道、过滤器、拆分器、聚合器、路由器、重新排序器等等,他可以像一个乐高玩具一样连接彼此来创建一个将系统连接在一起的集成路径。
到目前为止,Camel有超过200个组件,很多都是针对不同技术的适配器,比如JMS、SOAP、HTTP、文件、FTP、POP3、SMTP、SSH;包括云服务,比如AWS,GCE、Salesforce;社交网络,比如Twitter、Facebook;甚至包括新一代的数据库,比如MongoDB、Cassandra;以及数据处理技术,比如Hadoop(HDFS,HBase)以及Spark。
Camel可以运行在各种环境中,同时也被Ignite支持:独立的Java程序、OSGi、Servlet容器、Spring Boot、JEE应用服务器等等。他是完全模块化的,因此只需要部署实际需要的组件,其他都不需要。
要了解更多的信息,可以参照Camel是什么?。
Apache Ignite的Kafka流处理器模块提供了从Kafka到Ignite缓存的流处理功能。
下面两个方法中的任何一个都可以用于获得这样的流处理功能:
通过从Kafka的主题拉取数据然后将其写入特定的Ignite缓存,IgniteSinkConnector可以用于将数据从Kafka导入Ignite缓存。
连接器位于optional/ignite-kafka
,它和它的依赖需要位于一个Kafka运行实例的类路径中,下面会详细描述。
关于Kafka连接器的更多信息,可以参考Kafka文档。
设置和运行
ignite-kafka-x.x.x.jar <-- with IgniteSinkConnector
ignite-core-x.x.x.jar
cache-api-1.0.0.jar
ignite-spring-1.5.0-SNAPSHOT.jar
spring-aop-4.1.0.RELEASE.jar
spring-beans-4.1.0.RELEASE.jar
spring-context-4.1.0.RELEASE.jar
spring-core-4.1.0.RELEASE.jar
spring-expression-4.1.0.RELEASE.jar
commons-logging-1.1.1.jar
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# connector
name=my-ignite-connector
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
tasks.max=2
topics=someTopic1,someTopic2
# cache
cacheName=myCache
cacheAllowOverwrite=true
igniteCfg=/some-path/ignite.xml
这里cacheName等于some-path/ignite.xml
中指定的缓存名,之后someTopic1,someTopic2
主题的数据就会被拉取和存储。如果希望开启覆盖缓存中的已有值,可以将cacheAllowOverwrite
设置为true
。
还可以设置cachePerNodeDataSize
和cachePerNodeParOps
,用于调整每个节点的缓冲区以及每个节点中并行流操作的最大值。
可以将test中的example-ignite.xml
文件作为一个简单缓存配置文件的示例。
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
流程检查
要执行一个非常基本的功能检查,可以这样做:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
k1,v1
bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
http://node1:8080/ignite?cmd=size&cacheName=cache1
如果使用Maven来管理项目的依赖,首先要像下面这样添加Kafka流处理器的模块依赖(将'${ignite.version}'替换为实际的版本号):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-kafka</artifactId>
<version>${ignite.version}</version>
</dependency>
...
</dependencies>
...
</project>
假定有一个缓存,键和值都是String类型,可以像下面这样启动流处理器:
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic(someKafkaTopic);
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
// set decoders
kafkaStreamer.setKeyDecoder(strDecoder);
kafkaStreamer.setValueDecoder(strDecoder);
kafkaStreamer.start();
}
finally {
kafkaStreamer.stop();
}
要了解有关Kafka消费者属性的详细信息,可以参照Kafka文档。
Apache Ignite的Storm流处理器模块提供了从Storm到Ignite缓存的流处理功能。
通过如下步骤可以将数据注入Ignite缓存:
${ignite.version}
替换为实际使用的版本):
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-storm</artifactId>
<version>${ignite.version}</version>
</dependency>
...
</dependencies>
...
</project>
ignite
的属性指定(或者通过StormStreamer.setIgniteTupleField(...)也可以指定一个不同的)。作为一个示例可以看TestStormSpout.declareOutputFields(...)
。
storm jar ignite-storm-streaming-jar-with-dependencies.jar my.company.ignite.MyStormTopology
Apache Ignite Flink Sink模块是一个流处理连接器,他可以将Flink数据注入Ignite缓存,该Sink会将输入的数据注入Ignite缓存。每当创建一个Sink,都需要提供一个Ignite缓存名和Ignite网格配置文件。
通过如下步骤,可以开启到Ignite缓存的数据注入:
${ignite.version}
替换为实际使用的版本);
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-flink</artifactId>
<version>${ignite.version}</version>
</dependency>
...
</dependencies>
...
</project>
IgniteSink igniteSink = new IgniteSink("myCache", "ignite.xml");
igniteSink.setAllowOverwrite(true);
igniteSink.setAutoFlushFrequency(10);
igniteSink.start();
DataStream<Map> stream = ...;
// Sink data into the grid.
stream.addSink(igniteSink);
try {
env.execute();
} catch (Exception e){
// Exception handling.
}
finally {
igniteSink.stop();
}
可以参考ignite-flink模块的javadoc来了解可用选项的详细信息。