001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.transport.http; 019 020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 021 import org.apache.commons.logging.Log; 022 import org.apache.commons.logging.LogFactory; 023 import org.activemq.io.TextWireFormat; 024 import org.activemq.io.WireFormat; 025 import org.activemq.transport.TransportChannelSupport; 026 027 import javax.jms.JMSException; 028 029 /** 030 * @version $Revision$ 031 */ 032 public abstract class HttpTransportChannelSupport extends TransportChannelSupport implements Runnable { 033 private static final Log log = LogFactory.getLog(HttpTransportChannelSupport.class); 034 035 private TextWireFormat wireFormat; 036 private String remoteUrl; 037 private Thread thread; // should use pool 038 private SynchronizedBoolean closed = new SynchronizedBoolean(false); 039 private SynchronizedBoolean started = new SynchronizedBoolean(false); 040 041 public HttpTransportChannelSupport(TextWireFormat wireFormat, String remoteUrl) { 042 this.wireFormat = wireFormat; 043 this.remoteUrl = remoteUrl; 044 } 045 046 public boolean isMulticast() { 047 return false; 048 } 049 050 public void start() throws JMSException { 051 if (started.commit(false, true)) { 052 if (getClientID() != null) { 053 startThread(); 054 } 055 } 056 } 057 058 protected void startThread() { 059 thread = new Thread(this, toString()); 060 thread.start(); 061 } 062 063 public void stop() { 064 if (closed.commit(false, true)) { 065 super.stop(); 066 } 067 } 068 069 public synchronized void setClientID(String clientID) { 070 super.setClientID(clientID); 071 if (clientID != null && thread == null && started.get()) { 072 startThread(); 073 } 074 } 075 076 public String toString() { 077 return "HTTP Reader " + getRemoteUrl(); 078 } 079 080 /** 081 * Can this wireformat process packets of this version 082 * @param version the version number to test 083 * @return true if can accept the version 084 */ 085 public boolean canProcessWireFormatVersion(int version){ 086 return wireFormat.canProcessWireFormatVersion(version); 087 } 088 089 /** 090 * @return the current version of this wire format 091 */ 092 public int getCurrentWireFormatVersion(){ 093 return wireFormat.getCurrentWireFormatVersion(); 094 } 095 096 // Properties 097 //------------------------------------------------------------------------- 098 public String getRemoteUrl() { 099 return remoteUrl; 100 } 101 102 public WireFormat getWireFormat() { 103 return wireFormat; 104 } 105 106 public TextWireFormat getTextWireFormat(){ 107 return wireFormat; 108 } 109 110 public void setWireFormat(TextWireFormat wireFormat) { 111 this.wireFormat = wireFormat; 112 } 113 114 public SynchronizedBoolean getClosed() { 115 return closed; 116 } 117 118 public SynchronizedBoolean getStarted() { 119 return started; 120 } 121 }