001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb; 018 019 import org.apache.activeio.journal.Journal; 020 import org.apache.activemq.broker.BrokerService; 021 import org.apache.activemq.broker.BrokerServiceAware; 022 import org.apache.activemq.broker.ConnectionContext; 023 import org.apache.activemq.command.ActiveMQDestination; 024 import org.apache.activemq.command.ActiveMQQueue; 025 import org.apache.activemq.command.ActiveMQTopic; 026 import org.apache.activemq.store.MessageStore; 027 import org.apache.activemq.store.PersistenceAdapter; 028 import org.apache.activemq.store.TopicMessageStore; 029 import org.apache.activemq.store.TransactionStore; 030 import org.apache.activemq.usage.SystemUsage; 031 import java.io.File; 032 import java.io.IOException; 033 import java.util.Set; 034 /** 035 * An implementation of {@link PersistenceAdapter} designed for use with a 036 * {@link Journal} and then check pointing asynchronously on a timeout with some 037 * other long term persistent storage. 038 * 039 * @org.apache.xbean.XBean element="kahaDB" 040 * @version $Revision: 1.17 $ 041 */ 042 public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware { 043 private KahaDBStore letter = new KahaDBStore(); 044 045 046 /** 047 * @param context 048 * @throws IOException 049 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 050 */ 051 public void beginTransaction(ConnectionContext context) throws IOException { 052 this.letter.beginTransaction(context); 053 } 054 055 /** 056 * @param sync 057 * @throws IOException 058 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 059 */ 060 public void checkpoint(boolean sync) throws IOException { 061 this.letter.checkpoint(sync); 062 } 063 064 /** 065 * @param context 066 * @throws IOException 067 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 068 */ 069 public void commitTransaction(ConnectionContext context) throws IOException { 070 this.letter.commitTransaction(context); 071 } 072 073 /** 074 * @param destination 075 * @return MessageStore 076 * @throws IOException 077 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 078 */ 079 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 080 return this.letter.createQueueMessageStore(destination); 081 } 082 083 /** 084 * @param destination 085 * @return TopicMessageStore 086 * @throws IOException 087 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 088 */ 089 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 090 return this.letter.createTopicMessageStore(destination); 091 } 092 093 /** 094 * @return TrandactionStore 095 * @throws IOException 096 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 097 */ 098 public TransactionStore createTransactionStore() throws IOException { 099 return this.letter.createTransactionStore(); 100 } 101 102 /** 103 * @throws IOException 104 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 105 */ 106 public void deleteAllMessages() throws IOException { 107 this.letter.deleteAllMessages(); 108 } 109 110 /** 111 * @return destinations 112 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 113 */ 114 public Set<ActiveMQDestination> getDestinations() { 115 return this.letter.getDestinations(); 116 } 117 118 /** 119 * @return lastMessageBrokerSequenceId 120 * @throws IOException 121 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 122 */ 123 public long getLastMessageBrokerSequenceId() throws IOException { 124 return this.letter.getLastMessageBrokerSequenceId(); 125 } 126 127 /** 128 * @param destination 129 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 130 */ 131 public void removeQueueMessageStore(ActiveMQQueue destination) { 132 this.letter.removeQueueMessageStore(destination); 133 } 134 135 /** 136 * @param destination 137 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 138 */ 139 public void removeTopicMessageStore(ActiveMQTopic destination) { 140 this.letter.removeTopicMessageStore(destination); 141 } 142 143 /** 144 * @param context 145 * @throws IOException 146 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 147 */ 148 public void rollbackTransaction(ConnectionContext context) throws IOException { 149 this.letter.rollbackTransaction(context); 150 } 151 152 /** 153 * @param brokerName 154 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 155 */ 156 public void setBrokerName(String brokerName) { 157 this.letter.setBrokerName(brokerName); 158 } 159 160 161 162 /** 163 * @param usageManager 164 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 165 */ 166 public void setUsageManager(SystemUsage usageManager) { 167 this.letter.setUsageManager(usageManager); 168 } 169 170 /** 171 * @return the size of the store 172 * @see org.apache.activemq.store.PersistenceAdapter#size() 173 */ 174 public long size() { 175 return this.letter.size(); 176 } 177 178 /** 179 * @throws Exception 180 * @see org.apache.activemq.Service#start() 181 */ 182 public void start() throws Exception { 183 this.letter.start(); 184 } 185 186 /** 187 * @throws Exception 188 * @see org.apache.activemq.Service#stop() 189 */ 190 public void stop() throws Exception { 191 this.letter.stop(); 192 } 193 194 /** 195 * Get the journalMaxFileLength 196 * @return the journalMaxFileLength 197 */ 198 public int getJournalMaxFileLength() { 199 return this.letter.getJournalMaxFileLength(); 200 } 201 202 /** 203 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 204 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 205 */ 206 public void setJournalMaxFileLength(int journalMaxFileLength) { 207 this.letter.setJournalMaxFileLength(journalMaxFileLength); 208 } 209 210 /** 211 * Get the checkpointInterval 212 * @return the checkpointInterval 213 */ 214 public long getCheckpointInterval() { 215 return this.letter.getCheckpointInterval(); 216 } 217 218 /** 219 * Set the checkpointInterval 220 * @param checkpointInterval the checkpointInterval to set 221 */ 222 public void setCheckpointInterval(long checkpointInterval) { 223 this.letter.setCheckpointInterval(checkpointInterval); 224 } 225 226 /** 227 * Get the cleanupInterval 228 * @return the cleanupInterval 229 */ 230 public long getCleanupInterval() { 231 return this.letter.getCleanupInterval(); 232 } 233 234 /** 235 * Set the cleanupInterval 236 * @param cleanupInterval the cleanupInterval to set 237 */ 238 public void setCleanupInterval(long cleanupInterval) { 239 this.letter.setCleanupInterval(cleanupInterval); 240 } 241 242 /** 243 * Get the indexWriteBatchSize 244 * @return the indexWriteBatchSize 245 */ 246 public int getIndexWriteBatchSize() { 247 return this.letter.getIndexWriteBatchSize(); 248 } 249 250 /** 251 * Set the indexWriteBatchSize 252 * @param indexWriteBatchSize the indexWriteBatchSize to set 253 */ 254 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 255 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 256 } 257 258 /** 259 * Get the journalMaxWriteBatchSize 260 * @return the journalMaxWriteBatchSize 261 */ 262 public int getJournalMaxWriteBatchSize() { 263 return this.letter.getJournalMaxWriteBatchSize(); 264 } 265 266 /** 267 * Set the journalMaxWriteBatchSize 268 * @param journalMaxWriteBatchSize the journalMaxWriteBatchSize to set 269 */ 270 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 271 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 272 } 273 274 /** 275 * Get the enableIndexWriteAsync 276 * @return the enableIndexWriteAsync 277 */ 278 public boolean isEnableIndexWriteAsync() { 279 return this.letter.isEnableIndexWriteAsync(); 280 } 281 282 /** 283 * Set the enableIndexWriteAsync 284 * @param enableIndexWriteAsync the enableIndexWriteAsync to set 285 */ 286 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 287 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 288 } 289 290 /** 291 * Get the directory 292 * @return the directory 293 */ 294 public File getDirectory() { 295 return this.letter.getDirectory(); 296 } 297 298 /** 299 * @param dir 300 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 301 */ 302 public void setDirectory(File dir) { 303 this.letter.setDirectory(dir); 304 } 305 306 /** 307 * Get the enableJournalDiskSyncs 308 * @return the enableJournalDiskSyncs 309 */ 310 public boolean isEnableJournalDiskSyncs() { 311 return this.letter.isEnableJournalDiskSyncs(); 312 } 313 314 /** 315 * Set the enableJournalDiskSyncs 316 * @param enableJournalDiskSyncs the enableJournalDiskSyncs to set 317 */ 318 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 319 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 320 } 321 322 /** 323 * Get the indexCacheSize 324 * @return the indexCacheSize 325 */ 326 public int getIndexCacheSize() { 327 return this.letter.getIndexCacheSize(); 328 } 329 330 /** 331 * Set the indexCacheSize 332 * @param indexCacheSize the indexCacheSize to set 333 */ 334 public void setIndexCacheSize(int indexCacheSize) { 335 this.letter.setIndexCacheSize(indexCacheSize); 336 } 337 338 /** 339 * Get the ignoreMissingJournalfiles 340 * @return the ignoreMissingJournalfiles 341 */ 342 public boolean isIgnoreMissingJournalfiles() { 343 return this.letter.isIgnoreMissingJournalfiles(); 344 } 345 346 /** 347 * Set the ignoreMissingJournalfiles 348 * @param ignoreMissingJournalfiles the ignoreMissingJournalfiles to set 349 */ 350 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 351 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 352 } 353 354 public boolean isChecksumJournalFiles() { 355 return letter.isChecksumJournalFiles(); 356 } 357 358 public boolean isCheckForCorruptJournalFiles() { 359 return letter.isCheckForCorruptJournalFiles(); 360 } 361 362 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 363 letter.setChecksumJournalFiles(checksumJournalFiles); 364 } 365 366 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 367 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 368 } 369 370 public void setBrokerService(BrokerService brokerService) { 371 letter.setBrokerService(brokerService); 372 } 373 }