@liyuj 2017-06-14T21:36:23.000000Z 字数 14485 阅读 5462






Ignite附带了它自己的CacheJdbcPojoStore,它会自动地建立Java POJO和数据库模式之间的映射,可以参照3.13.自动持久化章节。











load(), write(), delete()

loadAll(), writeAll(), deleteAll()



Cassandra Cache Store
Ignite提供了将Apache Cassandra作为内存网格级CacheStore的开箱即用的集成,要了解更多的信息,可以查看相关的文档。





  1. public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  2. // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  3. @Override public Person load(Long key) {
  4. try (Connection conn = connection()) {
  5. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
  6. st.setLong(1, key);
  7. ResultSet rs = st.executeQuery();
  8. return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
  9. }
  10. }
  11. catch (SQLException e) {
  12. throw new CacheLoaderException("Failed to load: " + key, e);
  13. }
  14. }
  15. // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  16. @Override public void write(Cache.Entry<Long, Person> entry) {
  17. try (Connection conn = connection()) {
  18. // Syntax of MERGE statement is database specific and should be adopted for your database.
  19. // If your database does not support MERGE statement then use sequentially update, insert statements.
  20. try (PreparedStatement st = conn.prepareStatement(
  21. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  22. for (Cache.Entry<Long, Person> entry : entries) {
  23. Person val = entry.getValue();
  24. st.setLong(1, entry.getKey());
  25. st.setString(2, val.getFirstName());
  26. st.setString(3, val.getLastName());
  27. st.executeUpdate();
  28. }
  29. }
  30. }
  31. catch (SQLException e) {
  32. throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
  33. }
  34. }
  35. // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  36. @Override public void delete(Object key) {
  37. try (Connection conn = connection()) {
  38. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  39. st.setLong(1, (Long)key);
  40. st.executeUpdate();
  41. }
  42. }
  43. catch (SQLException e) {
  44. throw new CacheWriterException("Failed to delete: " + key, e);
  45. }
  46. }
  47. // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  48. // methods are called on IgniteCache. It is used for bulk-loading the cache.
  49. // If you don't need to bulk-load the cache, skip this method.
  50. @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
  51. if (args == null || args.length == 0 || args[0] == null)
  52. throw new CacheLoaderException("Expected entry count parameter is not provided.");
  53. final int entryCnt = (Integer)args[0];
  54. try (Connection conn = connection()) {
  55. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
  56. try (ResultSet rs = st.executeQuery()) {
  57. int cnt = 0;
  58. while (cnt < entryCnt && rs.next()) {
  59. Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
  60. clo.apply(person.getId(), person);
  61. cnt++;
  62. }
  63. }
  64. }
  65. }
  66. catch (SQLException e) {
  67. throw new CacheLoaderException("Failed to load values from cache store.", e);
  68. }
  69. }
  70. // Open JDBC connection.
  71. private Connection connection() throws SQLException {
  72. // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
  73. // In this example we use H2 Database for simplification.
  74. Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
  75. conn.setAutoCommit(true);
  76. return conn;
  77. }
  78. }


  1. public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  2. /** Auto-injected store session. */
  3. @CacheStoreSessionResource
  4. private CacheStoreSession ses;
  5. // Complete transaction or simply close connection if there is no transaction.
  6. @Override public void sessionEnd(boolean commit) {
  7. try (Connection conn = ses.getAttached()) {
  8. if (conn != null && ses.isWithinTransaction()) {
  9. if (commit)
  10. conn.commit();
  11. else
  12. conn.rollback();
  13. }
  14. }
  15. catch (SQLException e) {
  16. throw new CacheWriterException("Failed to end store session.", e);
  17. }
  18. }
  19. // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  20. @Override public Person load(Long key) {
  21. try (Connection conn = connection()) {
  22. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
  23. st.setLong(1, key);
  24. ResultSet rs = st.executeQuery();
  25. return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
  26. }
  27. }
  28. catch (SQLException e) {
  29. throw new CacheLoaderException("Failed to load: " + key, e);
  30. }
  31. }
  32. // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  33. @Override public void write(Cache.Entry<Long, Person> entry) {
  34. try (Connection conn = connection()) {
  35. // Syntax of MERGE statement is database specific and should be adopted for your database.
  36. // If your database does not support MERGE statement then use sequentially update, insert statements.
  37. try (PreparedStatement st = conn.prepareStatement(
  38. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  39. for (Cache.Entry<Long, Person> entry : entries) {
  40. Person val = entry.getValue();
  41. st.setLong(1, entry.getKey());
  42. st.setString(2, val.getFirstName());
  43. st.setString(3, val.getLastName());
  44. st.executeUpdate();
  45. }
  46. }
  47. }
  48. catch (SQLException e) {
  49. throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
  50. }
  51. }
  52. // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  53. @Override public void delete(Object key) {
  54. try (Connection conn = connection()) {
  55. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  56. st.setLong(1, (Long)key);
  57. st.executeUpdate();
  58. }
  59. }
  60. catch (SQLException e) {
  61. throw new CacheWriterException("Failed to delete: " + key, e);
  62. }
  63. }
  64. // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  65. // methods are called on IgniteCache. It is used for bulk-loading the cache.
  66. // If you don't need to bulk-load the cache, skip this method.
  67. @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
  68. if (args == null || args.length == 0 || args[0] == null)
  69. throw new CacheLoaderException("Expected entry count parameter is not provided.");
  70. final int entryCnt = (Integer)args[0];
  71. try (Connection conn = connection()) {
  72. try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
  73. try (ResultSet rs = st.executeQuery()) {
  74. int cnt = 0;
  75. while (cnt < entryCnt && rs.next()) {
  76. Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));
  77. clo.apply(person.getId(), person);
  78. cnt++;
  79. }
  80. }
  81. }
  82. }
  83. catch (SQLException e) {
  84. throw new CacheLoaderException("Failed to load values from cache store.", e);
  85. }
  86. }
  87. // Opens JDBC connection and attaches it to the ongoing
  88. // session if within a transaction.
  89. private Connection connection() throws SQLException {
  90. if (ses.isWithinTransaction()) {
  91. Connection conn = ses.getAttached();
  92. if (conn == null) {
  93. conn = openConnection(false);
  94. // Store connection in the session, so it can be accessed
  95. // for other operations within the same transaction.
  96. ses.attach(conn);
  97. }
  98. return conn;
  99. }
  100. // Transaction can be null in case of simple load or put operation.
  101. else
  102. return openConnection(true);
  103. }
  104. // Opens JDBC connection.
  105. private Connection openConnection(boolean autocommit) throws SQLException {
  106. // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
  107. // In this example we use H2 Database for simplification.
  108. Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");
  109. conn.setAutoCommit(autocommit);
  110. return conn;
  111. }
  112. }


  1. public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
  2. // Skip single operations and open connection methods.
  3. // You can copy them from jdbc non-transactional or jdbc transactional examples.
  4. ...
  5. // This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
  6. @Override public Map<K, V> loadAll(Iterable<Long> keys) {
  7. try (Connection conn = connection()) {
  8. try (PreparedStatement st = conn.prepareStatement(
  9. "select firstName, lastName from PERSONS where id=?")) {
  10. Map<K, V> loaded = new HashMap<>();
  11. for (Long key : keys) {
  12. st.setLong(1, key);
  13. try(ResultSet rs = st.executeQuery()) {
  14. if (rs.next())
  15. loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
  16. }
  17. }
  18. return loaded;
  19. }
  20. }
  21. catch (SQLException e) {
  22. throw new CacheLoaderException("Failed to loadAll: " + keys, e);
  23. }
  24. }
  25. // This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
  26. @Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
  27. try (Connection conn = connection()) {
  28. // Syntax of MERGE statement is database specific and should be adopted for your database.
  29. // If your database does not support MERGE statement then use sequentially update, insert statements.
  30. try (PreparedStatement st = conn.prepareStatement(
  31. "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
  32. for (Cache.Entry<Long, Person> entry : entries) {
  33. Person val = entry.getValue();
  34. st.setLong(1, entry.getKey());
  35. st.setString(2, val.getFirstName());
  36. st.setString(3, val.getLastName());
  37. st.addBatch();
  38. }
  39. st.executeBatch();
  40. }
  41. }
  42. catch (SQLException e) {
  43. throw new CacheWriterException("Failed to writeAll: " + entries, e);
  44. }
  45. }
  46. // This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
  47. @Override public void deleteAll(Collection<Long> keys) {
  48. try (Connection conn = connection()) {
  49. try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
  50. for (Long key : keys) {
  51. st.setLong(1, key);
  52. st.addBatch();
  53. }
  54. st.executeBatch();
  55. }
  56. }
  57. catch (SQLException e) {
  58. throw new CacheWriterException("Failed to deleteAll: " + keys, e);
  59. }
  60. }
  61. }



