[关闭]
@liyuj 2017-10-06T22:17:47.000000Z 字数 18682 阅读 5447

Apache-Ignite-2.2.0-中文开发手册

16.持久化

16.1.持久化存储

16.1.1.摘要

Ignite的原生持久化是一个分布式的ACID和兼容SQL的磁盘存储,它可以透明地与Ignite的固化内存进行集成。Ignite的持久化是可选的,可以打开也可以关闭,当关闭时Ignite就会变成一个纯内存存储。
Ignite的原生持久化会在磁盘上存储一个数据的超集,以及根据容量在内存中存储一个子集。比如,如果有100个条目,然后内存只能存储20条,那么磁盘上会存储所有的100条,然后为了提高性能在内存中缓存20条。
另外值得一提的是,和纯内存的使用场景一样,当打开持久化时,每个独立的节点只会持久化数据的一个子集,不管是主还是备节点,都是只包括节点所属的分区的数据,总的来说,整个集群包括了完整的数据集。
Ignite的原生持久化有如下的特性:

16.1.2.使用

要开启Ignite的原生持久化,需要给集群的配置传递一个PersistentStoreConfiguration的实例:
XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. <!-- Enabling Apache Ignite Native Persistence. -->
  3. <property name="persistentStoreConfiguration">
  4. <bean class="org.apache.ignite.configuration.PersistentStoreConfiguration"/>
  5. </property>
  6. <!-- Additional setting. -->
  7. </bean>

Java:

  1. // Apache Ignite node configuration.
  2. IgniteConfiguration cfg = new IgniteConfiguration();
  3. // Native Persistence configuration.
  4. PersistentStoreConfiguration psCfg = new PersistentStoreConfiguration();
  5. // Enabling the Persistent Store.
  6. cfg.setPersistentStoreConfiguration(psCfg);
  7. //Additional parameters.

持久化开启之后,所有的数据和索引都会存储在所有集群节点的内存和磁盘上,下图描述了在单独的集群节点的文件系统层看到的持久化结构:

首先,节点中的每个缓存都要有一个唯一的目录,从上图可知,可以看到至少两个缓存(Cache_A和Cache_B),由节点来维护他们的数据和索引。
其次,对于节点的每个分区,不管是主还是备,Ignite的原生持久化都会在文件系统中创建一个专用文件。比如,对上面的节点来说,它负责分区1,10到564,索引是每个缓存一个文件。
最后,和前写日志活动有关的文件和目录,下面还会介绍。

逻辑缓存和分区文件
如果Cache_A和Cache_B属于同一个缓存组,那么这些缓存共享的分区文件会放在一个目录中。

下一步,当Ignite发现持久化开启时,它会将集群由激活状态变为未激活状态,这样会确保应用未经允许无法修改集群中的数据,这样可以避免在集群重启过程中应用就开始修改还未恢复的节点中的持久化数据的情况发生。因此,这里的一般做法是等待所有节点加入集群然后在任意节点或者应用调用Ignite.active(true),它会将集群调整为激活状态。

  1. Ignite ignite = ...;
  2. // Activating the cluster once all the cluster nodes are up and running.
  3. ignite.active(true);

Ignite原生持久化根路径
Ignite中的所有数据默认都会持久化到工作目录(${IGNITE_HOME}/work),使用PersistentStoreConfiguration.setPersistentStorePath(...)可以进行修改。

如上所述,打开Ignite的持久化之后就不再需要在内存中保存所有的数据了,磁盘会存储所有的数据和索引,然后只会在内存中缓存所有数据的一个子集。
考虑到这一点,如果在内存中没有找到某个页面,那么固化内存会向持久化中进行查找,内存中存储的数据的子集是由退出策略管理的。

16.1.3.预写日志(WAL)

Ignite的持久化会为节点的每个分区创建和维护一个专有文件(不管是主还是备),但是当内存中的页面更新时,更新是不会直接写入对应的分区文件的,因为会严重影响性能,而是将数据写入预写日志的尾部(WAL)。
WAL的目的是以最快的速度向磁盘传播更新,以及为单个节点或者整个集群故障的场景提供一种恢复机制。值得一提的是,集群可以根据WAL的内容在故障或者重启时随时恢复到最近成功提交的事务。
整个WAL会被拆分为若干个文件,叫做段,它是按顺序进行填充的。当第一个段满了之后,它的内容会被复制到WAL档案,然后在那里保存由PersistentStoreConfiguration.walHistorySize配置的时间。复制完成之后,第二个段会被视为激活的WAL文件,然后接收由应用发送过来的更新请求。默认会创建和使用10个这样的段,这个数值可以通过PersistentStoreConfiguration.setWalSegmentSize进行修改。
下面是和WAL有关的其他配置项,在PersistentStoreConfiguration可以看到完整的可用参数列表。

