001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    
029    import java.util.NoSuchElementException;
030    import java.util.SortedMap;
031    import java.util.TreeMap;
032    
033    import org.opends.server.replication.common.ChangeNumber;
034    import org.opends.server.replication.common.ChangeNumberGenerator;
035    import org.opends.server.replication.common.ServerState;
036    import org.opends.server.replication.protocol.UpdateMessage;
037    import org.opends.server.types.operation.PluginOperation;
038    
039    /**
040     * This class is use to store the list of local operations currently
041     * in progress and not yet committed in the database.
042     *
043     * It is used to make sure that operations are sent to the Replication
044     * Server in the order defined by their ChangeNumber.
045     * It is also used to update the ServerState at the appropriate time.
046     *
047     * On object of this class is instanciated for each ReplicationDomain.
048     */
049    public class PendingChanges
050    {
051      /**
052       * A map used to store the pending changes.
053       */
054      private SortedMap<ChangeNumber, PendingChange> pendingChanges =
055        new TreeMap<ChangeNumber, PendingChange>();
056    
057      /**
058       * The ChangeNumberGenerator to use to create new unique ChangeNumbers
059       * for each operation done on the replication domain.
060       */
061      private ChangeNumberGenerator changeNumberGenerator;
062    
063      /**
064       * The Replicationbroker that will be used to send UpdateMessage.
065       */
066      private ReplicationBroker broker;
067    
068      /**
069       * The ServerState that will be updated when UpdateMessage are committed.
070       */
071      private ServerState state;
072    
073      /**
074       * Creates a new PendingChanges using the provided ChangeNumberGenerator.
075       *
076       * @param changeNumberGenerator The ChangeNumberGenerator to use to create
077       *                               new unique ChangeNumbers.
078       * @param broker  The Replicationbroker that will be used to send
079       *                UpdateMessage.
080       * @param state   The ServerState that will be updated when UpdateMessage
081       *                are committed.
082       */
083      public PendingChanges(
084          ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
085          ServerState state)
086      {
087        this.changeNumberGenerator = changeNumberGenerator;
088        this.broker = broker;
089        this.state = state;
090      }
091    
092      /**
093       * Remove and return an update form the pending changes list.
094       *
095       * @param changeNumber The ChangeNumber of the update to remove.
096       *
097       * @return The UpdateMessage that was just removed.
098       */
099      public synchronized UpdateMessage remove(ChangeNumber changeNumber)
100      {
101        return pendingChanges.remove(changeNumber).getMsg();
102      }
103    
104      /**
105       * Returns the number of update currently in the list.
106       *
107       * @return The number of update currently in the list.
108       */
109      public int size()
110      {
111        return pendingChanges.size();
112      }
113    
114      /**
115       * Mark an update message as committed.
116       *
117       * @param changeNumber The ChangeNumber of the update message that must be
118       *                     set as committed.
119       * @param msg          The message associated to the update.
120       */
121      public synchronized void commit(ChangeNumber changeNumber,
122          UpdateMessage msg)
123      {
124        PendingChange curChange = pendingChanges.get(changeNumber);
125        if (curChange == null)
126        {
127          throw new NoSuchElementException();
128        }
129        curChange.setCommitted(true);
130    
131        curChange.setMsg(msg);
132      }
133    
134      /**
135       * Mark an update message as committed.
136       *
137       * @param changeNumber The ChangeNumber of the update message that must be
138       *                     set as committed.
139       */
140      public synchronized void commit(ChangeNumber changeNumber)
141      {
142        PendingChange curChange = pendingChanges.get(changeNumber);
143        if (curChange == null)
144        {
145          throw new NoSuchElementException();
146        }
147        curChange.setCommitted(true);
148      }
149    
150      /**
151       * Add a new UpdateMessage to the pending list from the provided local
152       * operation.
153       *
154       * @param operation The local operation for which an UpdateMessage mus
155       *                  be added in the pending list.
156       * @return The ChangeNumber now associated to the operation.
157       */
158      public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
159      {
160        ChangeNumber changeNumber;
161    
162        changeNumber = changeNumberGenerator.newChangeNumber();
163        PendingChange change = new PendingChange(changeNumber, operation, null);
164        pendingChanges.put(changeNumber, change);
165        return changeNumber;
166    
167      }
168    
169      /**
170       * Push all committed local changes to the replicationServer service.
171       *
172       * @return The number of pushed updates.
173       */
174      public synchronized int pushCommittedChanges()
175      {
176        int numSentUpdates = 0;
177        if (pendingChanges.isEmpty())
178          return numSentUpdates;
179    
180        ChangeNumber firstChangeNumber = pendingChanges.firstKey();
181        PendingChange firstChange = pendingChanges.get(firstChangeNumber);
182    
183        while ((firstChange != null) && firstChange.isCommitted())
184        {
185          if ((firstChange.getOp() != null ) &&
186              (firstChange.getOp().isSynchronizationOperation() == false))
187          {
188            numSentUpdates++;
189            broker.publish(firstChange.getMsg());
190          }
191          state.update(firstChangeNumber);
192          pendingChanges.remove(firstChangeNumber);
193    
194          if (pendingChanges.isEmpty())
195          {
196            firstChange = null;
197          }
198          else
199          {
200            firstChangeNumber = pendingChanges.firstKey();
201            firstChange = pendingChanges.get(firstChangeNumber);
202          }
203        }
204        return numSentUpdates;
205      }
206    }