001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.plugin; 028 029 import java.util.NoSuchElementException; 030 import java.util.SortedMap; 031 import java.util.TreeMap; 032 033 import org.opends.server.replication.common.ChangeNumber; 034 import org.opends.server.replication.common.ChangeNumberGenerator; 035 import org.opends.server.replication.common.ServerState; 036 import org.opends.server.replication.protocol.UpdateMessage; 037 import org.opends.server.types.operation.PluginOperation; 038 039 /** 040 * This class is use to store the list of local operations currently 041 * in progress and not yet committed in the database. 042 * 043 * It is used to make sure that operations are sent to the Replication 044 * Server in the order defined by their ChangeNumber. 045 * It is also used to update the ServerState at the appropriate time. 046 * 047 * On object of this class is instanciated for each ReplicationDomain. 048 */ 049 public class PendingChanges 050 { 051 /** 052 * A map used to store the pending changes. 053 */ 054 private SortedMap<ChangeNumber, PendingChange> pendingChanges = 055 new TreeMap<ChangeNumber, PendingChange>(); 056 057 /** 058 * The ChangeNumberGenerator to use to create new unique ChangeNumbers 059 * for each operation done on the replication domain. 060 */ 061 private ChangeNumberGenerator changeNumberGenerator; 062 063 /** 064 * The Replicationbroker that will be used to send UpdateMessage. 065 */ 066 private ReplicationBroker broker; 067 068 /** 069 * The ServerState that will be updated when UpdateMessage are committed. 070 */ 071 private ServerState state; 072 073 /** 074 * Creates a new PendingChanges using the provided ChangeNumberGenerator. 075 * 076 * @param changeNumberGenerator The ChangeNumberGenerator to use to create 077 * new unique ChangeNumbers. 078 * @param broker The Replicationbroker that will be used to send 079 * UpdateMessage. 080 * @param state The ServerState that will be updated when UpdateMessage 081 * are committed. 082 */ 083 public PendingChanges( 084 ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker, 085 ServerState state) 086 { 087 this.changeNumberGenerator = changeNumberGenerator; 088 this.broker = broker; 089 this.state = state; 090 } 091 092 /** 093 * Remove and return an update form the pending changes list. 094 * 095 * @param changeNumber The ChangeNumber of the update to remove. 096 * 097 * @return The UpdateMessage that was just removed. 098 */ 099 public synchronized UpdateMessage remove(ChangeNumber changeNumber) 100 { 101 return pendingChanges.remove(changeNumber).getMsg(); 102 } 103 104 /** 105 * Returns the number of update currently in the list. 106 * 107 * @return The number of update currently in the list. 108 */ 109 public int size() 110 { 111 return pendingChanges.size(); 112 } 113 114 /** 115 * Mark an update message as committed. 116 * 117 * @param changeNumber The ChangeNumber of the update message that must be 118 * set as committed. 119 * @param msg The message associated to the update. 120 */ 121 public synchronized void commit(ChangeNumber changeNumber, 122 UpdateMessage msg) 123 { 124 PendingChange curChange = pendingChanges.get(changeNumber); 125 if (curChange == null) 126 { 127 throw new NoSuchElementException(); 128 } 129 curChange.setCommitted(true); 130 131 curChange.setMsg(msg); 132 } 133 134 /** 135 * Mark an update message as committed. 136 * 137 * @param changeNumber The ChangeNumber of the update message that must be 138 * set as committed. 139 */ 140 public synchronized void commit(ChangeNumber changeNumber) 141 { 142 PendingChange curChange = pendingChanges.get(changeNumber); 143 if (curChange == null) 144 { 145 throw new NoSuchElementException(); 146 } 147 curChange.setCommitted(true); 148 } 149 150 /** 151 * Add a new UpdateMessage to the pending list from the provided local 152 * operation. 153 * 154 * @param operation The local operation for which an UpdateMessage mus 155 * be added in the pending list. 156 * @return The ChangeNumber now associated to the operation. 157 */ 158 public synchronized ChangeNumber putLocalOperation(PluginOperation operation) 159 { 160 ChangeNumber changeNumber; 161 162 changeNumber = changeNumberGenerator.newChangeNumber(); 163 PendingChange change = new PendingChange(changeNumber, operation, null); 164 pendingChanges.put(changeNumber, change); 165 return changeNumber; 166 167 } 168 169 /** 170 * Push all committed local changes to the replicationServer service. 171 * 172 * @return The number of pushed updates. 173 */ 174 public synchronized int pushCommittedChanges() 175 { 176 int numSentUpdates = 0; 177 if (pendingChanges.isEmpty()) 178 return numSentUpdates; 179 180 ChangeNumber firstChangeNumber = pendingChanges.firstKey(); 181 PendingChange firstChange = pendingChanges.get(firstChangeNumber); 182 183 while ((firstChange != null) && firstChange.isCommitted()) 184 { 185 if ((firstChange.getOp() != null ) && 186 (firstChange.getOp().isSynchronizationOperation() == false)) 187 { 188 numSentUpdates++; 189 broker.publish(firstChange.getMsg()); 190 } 191 state.update(firstChangeNumber); 192 pendingChanges.remove(firstChangeNumber); 193 194 if (pendingChanges.isEmpty()) 195 { 196 firstChange = null; 197 } 198 else 199 { 200 firstChangeNumber = pendingChanges.firstKey(); 201 firstChange = pendingChanges.get(firstChangeNumber); 202 } 203 } 204 return numSentUpdates; 205 } 206 }