setter方法 描述 默认值
setWriteBehindEnabled(boolean) 设置后写是否启用的标志 false
setWriteBehindFlushSize(int) 后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入缓存存储然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 10240
setWriteBehindFlushFrequency(long) 后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到缓存存储的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 5000
setWriteBehindFlushThreadCount(int) 执行缓存刷新的线程数 1
setWriteBehindBatchSize(int) 后写缓存存储操作的操作数最大值 512




  <!-- Ignite configuration that plugs a custom cache store class into a cache. -->
  <bean class="org.apache.ignite.configuration.IgniteConfiguration">
      ...
      <property name="cacheConfiguration">
          <list>
              <bean class="org.apache.ignite.configuration.CacheConfiguration">
                  ...
                  <!-- Store factory built by FactoryBuilder.factoryOf from the store class name. -->
                  <property name="cacheStoreFactory">
                      <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                          <constructor-arg value="foo.bar.MyPersonStore"/>
                      </bean>
                  </property>
                  <!-- Read-through and write-through are both switched on for this cache. -->
                  <property name="readThrough" value="true"/>
                  <property name="writeThrough" value="true"/>
              </bean>
          </list>
      </property>
      ...
  </bean>


  1. IgniteConfiguration cfg = new IgniteConfiguration();
  2. CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();
  3. cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(MyPersonStore.class));
  4. cacheCfg.setReadThrough(true);
  5. cacheCfg.setWriteThrough(true);
  6. cfg.setCacheConfiguration(cacheCfg);
  7. // Start Ignite node.
  8. Ignition.start(cfg);




  <!-- Data source bean referenced by name from the store factory below. -->
  <bean id="simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>

  <!-- Ignite configuration using the JDBC blob store factory. -->
  <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
      ...
      <property name="cacheConfiguration">
          <list>
              <bean class="org.apache.ignite.configuration.CacheConfiguration">
                  ...
                  <property name="cacheStoreFactory">
                      <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
                          <property name="user" value="user"/>
                          <property name="dataSourceBean" value="simpleDataSource"/>
                      </bean>
                  </property>
              </bean>
          </list>
      </property>
      ...
  </bean>


