/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.data;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Manages the numbered {@link DataFile}s of one named store inside a
 * directory: it tracks which file is currently being appended to, rolls over
 * to a new file when the current one would exceed the maximum length, counts
 * interest in each file, and deletes files that are no longer referenced.
 * <p>
 * Data files are named {@code data-<name>-<number>} (after passing through
 * {@link IOHelper#toFileSystemSafeName(String)}). The total length of all
 * managed files is mirrored into the {@link AtomicLong} supplied at
 * construction time.
 *
 * @version $Revision: 1.1.1.1 $
 */
public final class DataManagerImpl implements DataManager {

    /** Number of bytes accounted for each item header (type + length). */
    public static final int ITEM_HEAD_SIZE = 5; // type + length
    /** Item type written by {@link #storeDataItem} and {@link #updateItem}. */
    public static final byte DATA_ITEM_TYPE = 1;
    /** Item type written by {@link #storeRedoItem}. */
    public static final byte REDO_ITEM_TYPE = 2;
    /** Default value of the maximum data file length (32 MiB). */
    public static final long MAX_FILE_LENGTH = 1024 * 1024 * 32;

    private static final Log LOG = LogFactory.getLog(DataManagerImpl.class);
    private static final String NAME_PREFIX = "data-";

    private final File directory;
    private final String name;
    private SyncDataFileReader reader;
    private SyncDataFileWriter writer;
    private DataFile currentWriteFile;
    private long maxFileLength = MAX_FILE_LENGTH;
    private final Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
    private final String dataFilePrefix;
    private final AtomicLong storeSize;

    /**
     * Creates a manager and registers every existing data file found in
     * {@code dir} whose name starts with this store's file prefix. The file
     * with the highest number becomes the current write file, and the length
     * of each registered file is added to {@code storeSize}.
     * <p>
     * Files that share the prefix but whose remaining suffix is not an
     * integer (for example files belonging to a store whose name merely
     * begins with this store's name) are skipped with a warning instead of
     * aborting construction with a {@link NumberFormatException}.
     *
     * @param dir the directory holding the data files
     * @param name the store name, used to build the data file prefix
     * @param storeSize shared counter updated with the size of managed files
     */
    public DataManagerImpl(File dir, final String name, AtomicLong storeSize) {
        this.directory = dir;
        this.name = name;
        this.storeSize = storeSize;

        dataFilePrefix = IOHelper.toFileSystemSafeName(NAME_PREFIX + name + "-");
        // build up list of current dataFiles
        File[] files = dir.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(dataFilePrefix);
            }
        });
        // listFiles returns null when the directory cannot be listed
        if (files != null) {
            for (File file : files) {
                String numStr = file.getName().substring(dataFilePrefix.length());
                int num;
                try {
                    num = Integer.parseInt(numStr);
                } catch (NumberFormatException e) {
                    // Prefix matched but this is not one of our numbered files.
                    LOG.warn("Ignoring file " + file + ": suffix '" + numStr + "' is not a data file number");
                    continue;
                }
                DataFile dataFile = new DataFile(file, num);
                storeSize.addAndGet(dataFile.getLength());
                fileMap.put(dataFile.getNumber(), dataFile);
                // the highest-numbered file is the one appended to
                if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
                    currentWriteFile = dataFile;
                }
            }
        }
    }

    /**
     * Creates a {@link DataFile} for the given number in this manager's
     * directory and registers it in the file map.
     *
     * @param num the data file number
     * @return the newly registered data file
     */
    private DataFile createAndAddDataFile(int num) {
        String fileName = dataFilePrefix + num;
        File file = new File(directory, fileName);
        DataFile result = new DataFile(file, num);
        fileMap.put(result.getNumber(), result);
        return result;
    }

    /**
     * @return the store name this manager was created with
     */
    public String getName() {
        return name;
    }

    /**
     * Reserves space for {@code item} in the current write file, rolling over
     * to a new file first when there is no current file or when the current
     * length plus the item size would exceed the maximum file length. The
     * item's offset and file number are set, and both the file length and the
     * store size grow by the item size plus {@link #ITEM_HEAD_SIZE}.
     *
     * @param item the item about to be written
     * @return the data file the item has been assigned to
     * @throws IOException if discarding an unused previous write file fails
     */
    synchronized DataFile findSpaceForData(DataItem item) throws IOException {
        if (currentWriteFile == null || ((currentWriteFile.getLength() + item.getSize()) > maxFileLength)) {
            // numbering starts at 1 when no file exists yet
            int nextNum = currentWriteFile != null ? currentWriteFile.getNumber().intValue() + 1 : 1;
            // nothing references the file being rolled away from, so drop it
            if (currentWriteFile != null && currentWriteFile.isUnused()) {
                removeDataFile(currentWriteFile);
            }
            currentWriteFile = createAndAddDataFile(nextNum);
        }
        item.setOffset(currentWriteFile.getLength());
        item.setFile(currentWriteFile.getNumber().intValue());
        int len = item.getSize() + ITEM_HEAD_SIZE;
        currentWriteFile.incrementLength(len);
        storeSize.addAndGet(len);
        return currentWriteFile;
    }

    /**
     * Looks up the registered data file that {@code item} points at.
     *
     * @param item a location whose file number identifies the data file
     * @return the matching data file, never {@code null}
     * @throws IOException if no data file with that number is registered
     */
    DataFile getDataFile(StoreLocation item) throws IOException {
        Integer key = Integer.valueOf(item.getFile());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + NAME_PREFIX + name + "-" + item.getFile());
        }
        return dataFile;
    }

    /**
     * Reads the item at the given location through the current reader.
     *
     * @param marshaller used by the reader to un-marshal the payload
     * @param item the location to read from
     * @return the object produced by the reader
     * @throws IOException if the reader fails
     */
    public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException {
        return getReader().readItem(marshaller, item);
    }

    /**
     * Stores a payload as an item of type {@link #DATA_ITEM_TYPE} through the
     * current writer.
     *
     * @param marshaller used by the writer to marshal the payload
     * @param payload the object to store
     * @return the location reported by the writer
     * @throws IOException if the writer fails
     */
    public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
        return getWriter().storeItem(marshaller, payload, DATA_ITEM_TYPE);
    }

    /**
     * Stores a payload as an item of type {@link #REDO_ITEM_TYPE}, marshalled
     * with the configured redo marshaller.
     *
     * @param payload the redo object to store
     * @return the location reported by the writer
     * @throws IOException if the writer fails
     */
    public synchronized StoreLocation storeRedoItem(Object payload) throws IOException {
        return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
    }

    /**
     * Rewrites the item at {@code location} with a new payload of type
     * {@link #DATA_ITEM_TYPE}.
     *
     * @param location the item to update; must be a {@link DataItem}
     * @param marshaller used by the writer to marshal the payload
     * @param payload the replacement object
     * @throws IOException if the writer fails
     */
    public synchronized void updateItem(StoreLocation location, Marshaller marshaller, Object payload)
        throws IOException {
        getWriter().updateItem((DataItem)location, marshaller, payload, DATA_ITEM_TYPE);
    }

    /**
     * Walks the current write file from offset 0 and hands every item of type
     * {@link #REDO_ITEM_TYPE} to the listener. The walk stops silently (with
     * a trace log) as soon as an item header or a redo payload cannot be
     * read, which is treated as the end of the file.
     *
     * @param listener receives each recovered redo item
     * @throws IOException if the listener throws; the original exception is
     *         wrapped
     */
    public synchronized void recoverRedoItems(RedoListener listener) throws IOException {

        // Nothing to recover if there is no current file.
        if (currentWriteFile == null) {
            return;
        }

        DataItem item = new DataItem();
        item.setFile(currentWriteFile.getNumber().intValue());
        item.setOffset(0);
        while (true) {
            byte type;
            try {
                // presumably also fills in the item's size, which the offset
                // advance below relies on; verify against SyncDataFileReader
                type = getReader().readDataItemSize(item);
            } catch (IOException ignore) {
                LOG.trace("End of data file reached at (header was invalid): " + item);
                return;
            }
            if (type == REDO_ITEM_TYPE) {
                // Un-marshal the redo item
                Object object;
                try {
                    object = readItem(redoMarshaller, item);
                } catch (IOException e1) {
                    LOG.trace("End of data file reached at (payload was invalid): " + item);
                    return;
                }
                try {

                    listener.onRedoItem(item, object);
                    // in case the listener is holding on to item references,
                    // copy it
                    // so we don't change it behind the listener's back.
                    item = item.copy();

                } catch (Exception e) {
                    throw IOExceptionSupport.create("Recovery handler failed: " + e, e);
                }
            }
            // Move to the next item.
            item.setOffset(item.getOffset() + ITEM_HEAD_SIZE + item.getSize());
        }
    }

    /**
     * Closes the writer, then forces and closes every registered data file
     * and empties the file map.
     * <p>
     * NOTE(review): {@code force} is invoked on the writer after it has been
     * closed; confirm that SyncDataFileWriter permits this.
     *
     * @throws IOException if closing or forcing fails
     */
    public synchronized void close() throws IOException {
        getWriter().close();
        for (DataFile dataFile : fileMap.values()) {
            getWriter().force(dataFile);
            dataFile.close();
        }
        fileMap.clear();
    }

    /**
     * Asks the writer to force every registered data file.
     *
     * @throws IOException if forcing fails
     */
    public synchronized void force() throws IOException {
        for (DataFile dataFile : fileMap.values()) {
            getWriter().force(dataFile);
        }
    }

    /**
     * Deletes every registered data file, subtracting each file's length
     * from the store size, and empties the file map.
     *
     * @return {@code true} only if every file reported a successful delete
     * @throws IOException if deleting a file fails with an I/O error
     */
    public synchronized boolean delete() throws IOException {
        boolean result = true;
        for (DataFile dataFile : fileMap.values()) {
            storeSize.addAndGet(-dataFile.getLength());
            // keep deleting the remaining files even after a failure
            result &= dataFile.delete();
        }
        fileMap.clear();
        return result;
    }

    /**
     * Registers interest in the data file with the given number, creating
     * and registering the file entry first if it is not yet known. Negative
     * numbers are ignored.
     *
     * @param file the data file number
     * @throws IOException declared by the interface
     */
    public synchronized void addInterestInFile(int file) throws IOException {
        if (file >= 0) {
            Integer key = Integer.valueOf(file);
            DataFile dataFile = fileMap.get(key);
            if (dataFile == null) {
                dataFile = createAndAddDataFile(file);
            }
            addInterestInFile(dataFile);
        }
    }

    /**
     * Increments the reference count of the given data file.
     *
     * @param dataFile the file to reference; {@code null} is ignored
     */
    synchronized void addInterestInFile(DataFile dataFile) {
        if (dataFile != null) {
            dataFile.increment();
        }
    }

    /**
     * Drops interest in the data file with the given number. Negative
     * numbers and numbers with no registered file are ignored.
     *
     * @param file the data file number
     * @throws IOException if discarding a no longer referenced file fails
     */
    public synchronized void removeInterestInFile(int file) throws IOException {
        if (file >= 0) {
            Integer key = Integer.valueOf(file);
            DataFile dataFile = fileMap.get(key);
            removeInterestInFile(dataFile);
        }
    }

    /**
     * Decrements the reference count of the given data file and discards the
     * file once the count drops to zero or below, unless it is the current
     * write file.
     *
     * @param dataFile the file to release; {@code null} is ignored
     * @throws IOException if discarding the file fails
     */
    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
        if (dataFile != null) {

            if (dataFile.decrement() <= 0) {
                if (dataFile != currentWriteFile) {
                    removeDataFile(dataFile);
                }
            }
        }
    }

    /**
     * Discards every registered data file that reports itself unused, except
     * the current write file.
     *
     * @throws IOException if discarding a file fails
     */
    public synchronized void consolidateDataFiles() throws IOException {
        // collect first: removeDataFile modifies fileMap, which must not
        // happen while iterating over its values
        List<DataFile> purgeList = new ArrayList<DataFile>();
        for (DataFile dataFile : fileMap.values()) {
            if (dataFile.isUnused() && dataFile != currentWriteFile) {
                purgeList.add(dataFile);
            }
        }
        for (DataFile dataFile : purgeList) {
            removeDataFile(dataFile);
        }
    }

    /**
     * Unregisters a data file, forces it through the writer if one has been
     * created, subtracts its length from the store size and deletes it.
     *
     * @param dataFile the file to discard
     * @throws IOException if forcing or deleting fails
     */
    private void removeDataFile(DataFile dataFile) throws IOException {
        fileMap.remove(dataFile.getNumber());
        if (writer != null) {
            writer.force(dataFile);
        }
        storeSize.addAndGet(-dataFile.getLength());
        boolean result = dataFile.delete();
        if (LOG.isDebugEnabled()) {
            LOG.debug("discarding data file " + dataFile + (result ? " successful" : " failed"));
        }
    }

    /**
     * @return the marshaller used for redo items
     */
    public Marshaller getRedoMarshaller() {
        return redoMarshaller;
    }

    /**
     * @param redoMarshaller the marshaller to use for redo items
     */
    public void setRedoMarshaller(Marshaller redoMarshaller) {
        this.redoMarshaller = redoMarshaller;
    }

    /**
     * @return the maxFileLength
     */
    public long getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(long maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    /**
     * @return a short description containing the unescaped file prefix and
     *         store name
     */
    @Override
    public String toString() {
        return "DataManager:(" + NAME_PREFIX + name + ")";
    }

    /**
     * Returns the reader, creating it on first use.
     *
     * @return the reader used for all reads
     */
    public synchronized SyncDataFileReader getReader() {
        if (reader == null) {
            reader = createReader();
        }
        return reader;
    }

    /**
     * @return a new reader bound to this manager
     */
    protected synchronized SyncDataFileReader createReader() {
        return new SyncDataFileReader(this);
    }

    /**
     * @param reader the reader to use for subsequent reads
     */
    public synchronized void setReader(SyncDataFileReader reader) {
        this.reader = reader;
    }

    /**
     * Returns the writer, creating it on first use.
     *
     * @return the writer used for all writes
     */
    public synchronized SyncDataFileWriter getWriter() {
        if (writer == null) {
            writer = createWriter();
        }
        return writer;
    }

    /**
     * @return a new writer bound to this manager
     */
    private SyncDataFileWriter createWriter() {
        return new SyncDataFileWriter(this);
    }

    /**
     * @param writer the writer to use for subsequent writes
     */
    public synchronized void setWriter(SyncDataFileWriter writer) {
        this.writer = writer;
    }

}