001    /**
002     *
003     * Copyright 2004 Protique Ltd
004     *
005     * Licensed under the Apache License, Version 2.0 (the "License");
006     * you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     *
009     * http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     *
017     **/
018    
019    package org.activemq.io.impl;
020    import java.io.DataInput;
021    import java.io.DataInputStream;
022    import java.io.DataOutput;
023    import java.io.DataOutputStream;
024    import java.io.IOException;
025    import java.io.ObjectStreamException;
026    import java.io.Serializable;
027    import java.util.Arrays;
028    import java.util.HashMap;
029    import java.util.Map;
030    
031    import org.activemq.io.WireFormat;
032    import org.activemq.io.util.WireByteArrayInputStream;
033    import org.activemq.io.util.WireByteArrayOutputStream;
034    import org.activemq.message.CachedValue;
035    import org.activemq.message.Packet;
036    
037    /**
038     * This is a stateful AbstractDefaultWireFormat which implements value caching.  Not optimal for use by 
039     * many concurrent threads.  One DefaultWireFormat is typically allocated per client connection.  
040     *  
041     * @version $Revision: 1.1.1.1 $
042     */
043    public class DefaultWireFormat extends AbstractDefaultWireFormat implements Serializable {
044    
045        private static final long serialVersionUID = -1454851936411678612L;
046    
047        private static final int MAX_CACHE_SIZE = Short.MAX_VALUE/2; //needs to be a lot less than Short.MAX_VALUE
048        
049        static final short NULL_VALUE = -1;
050        static final short CLEAR_CACHE = -2;
051        
052        //
053        // Fields used during a write.
054        //
055        protected transient final Object writeMutex = new Object();
056        protected transient WireByteArrayOutputStream internalBytesOut;
057        protected transient DataOutputStream internalDataOut;
058        protected transient WireByteArrayOutputStream cachedBytesOut;
059        protected transient DataOutputStream cachedDataOut;
060        private Map writeValueCache = new HashMap();
061        protected transient short cachedKeyGenerator;
062        protected transient short lastWriteValueCacheEvictionPosition=500;
063        
064        //
065        // Fields used during a read.
066        //
067        protected transient final Object readMutex = new Object();
068        protected transient WireByteArrayInputStream internalBytesIn;
069        protected transient DataInputStream internalDataIn;    
070        private Object[] writeValueCacheArray = new Object[MAX_CACHE_SIZE];
071        private Object[] readValueCacheArray = new Object[MAX_CACHE_SIZE];
072    
073        
074        /**
075         * Default Constructor
076         */
077        public DefaultWireFormat() {
078            internalBytesOut = new WireByteArrayOutputStream();
079            internalDataOut = new DataOutputStream(internalBytesOut);
080            internalBytesIn = new WireByteArrayInputStream();
081            internalDataIn = new DataInputStream(internalBytesIn);
082            this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
083            this.cachedBytesOut = new WireByteArrayOutputStream();
084            this.cachedDataOut = new DataOutputStream(cachedBytesOut);
085        }    
086    
087        /**
088         * @return new WireFormat
089         */
090        public WireFormat copy() {
091            DefaultWireFormat format = new DefaultWireFormat();
092            format.setCachingEnabled(cachingEnabled);
093            format.setCurrentWireFormatVersion(getCurrentWireFormatVersion());
094            return format;
095        }
096        
097        
098        private Object readResolve() throws ObjectStreamException {
099            return new DefaultWireFormat();
100        }
101        
102        public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
103            PacketWriter writer = getWriter(packet);
104            if (writer != null) {
105                synchronized (writeMutex) {
106                    internalBytesOut.reset();
107                    writer.writePacket(packet, internalDataOut);
108                    internalDataOut.flush();
109                    //reuse the byte buffer in the ByteArrayOutputStream
110                    byte[] data = internalBytesOut.getData();
111                    int count = internalBytesOut.size();
112                    dataOut.writeByte(packet.getPacketType());
113                    dataOut.writeInt(count);
114                    //byte[] data = internalBytesOut.toByteArray();
115                    //int count = data.length;
116                    //dataOut.writeInt(count);
117                    packet.setMemoryUsage(count);
118                    dataOut.write(data, 0, count);                
119                }
120            }
121            return null;
122        }
123    
124        protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
125            synchronized (readMutex) {
126                Packet packet = reader.createPacket();
127                int length = dataIn.readInt();
128                packet.setMemoryUsage(length);
129                byte[] data = new byte[length];
130                dataIn.readFully(data);
131                //then splat into the internal datainput
132                internalBytesIn.restart(data);
133                reader.buildPacket(packet, internalDataIn);
134                return packet;
135            }
136        }
137        
138        /**
139         * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
140         * BytesOutputStream
141         * 
142         * @param packet
143         * @return a byte array representing the packet using some wire protocol
144         * @throws IOException
145         */
146        public byte[] toBytes(Packet packet) throws IOException {
147            byte[] data = null;
148            PacketWriter writer = getWriter(packet);
149    
150            if (writer != null) {
151                
152                synchronized (writeMutex) {
153                    internalBytesOut.reset();
154                    internalDataOut.writeByte(packet.getPacketType());
155                    internalDataOut.writeInt(-1);//the length
156                    writer.writePacket(packet, internalDataOut);
157                    internalDataOut.flush();
158                    data = internalBytesOut.toByteArray();
159                }
160                
161                // lets subtract the header offset from the length
162                int length = data.length - 5;
163                packet.setMemoryUsage(length);
164                //write in the length to the data
165                data[1] = (byte) ((length >>> 24) & 0xFF);
166                data[2] = (byte) ((length >>> 16) & 0xFF);
167                data[3] = (byte) ((length >>> 8) & 0xFF);
168                data[4] = (byte) ((length >>> 0) & 0xFF);
169            }
170            
171            return data;
172        }
173    
174        ///////////////////////////////////////////////////////////////
175        //
176        // Methods to handle cached values
177        //
178        ///////////////////////////////////////////////////////////////    
179        
180        public Object getValueFromReadCache(short key) {
181            if( key < 0 || key > readValueCacheArray.length )
182                return null;
183            return readValueCacheArray[key];
184        }
185        
186        protected short getWriteCachedKey(Object key) throws IOException{
187            if (key != null){
188                Short result = null;
189                result = (Short)writeValueCache.get(key);
190                if (result == null){
191                    result = getNextCacheId();
192                    writeValueCache.put(key,result);
193                    writeValueCacheArray[result.shortValue()]=key;
194                    updateCachedValue(result.shortValue(),key);                
195                }
196                return result.shortValue();
197            }
198            return DefaultWireFormat.NULL_VALUE;
199        }
200    
201        /**
202         * @return
203         */
204        private Short getNextCacheId() {
205            Short result;
206            result = new Short(cachedKeyGenerator++);
207            // once we fill the cache start reusing old cache locations to avoid memory leaks.
208            if (cachedKeyGenerator >= MAX_CACHE_SIZE) {
209                cachedKeyGenerator=0;
210            }
211            
212            lastWriteValueCacheEvictionPosition++;
213            if (lastWriteValueCacheEvictionPosition >= MAX_CACHE_SIZE) {
214                lastWriteValueCacheEvictionPosition=0;
215            }
216            
217            if( writeValueCacheArray[lastWriteValueCacheEvictionPosition] !=null ) {
218                Object o = writeValueCacheArray[lastWriteValueCacheEvictionPosition];
219                writeValueCache.remove(o);
220                writeValueCacheArray[lastWriteValueCacheEvictionPosition]=null;            
221            }
222            return result;
223        }
224        
225        protected void validateWriteCache() throws IOException {
226            if (cachingEnabled) {
227                if (writeValueCache.size() >= MAX_CACHE_SIZE) {
228                    writeValueCache.clear();
229                    Arrays.fill(writeValueCacheArray,null);
230                    cachedKeyGenerator = 0;
231                    updateCachedValue((short) -1, null);// send update to peer to
232                                                        // clear the peer cache
233                }
234            }
235        }
236        
237        protected void handleCachedValue(CachedValue cv) {
238            if (cv != null) {
239                if (cv.getId() == CLEAR_CACHE) {
240                    Arrays.fill(readValueCacheArray, null);
241                } else if (cv.getId() != NULL_VALUE) {
242                    readValueCacheArray[cv.getId()] = cv.getValue();
243                }
244            }
245        }    
246        
247        private synchronized void updateCachedValue(short key, Object value) throws IOException {
248            if (cachedValueWriter == null) {
249                cachedValueWriter = new CachedValueWriter();
250            }
251            CachedValue cv = new CachedValue();
252            cv.setId(key);
253            cv.setValue(value);
254            cachedBytesOut.reset();
255            cachedValueWriter.writePacket(cv, cachedDataOut);
256            cachedDataOut.flush();
257            byte[] data = cachedBytesOut.getData();
258            int count = cachedBytesOut.size();
259            getTransportDataOut().writeByte(cv.getPacketType());
260            getTransportDataOut().writeInt(count);
261            getTransportDataOut().write(data, 0, count);
262        }
263    }