参数 描述 默认值
setWalStorePath(...) 配置WAL保存目录的路径,如果这个路径是相对路径,那么它会相对于Ignite的工作目录。 ${IGNITE_HOME}/work
setWalSegments(...) 配置WAL要处理的段数量,为了性能考虑,整个WAL会被拆分为若干个叫做段的固定大小的文件。 10
setWalSegmentSize(...) 配置WAL段的大小。 64MB
setWalHistorySize(...) 配置在WAL历史中保持的检查点的总数量。 20
setWalArchivePath(...) 配置WAL档案目录的路径,每个WAL段都会被完整复制到这个目录。 ${IGNITE_HOME}/work

16.1.4.检查点

由于WAL文件的性质,它会一直增长,并且通过WAL从头到尾地恢复集群会花费大量的时间。为了解决这个问题,Ignite引入了一个检查点过程。
检查点是一个将脏页面从内存复制到磁盘上的分区文件的过程,脏页面是指页面已经在内存中进行了更新但是还没有写入对应的分区文件(只是添加到了WAL中)。
这个过程有助于通过在磁盘上保持页面的最新状态而高效地利用磁盘空间,并且允许在WAL档案中删除过时的WAL段(文件)。
下图显示的是一个简单的更新操作的执行过程:

  1. 节点接收到更新请求之后,它会在内存中查找该数据所属的数据页面,该页面会被更新然后标记为脏页面;
  2. 更新会被附加到WAL的尾部;
  3. 节点会向更新发起方发送一个更新成功的确认信息;
  4. 根据配置或者其他参数配置的频率,检查点会被定期地触发。脏页面会从内存复制到磁盘,然后传递给特定的分区文件;

下面的表中显示了如何根据业务需求调整和检查点有关的参数,PersistentStoreConfiguration的javadoc中会有完整的可用参数列表:

属性名 描述 默认值
setCheckpointingFrequency(...) 配置检查点频率,它是将脏页面写入原生持久化的最小时间间隔,如果频率较高,检查点会被频繁地触发。 3分钟
setCheckpointingPageBufferSize(...) 配置为检查点的临时缓冲区分配的内存总量,这个缓冲区用于为正在写入磁盘的页面创建一个临时副本,以及在检查点处理过程中的并行更新。 256MB
setCheckpointingThreads(...) 配置用于检查点处理的线程数量。 1

检查点的更多细节
这里有关于Ignite的检查点实现的更多细节。

16.1.5.事务保证

Ignite的原生持久化是一个兼容ACID的分布式存储,每个事务性更新都会首先被添加到WAL。更新会被赋予一个唯一的ID,这意味着集群在故障或者重启时总是会恢复到最近的成功提交的事务或者原子性更新。

16.1.6.SQL支持

Ignite的原生持久化可以将Ignite作为一个分布式的SQL数据库。
在集群中执行SQL查询时是不需要在内存中保存所有的数据的,Ignite会在内存和磁盘上的所有数据中执行的。另外,在集群重启后将所有的数据都预加载到内存中也是一个选择,这时当集群启动运行时,就可以执行SQL查询了。

16.1.7.Ignite持久化内部

本文档提供了Ignite持久化的一个高层视图,如果想了解更多的技术细节,可以看下面的文档:

16.1.8.性能提示

11.4.固化内存调优章节中有关于性能方面的建议。

16.1.9.示例

要了解Ignite的原生持久化在实践中的应用,可以看Github中的这个示例

16.2.第三方存储

16.2.1.摘要

JCache提供了javax.cache.integration.CacheLoaderjavax.cache.integration.CacheWriterAPI,他们分别用于底层持久化存储的通读通写(比如RDBMS中的Oracle或者MySQL,以及NoSQL数据库中的MongoDB或者CouchDB)。

虽然Ignite可以单独地配置CacheRLoaderCacheWriter,但是在两个单独的类中实现事务化存储是非常尴尬的,因为多个loadput操作需要在同一个事务中的同一个连接中共享状态。为了解决这个问题,Ignite提供了org.apacche.ignite.cache.store.CacheStore接口,他同时扩展了CacheLoaderCacheWriter