CacheJdbcPojoStore实现基于JDBC和基于反射的POJO,这个实现将对象用基于反射的Java Bean映射描述的形式存储在底层数据库中。

  <!-- Data source bean referenced by name from the store factory below. -->
  <bean id="simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>

  <!-- Ignite configuration using the JDBC POJO store factory. -->
  <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
      ...
      <property name="cacheConfiguration">
          <list>
              <bean class="org.apache.ignite.configuration.CacheConfiguration">
                  ...
                  <property name="cacheStoreFactory">
                      <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                          <property name="dataSourceBean" value="simpleDataSource"/>
                      </bean>
                  </property>
              </bean>
          </list>
      </property>
  </bean>



  <!-- Ignite configuration using the Hibernate blob store factory. -->
  <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
      ...
      <property name="cacheConfiguration">
          <list>
              <bean class="org.apache.ignite.configuration.CacheConfiguration">
                  <!-- The factory bean must be the value of the cacheStoreFactory property;
                       a bean cannot be nested directly inside another bean element. -->
                  <property name="cacheStoreFactory">
                      <bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
                          <!-- Hibernate settings handed to the store factory. -->
                          <property name="hibernateProperties">
                              <props>
                                  <prop key="connection.url">jdbc:h2:mem:</prop>
                                  <prop key="hbm2ddl.auto">update</prop>
                                  <prop key="show_sql">true</prop>
                              </props>
                          </property>
                      </bean>
                  </property>
              </bean>
          </list>
      </property>
      ...
  </bean>

16.1.11.Cassandra CacheStore

