001    /*
002     * CDDL HEADER START
003     *
004     * The contents of this file are subject to the terms of the
005     * Common Development and Distribution License, Version 1.0 only
006     * (the "License").  You may not use this file except in compliance
007     * with the License.
008     *
009     * You can obtain a copy of the license at
010     * trunk/opends/resource/legal-notices/OpenDS.LICENSE
011     * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
012     * See the License for the specific language governing permissions
013     * and limitations under the License.
014     *
015     * When distributing Covered Code, include this CDDL HEADER in each
016     * file and include the License file at
017     * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
018     * add the following below this CDDL HEADER, with the fields enclosed
019     * by brackets "[]" replaced with your own identifying information:
020     *      Portions Copyright [yyyy] [name of copyright owner]
021     *
022     * CDDL HEADER END
023     *
024     *
025     *      Copyright 2006-2008 Sun Microsystems, Inc.
026     */
027    package org.opends.server.replication.plugin;
028    import org.opends.messages.Message;
029    
030    import static org.opends.server.loggers.ErrorLogger.logError;
031    import static org.opends.messages.ReplicationMessages.*;
032    
033    import java.util.ArrayList;
034    import java.util.LinkedHashSet;
035    import java.util.LinkedList;
036    import java.util.List;
037    import java.util.Iterator;
038    
039    import org.opends.server.core.DirectoryServer;
040    import org.opends.server.core.ModifyOperationBasis;
041    import org.opends.server.protocols.asn1.ASN1OctetString;
042    import org.opends.server.protocols.internal.InternalClientConnection;
043    import org.opends.server.protocols.internal.InternalSearchOperation;
044    import org.opends.server.protocols.ldap.LDAPAttribute;
045    import org.opends.server.protocols.ldap.LDAPFilter;
046    import org.opends.server.protocols.ldap.LDAPModification;
047    import org.opends.server.replication.common.ChangeNumber;
048    import org.opends.server.replication.common.ServerState;
049    import org.opends.server.types.Attribute;
050    import org.opends.server.types.AttributeType;
051    import org.opends.server.types.AttributeValue;
052    import org.opends.server.types.Control;
053    import org.opends.server.types.DN;
054    import org.opends.server.types.DereferencePolicy;
055    import org.opends.server.types.DirectoryException;
056    import org.opends.server.types.LDAPException;
057    import org.opends.server.types.ModificationType;
058    import org.opends.server.types.RawModification;
059    import org.opends.server.types.ResultCode;
060    import org.opends.server.types.SearchFilter;
061    import org.opends.server.types.SearchResultEntry;
062    import org.opends.server.types.SearchScope;
063    
064    /**
065     * This class implements a ServerState that is stored on the backends
066     * used to store the synchronized data and that is therefore persistent
067     * accross server reboot.
068     */
069    public class PersistentServerState extends ServerState
070    {
071       private DN baseDn;
072       private boolean savedStatus = true;
073       private InternalClientConnection conn =
074           InternalClientConnection.getRootConnection();
075       private ASN1OctetString asn1BaseDn;
076       private short serverId;
077    
078       /**
079        * The attribute name used to store the state in the backend.
080        */
081       protected static final String REPLICATION_STATE = "ds-sync-state";
082    
083      /**
084       * create a new ServerState.
085       * @param baseDn The baseDN for which the ServerState is created
086       *  @param serverId The serverId
087       */
088      public PersistentServerState(DN baseDn, short serverId)
089      {
090        this.baseDn = baseDn;
091        this.serverId = serverId;
092        asn1BaseDn = new ASN1OctetString(baseDn.toString());
093        loadState();
094      }
095    
096      /**
097       * {@inheritDoc}
098       */
099      @Override
100      public boolean update(ChangeNumber changeNumber)
101      {
102        savedStatus = false;
103        return super.update(changeNumber);
104      }
105    
106      /**
107       * Save this object to persistent storage.
108       */
109      public void save()
110      {
111        if (savedStatus)
112          return;
113    
114        savedStatus = true;
115        ResultCode resultCode = updateStateEntry();
116        if (resultCode != ResultCode.SUCCESS)
117        {
118            savedStatus = false;
119        }
120      }
121    
122      /**
123       * Load the ServerState from the backing entry in database to memory.
124       */
125      public void loadState()
126      {
127        SearchResultEntry stateEntry = null;
128    
129        // try to load the state from the base entry.
130        stateEntry = searchBaseEntry();
131    
132        if (stateEntry == null)
133        {
134          // The base entry does not exist yet
135          // in the database or was deleted. Try to read the ServerState
136          // from the configuration instead.
137          stateEntry = searchConfigEntry();
138        }
139    
140        if (stateEntry != null)
141        {
142          updateStateFromEntry(stateEntry);
143        }
144    
145        /*
146         * In order to make sure that the replication never looses changes,
147         * the server needs to search all the entries that have been
148         * updated after the last write of the ServerState.
149         * Inconsistencies may append after a crash.
150         */
151        checkAndUpdateServerState();
152      }
153    
154      /**
155       * Run a search operation to find the base entry
156       * of the replication domain for which this ServerState was created.
157       *
158       * @return Thebasen Entry or null if no entry was found;
159       */
160      private SearchResultEntry searchBaseEntry()
161      {
162        LDAPFilter filter;
163    
164        try
165        {
166          filter = LDAPFilter.decode("objectclass=*");
167        } catch (LDAPException e)
168        {
169          // can not happen
170          return null;
171        }
172    
173        /*
174         * Search the database entry that is used to periodically
175         * save the ServerState
176         */
177        LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
178        attributes.add(REPLICATION_STATE);
179        InternalSearchOperation search = conn.processSearch(asn1BaseDn,
180            SearchScope.BASE_OBJECT,
181            DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
182            filter,attributes);
183        if (((search.getResultCode() != ResultCode.SUCCESS)) &&
184            ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
185        {
186          Message message = ERR_ERROR_SEARCHING_RUV.
187              get(search.getResultCode().getResultCodeName(), search.toString(),
188                  search.getErrorMessage(), baseDn.toString());
189          logError(message);
190          return null;
191        }
192    
193        SearchResultEntry stateEntry = null;
194        if (search.getResultCode() == ResultCode.SUCCESS)
195        {
196          /*
197           * Read the serverState from the REPLICATION_STATE attribute
198           */
199          LinkedList<SearchResultEntry> result = search.getSearchEntries();
200          if (!result.isEmpty())
201          {
202            stateEntry = result.getFirst();
203          }
204        }
205        return stateEntry;
206      }
207    
208      /**
209       * Run a search operation to find the entry with the configuration
210       * of the replication domain for which this ServerState was created.
211       *
212       * @return The configuration Entry or null if no entry was found;
213       */
214      private SearchResultEntry searchConfigEntry()
215      {
216        try
217        {
218          SearchFilter filter =
219            SearchFilter.createFilterFromString(
220                "(&(objectclass=ds-cfg-replication-domain)"
221                +"(ds-cfg-base-dn="+baseDn+"))");
222    
223          LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
224          attributes.add(REPLICATION_STATE);
225          InternalSearchOperation op =
226              conn.processSearch(DN.decode("cn=config"),
227              SearchScope.SUBORDINATE_SUBTREE,
228              DereferencePolicy.NEVER_DEREF_ALIASES,
229              1, 0, false, filter, attributes);
230    
231          if (op.getResultCode() == ResultCode.SUCCESS)
232          {
233            /*
234             * Read the serverState from the REPLICATION_STATE attribute
235             */
236            LinkedList<SearchResultEntry> resultEntries =
237              op.getSearchEntries();
238            if (!resultEntries.isEmpty())
239            {
240              SearchResultEntry resultEntry = resultEntries.getFirst();
241              return resultEntry;
242            }
243          }
244          return null;
245        } catch (DirectoryException e)
246        {
247          // can not happen
248          return null;
249        }
250      }
251    
252      /**
253       * Update this ServerState from the provided entry.
254       *
255       * @param resultEntry The entry that should be used to update this
256       *                    ServerState.
257       */
258      private void updateStateFromEntry(SearchResultEntry resultEntry)
259      {
260        AttributeType synchronizationStateType =
261          DirectoryServer.getAttributeType(REPLICATION_STATE);
262        List<Attribute> attrs =
263          resultEntry.getAttribute(synchronizationStateType);
264        if (attrs != null)
265        {
266          Attribute attr = attrs.get(0);
267          LinkedHashSet<AttributeValue> values = attr.getValues();
268          for (AttributeValue value : values)
269          {
270            ChangeNumber changeNumber =
271              new ChangeNumber(value.getStringValue());
272            update(changeNumber);
273          }
274        }
275      }
276    
277      /**
278       * Save the current values of this PersistentState object
279       * in the appropiate entry of the database.
280       *
281       * @return a ResultCode indicating if the method was successfull.
282       */
283      private ResultCode updateStateEntry()
284      {
285        /*
286         * Generate a modify operation on the Server State baseD Entry.
287         */
288        ResultCode result = runUpdateStateEntry(baseDn);
289    
290        if (result == ResultCode.NO_SUCH_OBJECT)
291        {
292          // The base entry does not exist yet in the database or
293          // has been deleted, save the state to the config entry instead.
294          SearchResultEntry configEntry = searchConfigEntry();
295          if (configEntry != null)
296          {
297            DN configDN = configEntry.getDN();
298            result = runUpdateStateEntry(configDN);
299          }
300        }
301        return result;
302      }
303    
304      /**
305       * Run a modify operation to update the entry whose DN is given as
306       * a parameter with the serverState information.
307       *
308       * @param serverStateEntryDN The DN of the entry to be updated.
309       *
310       * @return A ResultCode indicating if the operation was successful.
311       */
312      private ResultCode runUpdateStateEntry(DN serverStateEntryDN)
313      {
314        ArrayList<ASN1OctetString> values = this.toASN1ArrayList();
315    
316        LDAPAttribute attr =
317          new LDAPAttribute(REPLICATION_STATE, values);
318        LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
319        ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
320        mods.add(mod);
321    
322        ModifyOperationBasis op =
323          new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
324              InternalClientConnection.nextMessageID(),
325              new ArrayList<Control>(0),
326              new ASN1OctetString(serverStateEntryDN.toString()),
327              mods);
328        op.setInternalOperation(true);
329        op.setSynchronizationOperation(true);
330        op.setDontSynchronize(true);
331        op.run();
332        if (op.getResultCode() != ResultCode.SUCCESS)
333        {
334          Message message = DEBUG_ERROR_UPDATING_RUV.get(
335                  op.getResultCode().getResultCodeName().toString(),
336                  op.toString(),
337                  op.getErrorMessage().toString(),
338                  baseDn.toString());
339          logError(message);
340        }
341        return op.getResultCode();
342      }
343    
344      /**
345       * Empty the ServerState.
346       * After this call the Server State will be in the same state
347       * as if it was just created.
348       */
349      public void clearInMemory()
350      {
351        super.clear();
352        this.savedStatus = false;
353      }
354    
355      /**
356       * Empty the ServerState.
357       * After this call the Server State will be in the same state
358       * as if it was just created.
359       */
360      public void clear()
361      {
362        clearInMemory();
363        save();
364      }
365    
366      /**
367       * The ServerState is saved to the database periodically,
368       * therefore in case of crash it is possible that is does not contain
369       * the latest changes that have been processed and saved to the
370       * database.
371       * In order to make sure that we don't loose them, search all the entries
372       * that have been updated after this entry.
373       * This is done by using the HistoricalCsnOrderingMatchingRule
374       * and an ordering index for historical attribute
375       */
376      public final void checkAndUpdateServerState() {
377        Message message;
378        InternalSearchOperation op;
379        ChangeNumber serverStateMaxCn;
380        ChangeNumber dbMaxCn;
381        final AttributeType histType =
382          DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
383    
384        // Retrieves the entries that have changed since the
385        // maxCn stored in the serverState
386        synchronized (this)
387        {
388          serverStateMaxCn = this.getMaxChangeNumber(serverId);
389    
390          if (serverStateMaxCn == null)
391            return;
392    
393          try {
394            op = ReplicationBroker.searchForChangedEntries(baseDn,
395                serverStateMaxCn, null);
396          }
397          catch (Exception  e)
398          {
399            return;
400          }
401          if (op.getResultCode() != ResultCode.SUCCESS)
402          {
403            // An error happened trying to search for the updates
404            // Log an error
405            message = ERR_CANNOT_RECOVER_CHANGES.get(
406                baseDn.toNormalizedString());
407            logError(message);
408          }
409          else
410          {
411            dbMaxCn = serverStateMaxCn;
412            for (SearchResultEntry resEntry : op.getSearchEntries())
413            {
414              List<Attribute> attrs = resEntry.getAttribute(histType);
415              Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
416              try
417              {
418                while (true)
419                {
420                  AttributeValue attrVal = iav.next();
421                  HistVal histVal = new HistVal(attrVal.getStringValue());
422                  ChangeNumber cn = histVal.getCn();
423    
424                  if ((cn != null) && (cn.getServerId() == serverId))
425                  {
426                    // compare the csn regarding the maxCn we know and
427                    // store the biggest
428                    if (ChangeNumber.compare(dbMaxCn, cn) < 0)
429                    {
430                      dbMaxCn = cn;
431                    }
432                  }
433                }
434              }
435              catch(Exception e)
436              {
437              }
438            }
439    
440            if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
441            {
442              // Update the serverState with the new maxCn
443              // present in the database
444              this.update(dbMaxCn);
445              message = NOTE_SERVER_STATE_RECOVERY.get(
446                  baseDn.toNormalizedString(), dbMaxCn.toString());
447              logError(message);
448            }
449          }
450        }
451      }
452    }