事务
CacheStore是完整事务性的,他会自动地融入当前的缓存事务。
CacheJdbcPojoStore
Ignite附带了他自己的CacheJdbcPojoStore,他会自动地建立Java POJO和数据库模式之间的映射。

16.2.2.通读和通写

当希望通读和通写行为时,提供一个正确的缓存存储的实现是很重要的。通读意味着当缓存无效时会从底层的持久化存储中读取,通写意味着当缓存更新时会自动地进行持久化。所有的通读和通写都会参与整体的缓存事务以及作为一个整体提交或者回滚。
要配置通读和通写,需要实现CacheStore接口以及设置CacheConfigurationcacheStoreFactoryreadThroughwriteThrough属性,下面的例子会有说明。

16.2.3.后写缓存

在一个简单的通写模式中每个缓存的put和remove操作都会涉及一个持久化存储的请求,因此整个缓存更新的持续时间可能是相对比较长的。另外,密集的缓存更新频率也会导致非常高的存储负载。
对于这种情况,Ignite提供了一个选项来执行异步化的持久化存储更新,也叫做后写,这个方式的主要概念是累加更新操作然后作为一个批量操作异步化地刷入持久化存储中。真实的数据持久化可以被基于时间的事件触发(数据输入的最大时间受到队列的限制),也可以被队列的大小触发(当队列大小达到一个限值),或者通过两者的组合触发,这时任何事件都会触发刷新。

更新顺序
对于后写的方式只有数据的最后一次更新会被写入底层存储。如果键为key1的缓存数据分别依次地更新为值value1、value2和value3,那么只有(key1,value3)对这一个存储请求会被传播到持久化存储。
更新性能
批量的存储操作通常比按顺序的单一存储操作更有效率,因此可以通过开启后写模式的批量操作来利用这个特性。简单类型(put和remove)的简单顺序更新操作可以被组合成一个批量操作。比如,连续地往缓存中加入(key1,value1),(key2,value2),(key3,value3)可以通过一个单一的CacheStore.putAll(...)操作批量处理。

后写缓存可以通过CacheConfiguration.setWriteBehindEnabled(boolean)配置项来开启,下面的3.12.6.配置章节显示了一个完整的配置属性列表来进行后写缓存行为的定制。

16.2.4.CacheStore

Ignite中的CacheStore接口用于向底层的数据存储写入或者加载数据。除了标准的JCache加载和存储方法,他还引入了最终事务划界以及从底层数据存储批量载入数据的能力。

loadCache()
CacheStore.loadCache()方法可以加载缓存,即使没有传入要加载的所有键,它通常用于启动时缓存的热加载,但是也可以在缓存加载完之后的任何时间点调用。
在每一个相关的集群节点,IgniteCache.loadCache()方法会分配给CacheStore.loadCache()方法,如果只想在本地节点上进行加载,可以用IgniteCache.localLoadCache()方法。

对于分区缓存,不管是主节点还是备份节点,如果键没有被映射到该节点,会被缓存自动丢弃。

load(), write(), delete()
IgniteCache接口的get,put,remove方法被调用时,相对应的CacheStoreload(),write()delete()方法会被调用,当与单个缓存数据工作时,这些方法会用于启用通读通写行为。

loadAll(), writeAll(), deleteAll()
IgniteCache接口的getAll,putAll,removeAll方法被调用时,相对应的CacheStoreloadAll(),writeAll()deleteAll()方法会被调用,当与多个缓存数据工作时,这些方法会用于启用通读通写行为,他们通常用批量操作的方式实现以提供更好的性能。

CacheStoreAdapter提供了loadAll(),writeAll()deleteAll()方法的默认实现,他只是简单地对键进行一个一个地迭代。

sessionEnd()
Ignite有一个存储会话的概念,他可以跨越不止一个的缓存存储操作,会话对于事务非常有用。
对于原子化的缓存,sessionEnd()方法会在每个CacheStore方法完成之后被调用,对于事务化的缓存,不管是在底层持久化存储进行提交或者回滚多个操作,sessionEnd()方法都会在每个事务结束后被调用。

CacheStoreAdapater提供了sessionEnd()方法的默认的空实现。
Cassandra Cache Store
Ignite提供了将Apache Cassandra作为内存网格级CacheStore的开箱即用的集成,要了解更多的信息,可以查看相关的文档。

