001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * trunk/opends/resource/legal-notices/OpenDS.LICENSE 011 * or https://OpenDS.dev.java.net/OpenDS.LICENSE. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2006-2008 Sun Microsystems, Inc. 026 */ 027 package org.opends.server.replication.protocol; 028 029 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; 030 import static org.opends.server.loggers.debug.DebugLogger.getTracer; 031 032 import java.io.IOException; 033 import java.io.InputStream; 034 import java.io.OutputStream; 035 import java.net.Socket; 036 import java.net.SocketException; 037 import java.util.zip.DataFormatException; 038 039 import org.opends.server.loggers.debug.DebugTracer; 040 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; 041 042 /** 043 * This class Implement a protocol session using a basic socket and relying on 044 * the innate encoding/decoding capabilities of the ReplicationMessage 045 * by using the getBytes() and generateMsg() methods of those classes. 046 */ 047 public class SocketSession implements ProtocolSession 048 { 049 /** 050 * The tracer object for the debug logger. 051 */ 052 private static final DebugTracer TRACER = getTracer(); 053 054 private Socket socket; 055 private InputStream input; 056 private OutputStream output; 057 byte[] rcvLengthBuf = new byte[8]; 058 059 /** 060 * The time the last message published to this session. 061 */ 062 private long lastPublishTime = 0; 063 064 065 /** 066 * The time the last message was received on this session. 067 */ 068 private long lastReceiveTime = 0; 069 070 071 /** 072 * Creates a new SocketSession based on the provided socket. 073 * 074 * @param socket The Socket on which the SocketSession will be based. 075 * @throws IOException When an IException happens on the socket. 076 */ 077 public SocketSession(Socket socket) throws IOException 078 { 079 this.socket = socket; 080 /* 081 * Use a window instead of the TCP flow control. 082 * Therefore set a very large value for send and receive buffer sizes. 083 */ 084 input = socket.getInputStream(); 085 output = socket.getOutputStream(); 086 } 087 088 /** 089 * {@inheritDoc} 090 */ 091 public void close() throws IOException 092 { 093 if (debugEnabled()) 094 { 095 TRACER.debugInfo("Closing SocketSession." 096 + stackTraceToSingleLineString(new Exception())); 097 } 098 socket.close(); 099 } 100 101 /** 102 * {@inheritDoc} 103 */ 104 public synchronized void publish(ReplicationMessage msg) 105 throws IOException 106 { 107 byte[] buffer = msg.getBytes(); 108 String str = String.format("%08x", buffer.length); 109 110 if (debugEnabled()) 111 { 112 TRACER.debugInfo("SocketSession publish <" + str + ">"); 113 } 114 115 byte[] sendLengthBuf = str.getBytes(); 116 117 output.write(sendLengthBuf); 118 output.write(buffer); 119 output.flush(); 120 121 lastPublishTime = System.currentTimeMillis(); 122 } 123 124 /** 125 * {@inheritDoc} 126 */ 127 public ReplicationMessage receive() throws IOException, 128 ClassNotFoundException, DataFormatException 129 { 130 /* Read the first 8 bytes containing the packet length */ 131 int length = 0; 132 133 /* Let's start the stop-watch before waiting on read */ 134 /* for the heartbeat check to be operationnal */ 135 lastReceiveTime = System.currentTimeMillis(); 136 137 while (length<8) 138 { 139 int read = input.read(rcvLengthBuf, length, 8-length); 140 if (read == -1) 141 { 142 lastReceiveTime=0; 143 throw new IOException("no more data"); 144 } 145 else 146 { 147 length += read; 148 } 149 } 150 151 int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); 152 153 try 154 { 155 length = 0; 156 byte[] buffer = new byte[totalLength]; 157 while (length < totalLength) 158 { 159 length += input.read(buffer, length, totalLength - length); 160 } 161 /* We do not want the heartbeat to close the session when */ 162 /* we are processing a message even a time consuming one. */ 163 lastReceiveTime=0; 164 return ReplicationMessage.generateMsg(buffer); 165 } 166 catch (OutOfMemoryError e) 167 { 168 throw new IOException("Packet too large, can't allocate " 169 + totalLength + " bytes."); 170 } 171 } 172 173 /** 174 * {@inheritDoc} 175 */ 176 public void stopEncryption() 177 { 178 // There is no security layer. 179 } 180 181 /** 182 * {@inheritDoc} 183 */ 184 public boolean isEncrypted() 185 { 186 return false; 187 } 188 189 /** 190 * {@inheritDoc} 191 */ 192 public long getLastPublishTime() 193 { 194 return lastPublishTime; 195 } 196 197 /** 198 * {@inheritDoc} 199 */ 200 public long getLastReceiveTime() 201 { 202 if (lastReceiveTime==0) 203 { 204 return System.currentTimeMillis(); 205 } 206 return lastReceiveTime; 207 } 208 209 /** 210 * {@inheritDoc} 211 */ 212 public String getRemoteAddress() 213 { 214 return socket.getInetAddress().getHostAddress(); 215 } 216 217 /** 218 * {@inheritDoc} 219 */ 220 public void setSoTimeout(int timeout) throws SocketException 221 { 222 socket.setSoTimeout(timeout); 223 } 224 }