001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.command;
019    
020    import java.io.DataInputStream;
021    import java.io.DataOutputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.ObjectOutputStream;
025    import java.io.OutputStream;
026    import java.io.Serializable;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.JMSException;
031    import javax.jms.ObjectMessage;
032    
033    import org.apache.activemq.ActiveMQConnection;
034    import org.apache.activemq.util.ByteArrayInputStream;
035    import org.apache.activemq.util.ByteArrayOutputStream;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
038    import org.apache.activemq.util.JMSExceptionSupport;
039    
040    /**
041     * An <CODE>ObjectMessage</CODE> object is used to send a message that
042     * contains a serializable object in the Java programming language ("Java
043     * object"). It inherits from the <CODE>Message</CODE> interface and adds a
044     * body containing a single reference to an object. Only
045     * <CODE>Serializable</CODE> Java objects can be used. <p/>
046     * <P>
047     * If a collection of Java objects must be sent, one of the
048     * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
049     * <P>
050     * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
051     * mode. If a client attempts to write to the message at this point, a
052     * <CODE>MessageNotWriteableException</CODE> is thrown. If
053     * <CODE>clearBody</CODE> is called, the message can now be both read from and
054     * written to.
055     * 
056     * @openwire:marshaller code="26"
057     * @see javax.jms.Session#createObjectMessage()
058     * @see javax.jms.Session#createObjectMessage(Serializable)
059     * @see javax.jms.BytesMessage
060     * @see javax.jms.MapMessage
061     * @see javax.jms.Message
062     * @see javax.jms.StreamMessage
063     * @see javax.jms.TextMessage
064     */
065    public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
066        
067        // TODO: verify classloader
068        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
069        static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 
070    
071        protected transient Serializable object;
072    
073        public Message copy() {
074            ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
075            copy(copy);
076            return copy;
077        }
078    
079        private void copy(ActiveMQObjectMessage copy) {
080            storeContent();
081            super.copy(copy);
082            copy.object = null;
083        }
084    
085        public void storeContent() {
086            ByteSequence bodyAsBytes = getContent();
087            if (bodyAsBytes == null && object != null) {
088                try {
089                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
090                    OutputStream os = bytesOut;
091                    ActiveMQConnection connection = getConnection();
092                    if (connection != null && connection.isUseCompression()) {
093                        compressed = true;
094                        os = new DeflaterOutputStream(os);
095                    }
096                    DataOutputStream dataOut = new DataOutputStream(os);
097                    ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
098                    objOut.writeObject(object);
099                    objOut.flush();
100                    objOut.reset();
101                    objOut.close();
102                    setContent(bytesOut.toByteSequence());
103                } catch (IOException ioe) {
104                    throw new RuntimeException(ioe.getMessage(), ioe);
105                }
106            }
107        }
108    
109        public byte getDataStructureType() {
110            return DATA_STRUCTURE_TYPE;
111        }
112    
113        public String getJMSXMimeType() {
114            return "jms/object-message";
115        }
116    
117        /**
118         * Clears out the message body. Clearing a message's body does not clear its
119         * header values or property entries. <p/>
120         * <P>
121         * If this message body was read-only, calling this method leaves the
122         * message body in the same state as an empty body in a newly created
123         * message.
124         * 
125         * @throws JMSException if the JMS provider fails to clear the message body
126         *                 due to some internal error.
127         */
128    
129        public void clearBody() throws JMSException {
130            super.clearBody();
131            this.object = null;
132        }
133    
134        /**
135         * Sets the serializable object containing this message's data. It is
136         * important to note that an <CODE>ObjectMessage</CODE> contains a
137         * snapshot of the object at the time <CODE>setObject()</CODE> is called;
138         * subsequent modifications of the object will have no effect on the
139         * <CODE>ObjectMessage</CODE> body.
140         * 
141         * @param newObject the message's data
142         * @throws JMSException if the JMS provider fails to set the object due to
143         *                 some internal error.
144         * @throws javax.jms.MessageFormatException if object serialization fails.
145         * @throws javax.jms.MessageNotWriteableException if the message is in
146         *                 read-only mode.
147         */
148    
149        public void setObject(Serializable newObject) throws JMSException {
150            checkReadOnlyBody();
151            this.object = newObject;
152            setContent(null);
153            ActiveMQConnection connection = getConnection();
154            if (connection == null || !connection.isObjectMessageSerializationDefered()) {
155                storeContent();
156            }
157        }
158    
159        /**
160         * Gets the serializable object containing this message's data. The default
161         * value is null.
162         * 
163         * @return the serializable object containing this message's data
164         * @throws JMSException
165         */
166        public Serializable getObject() throws JMSException {
167            if (object == null && getContent() != null) {
168                try {
169                    ByteSequence content = getContent();
170                    InputStream is = new ByteArrayInputStream(content);
171                    if (isCompressed()) {
172                        is = new InflaterInputStream(is);
173                    }
174                    DataInputStream dataIn = new DataInputStream(is);
175                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
176                    try {
177                        object = (Serializable)objIn.readObject();
178                    } catch (ClassNotFoundException ce) {
179                        throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
180                    } finally {
181                        dataIn.close();
182                    }
183                } catch (IOException e) {
184                    throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
185                }
186            }
187            return this.object;
188        }
189    
190        public void onMessageRolledBack() {
191            super.onMessageRolledBack();
192    
193            // lets force the object to be deserialized again - as we could have
194            // changed the object
195            object = null;
196        }
197    
198        public String toString() {
199            try {
200                getObject();
201            } catch (JMSException e) {
202            }
203            return super.toString();
204        }
205    }