16.2.5.CacheStoreSession

缓存存储会话的主要目的是当CacheStore用于事务中时在多个存储操作中持有一个上下文。比如,如果使用JDBC,可以通过CacheStoreSession.attach()方法保存数据库的连接,然后可以在CacheStore.sessionEnd(boolean)方法中提交这个连接。
CacheStoreSession可以通过@GridCacheStoreSessionResource注解注入自定义的缓存存储实现中。

16.2.6.CacheStore示例

下面是几个不同场景的缓存存储的实现,注意事务化的实现用还是没用事务。
JDBC非事务:

  1. public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  2. // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  3. @Override public Person load(Long key) {
  4. try (Connection conn = connection()) {
  5. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
  6. st.setLong(1, key);
  7. ResultSet rs = st.executeQuery();
  8. return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
  9. }
  10. }
  11. catch (SQLException e) {
  12. throw new CacheLoaderException("Failed to load: " + key, e);
  13. }
  14. }
  15. // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  16. @Override public void write(Cache.Entry<Long, Person> entry) {
  17. try (Connection conn = connection()) {
  18. // Syntax of MERGE statement is database specific and should be adopted for your database.
  19. // If your database does not support MERGE statement then use sequentially update, insert statements.
  20. try (PreparedStatement st = conn.prepareStatement(
  21. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  22. for (Cache.Entry<Long, Person> entry : entries) {
  23. Person val = entry.getValue();
  24. st.setLong(1, entry.getKey());
  25. st.setString(2, val.getFirstName());
  26. st.setString(3, val.getLastName());
  27. st.executeUpdate();
  28. }
  29. }
  30. }
  31. catch (SQLException e) {
  32. throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
  33. }
  34. }
  35. // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  36. @Override public void delete(Object key) {
  37. try (Connection conn = connection()) {
  38. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  39. st.setLong(1, (Long)key);
  40. st.executeUpdate();
  41. }
  42. }
  43. catch (SQLException e) {
  44. throw new CacheWriterException("Failed to delete: " + key, e);
  45. }
  46. }
  47. // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  48. // methods are called on IgniteCache. It is used for bulk-loading the cache.
  49. // If you don't need to bulk-load the cache, skip this method.
  50. @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
  51. if (args == null || args.length == 0 || args[0] == null)
  52. throw new CacheLoaderException("Expected entry count parameter is not provided.");
  53. final int entryCnt = (Integer)args[0];
  54. try (Connection conn = connection()) {
  55. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
  56. try (ResultSet rs = st.executeQuery()) {
  57. int cnt = 0;
  58. while (cnt < entryCnt && rs.next()) {
  59. Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
  60. clo.apply(person.getId(), person);
  61. cnt++;
  62. }
  63. }
  64. }
  65. }
  66. catch (SQLException e) {
  67. throw new CacheLoaderException("Failed to load values from cache store.", e);
  68. }
  69. }
  70. // Open JDBC connection.
  71. private Connection connection() throws SQLException {
  72. // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
  73. // In this example we use H2 Database for simplification.
  74. Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
  75. conn.setAutoCommit(true);
  76. return conn;
  77. }
  78. }

JDBC事务:

  1. public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  2. /** Auto-injected store session. */
  3. @CacheStoreSessionResource
  4. private CacheStoreSession ses;
  5. // Complete transaction or simply close connection if there is no transaction.
  6. @Override public void sessionEnd(boolean commit) {
  7. try (Connection conn = ses.getAttached()) {
  8. if (conn != null && ses.isWithinTransaction()) {
  9. if (commit)
  10. conn.commit();
  11. else
  12. conn.rollback();
  13. }
  14. }
  15. catch (SQLException e) {
  16. throw new CacheWriterException("Failed to end store session.", e);
  17. }
  18. }
  19. // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  20. @Override public Person load(Long key) {
  21. try (Connection conn = connection()) {
  22. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
  23. st.setLong(1, key);
  24. ResultSet rs = st.executeQuery();
  25. return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
  26. }
  27. }
  28. catch (SQLException e) {
  29. throw new CacheLoaderException("Failed to load: " + key, e);
  30. }
  31. }
  32. // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  33. @Override public void write(Cache.Entry<Long, Person> entry) {
  34. try (Connection conn = connection()) {
  35. // Syntax of MERGE statement is database specific and should be adopted for your database.
  36. // If your database does not support MERGE statement then use sequentially update, insert statements.
  37. try (PreparedStatement st = conn.prepareStatement(
  38. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  39. for (Cache.Entry<Long, Person> entry : entries) {
  40. Person val = entry.getValue();
  41. st.setLong(1, entry.getKey());
  42. st.setString(2, val.getFirstName());
  43. st.setString(3, val.getLastName());
  44. st.executeUpdate();
  45. }
  46. }
  47. }
  48. catch (SQLException e) {
  49. throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
  50. }
  51. }
  52. // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  53. @Override public void delete(Object key) {
  54. try (Connection conn = connection()) {
  55. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  56. st.setLong(1, (Long)key);
  57. st.executeUpdate();
  58. }
  59. }
  60. catch (SQLException e) {
  61. throw new CacheWriterException("Failed to delete: " + key, e);
  62. }
  63. }
  64. // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  65. // methods are called on IgniteCache. It is used for bulk-loading the cache.
  66. // If you don't need to bulk-load the cache, skip this method.
  67. @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
  68. if (args == null || args.length == 0 || args[0] == null)
  69. throw new CacheLoaderException("Expected entry count parameter is not provided.");
  70. final int entryCnt = (Integer)args[0];
  71. try (Connection conn = connection()) {
  72. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
  73. try (ResultSet rs = st.executeQuery()) {
  74. int cnt = 0;
  75. while (cnt < entryCnt && rs.next()) {
  76. Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
  77. clo.apply(person.getId(), person);
  78. cnt++;
  79. }
  80. }
  81. }
  82. }
  83. catch (SQLException e) {
  84. throw new CacheLoaderException("Failed to load values from cache store.", e);
  85. }
  86. }
  87. // Opens JDBC connection and attaches it to the ongoing
  88. // session if within a transaction.
  89. private Connection connection() throws SQLException {
  90. if (ses.isWithinTransaction()) {
  91. Connection conn = ses.getAttached();
  92. if (conn == null) {
  93. conn = openConnection(false);
  94. // Store connection in the session, so it can be accessed
  95. // for other operations within the same transaction.
  96. ses.attach(conn);
  97. }
  98. return conn;
  99. }
  100. // Transaction can be null in case of simple load or put operation.
  101. else
  102. return openConnection(true);
  103. }
  104. // Opens JDBC connection.
  105. private Connection openConnection(boolean autocommit) throws SQLException {
  106. // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
  107. // In this example we use H2 Database for simplification.
  108. Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
  109. conn.setAutoCommit(autocommit);
  110. return conn;
  111. }
  112. }

JDBC批量操作

  1. public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
  2. // Skip single operations and open connection methods.
  3. // You can copy them from jdbc non-transactional or jdbc transactional examples.
  4. ...
  5. // This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
  6. @Override public Map<K, V> loadAll(Iterable<Long> keys) {
  7. try (Connection conn = connection()) {
  8. try (PreparedStatement st = conn.prepareStatement(
  9. "select firstName, lastName from PERSONS where id=?")) {
  10. Map<K, V> loaded = new HashMap<>();
  11. for (Long key : keys) {
  12. st.setLong(1, key);
  13. try(ResultSet rs = st.executeQuery()) {
  14. if (rs.next())
  15. loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
  16. }
  17. }
  18. return loaded;
  19. }
  20. }
  21. catch (SQLException e) {
  22. throw new CacheLoaderException("Failed to loadAll: " + keys, e);
  23. }
  24. }
  25. // This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
  26. @Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
  27. try (Connection conn = connection()) {
  28. // Syntax of MERGE statement is database specific and should be adopted for your database.
  29. // If your database does not support MERGE statement then use sequentially update, insert statements.
  30. try (PreparedStatement st = conn.prepareStatement(
  31. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  32. for (Cache.Entry<Long, Person> entry : entries) {
  33. Person val = entry.getValue();
  34. st.setLong(1, entry.getKey());
  35. st.setString(2, val.getFirstName());
  36. st.setString(3, val.getLastName());
  37. st.addBatch();
  38. }
  39. st.executeBatch();
  40. }
  41. }
  42. catch (SQLException e) {
  43. throw new CacheWriterException("Failed to writeAll: " + entries, e);
  44. }
  45. }
  46. // This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
  47. @Override public void deleteAll(Collection<Long> keys) {
  48. try (Connection conn = connection()) {
  49. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  50. for (Long key : keys) {
  51. st.setLong(1, key);
  52. st.addBatch();
  53. }
  54. st.executeBatch();
  55. }
  56. }
  57. catch (SQLException e) {
  58. throw new CacheWriterException("Failed to deleteAll: " + keys, e);
  59. }
  60. }
  61. }

16.2.7.配置

下面的配置参数可以通过CacheConfiguration用于启用以及配置后写缓存:

setter方法 描述 默认值
setWriteBehindEnabled(boolean) 设置后写是否启用的标志 false
setWriteBehindFlushSize(int) 后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入缓存存储然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 10240
setWriteBehindFlushFrequency(long) 后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到缓存存储的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 5000
setWriteBehindFlushThreadCount(int) 执行缓存刷新的线程数 1
setWriteBehindBatchSize(int) 后写缓存存储操作的操作数最大值 512

CacheStore接口可以在IgniteConfiguration上通过一个工厂进行设置,就和CacheLoaderCacheWriter同样的方式。

对于分布式缓存的配置,Factory应该是可序列化的。

XML:

  1. <bean class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <list>
  5. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  6. ...
  7. <property name="cacheStoreFactory">
  8. <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
  9. <constructor-arg value="foo.bar.MyPersonStore"/>
  10. </bean>
  11. </property>
  12. <property name="readThrough" value="true"/>
  13. <property name="writeThrough" value="true"/>
  14. </bean>
  15. </list>
  16. </property>
  17. ...
  18. </bean>

Java:

  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
  3. cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
  4. cacheCfg.setReadThrough(true);
  5. cacheCfg.setWriteThrough(true);
  6. cfg.setCacheConfiguration(cacheCfg);
  7. // Start Ignite node.
  8. Ignition.start(cfg);

16.2.8.CacheJdbcBlobStore

CacheJdbcBlobStore实现基于JDBC,这个实现将对象以BLOB的格式存储在底层数据库中。存储会在数据库中创建名为ENTRIES的表来存储数据,表具有key和val两个字段。
如果提供了定制的DDL和DML语句,表和字段的名字要和所有的语句一致以及参数的顺序也要保留。
使用CacheJdbcBlobStoreFactory工厂来向CacheConfiguration传入CacheJdbcBlobStore:

Spring:

  1. <bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource">
  2. <property name="url" value="jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1" />
  3. </bean>
  4. <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  5. ...
  6. <property name="cacheConfiguration">
  7. <list>
  8. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  9. ...
  10. <property name="cacheStoreFactory">
  11. <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
  12. <property name="user" value = "user" />
  13. <property name="dataSourceBean" value = "simpleDataSource" />
  14. </bean>
  15. </property>
  16. </bean>
  17. </list>
  18. </property>
  19. ...
  20. </bean>

16.2.9.CacheJdbcPojoStore

CacheJdbcPojoStore实现基于JDBC和基于反射的POJO,这个实现将对象用基于反射的Java Bean映射描述的形式存储在底层数据库中。
使用CacheJdbcPojoStoreFactory工厂来向CacheConfiguration传入CacheJdbcPojoStore:
Spring:

  1. <bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
  2. <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  3. ...
  4. <property name="cacheConfiguration">
  5. <list>
  6. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  7. ...
  8. <property name="cacheStoreFactory">
  9. <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
  10. <property name="dataSourceBean" value = "simpleDataSource" />
  11. </bean>
  12. </property>
  13. </bean>
  14. </list>
  15. </property>
  16. </bean>

16.2.10.CacheHibernateBlobStore

CacheHibernateBlobStore实现基于Hibernate,这个实现将对象以BLOB的格式存储在底层数据库中。
使用CacheHibernateBlobStoreFactory工厂来向CacheConfiguration传入CacheHibernateBlobStore:
Spring:

  1. <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  2. ...
  3. <property name="cacheConfiguration">
  4. <list>
  5. <bean class="org.apache.ignite.configuration.CacheConfiguration">
  6. <bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
  7. <property name="hibernateProperties">
  8. <props>
  9. <prop key="connection.url">jdbc:h2:mem:</prop>
  10. <prop key="hbm2ddl.auto">update</prop>
  11. <prop key="show_sql">true</prop>
  12. </props>
  13. </property>
  14. </bean>
  15. </list>
  16. </property>
  17. ...
  18. </bean>

16.2.11.Cassandra CacheStore

可以查看Cassandra集成相关章节的内容,了解更详细的信息。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注