/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.camel.component;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

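/**
 * A Camel endpoint backed by an ActiveMQ {@link AsyncDataManager} journal stored in a
 * local directory. Producers append the exchange body (converted to a {@link ByteSequence}
 * or a byte array) to the journal; at most one consumer at a time replays the journal,
 * dispatching one exchange per record and advancing the journal mark as records are consumed.
 * The {@code syncProduce} and {@code syncConsume} flags are passed as the sync flag to the
 * journal write and mark operations, trading throughput for durability.
 */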
public class JournalEndpoint extends DefaultEndpoint {

    private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class);

    private final File directory;
    private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>();
    private final Object activationMutex = new Object();
    private int referenceCount;
    private AsyncDataManager dataManager;
    private Thread thread;
    private Location lastReadLocation;
    private long idleDelay = 1000;
    private boolean syncProduce = true;
    private boolean syncConsume;

    public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
        super(uri, journalComponent.getCamelContext());
        this.directory = directory;
    }

    public JournalEndpoint(String endpointUri, File directory) {
        super(endpointUri);
        this.directory = directory;
    }

    public boolean isSingleton() {
        return true;
    }

    public File getDirectory() {
        return directory;
    }

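    /**
     * Creates the single consumer supported by this endpoint. The consumer is registered
     * with the endpoint on start (which opens the journal and launches the dispatch thread)
     * and unregistered on stop.
     */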
    public Consumer createConsumer(Processor processor) throws Exception {
        return new DefaultConsumer(this, processor) {
            @Override
            public void start() throws Exception {
                super.start();
                activateConsumer(this);
            }

            @Override
            public void stop() throws Exception {
                deactivateConsumer(this);
                super.stop();
            }
        };
    }

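    /**
     * Releases one reference to the shared data manager, closing it once the last
     * producer or consumer reference is gone.
     */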
    protected void decrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount--;
            if (referenceCount == 0) {
                LOG.debug("Closing data manager: " + directory);
                LOG.debug("Last mark at: " + lastReadLocation);
                dataManager.close();
                dataManager = null;
            }
        }
    }

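    /**
     * Acquires a reference to the shared data manager, opening the journal in the configured
     * directory on the first reference and remembering the last recorded mark as the position
     * from which consumption resumes.
     */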
    protected void incrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount++;
            if (referenceCount == 1) {
                LOG.debug("Opening data manager: " + directory);
                dataManager = new AsyncDataManager();
                dataManager.setDirectory(directory);
                dataManager.start();

                lastReadLocation = dataManager.getMark();
                LOG.debug("Last mark at: " + lastReadLocation);
            }
        }
    }

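    /**
     * Unregisters the active consumer, waits for the dispatch thread to finish and
     * releases its reference to the data manager.
     */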
    protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != consumer) {
                throw new RuntimeCamelException("Consumer was not active.");
            }
            this.consumer.set(null);
            try {
                thread.join();
            } catch (InterruptedException e) {
                // Preserve the interrupt status before converting to an IO exception.
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            decrementReference();
        }
    }

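    /**
     * Registers the given consumer as the single active consumer, opens the journal if
     * needed and starts a daemon thread that dispatches journal records to it.
     */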
    protected void activateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != null) {
                throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
            }
            incrementReference();
            this.consumer.set(consumer);
            thread = new Thread() {
                @Override
                public void run() {
                    dispatchToConsumer();
                }
            };
            thread.setName("Dispatch thread: " + getEndpointUri());
            thread.setDaemon(true);
            thread.start();
        }
    }

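    /**
     * Dispatch loop run by the consumer thread: reads the next journal record after the last
     * mark, turns it into an exchange (with "journal" and "location" headers), hands it to the
     * consumer's processor and then advances the mark. Sleeps for {@code idleDelay} milliseconds
     * when no new record is available, and exits once the consumer is deactivated.
     */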
    protected void dispatchToConsumer() {
        try {
            DefaultConsumer consumer;
            while ((consumer = this.consumer.get()) != null) {
                // See if there is a new record to process
                Location location = dataManager.getNextLocation(lastReadLocation);
                if (location != null) {

                    // Send it on.
                    ByteSequence read = dataManager.read(location);
                    Exchange exchange = createExchange();
                    exchange.getIn().setBody(read);
                    exchange.getIn().setHeader("journal", getEndpointUri());
                    exchange.getIn().setHeader("location", location);
                    consumer.getProcessor().process(exchange);
                    // Setting the mark makes the data manager forget about
                    // everything before that record.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumed record at: " + location);
                    }
                    dataManager.setMark(location, syncConsume);
                    lastReadLocation = location;
                } else {
                    // Avoid a tight CPU loop if there is no new record to read.
                    LOG.debug("Sleeping due to no records being available.");
                    Thread.sleep(idleDelay);
                }
            }
        } catch (Throwable e) {
            // Report through the endpoint's logger instead of printing to stderr.
            LOG.error("Failed to dispatch to consumer: " + e, e);
        }
    }

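    /**
     * Creates a producer that appends each exchange's In body to the journal. The body is
     * converted to a {@link ByteSequence}, falling back to a byte array; if neither conversion
     * is possible the exchange fails with a {@link CamelExchangeException}.
     */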
    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) {
            public void process(Exchange exchange) throws Exception {
                incrementReference();
                try {
                    ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
                    if (body == null) {
                        byte[] bytes = exchange.getIn().getBody(byte[].class);
                        if (bytes != null) {
                            body = new ByteSequence(bytes);
                        }
                    }
                    if (body == null) {
                        throw new CamelExchangeException("The In message body could not be converted to a ByteSequence or a byte array.", exchange);
                    }
                    dataManager.write(body, syncProduce);

                } finally {
                    decrementReference();
                }
            }
        };
    }

    public boolean isSyncConsume() {
        return syncConsume;
    }

    public void setSyncConsume(boolean syncConsume) {
        this.syncConsume = syncConsume;
    }

    public boolean isSyncProduce() {
        return syncProduce;
    }

    public void setSyncProduce(boolean syncProduce) {
        this.syncProduce = syncProduce;
    }

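    /**
     * Returns true while at least one producer or consumer currently holds the journal open.
     */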
    boolean isOpen() {
        synchronized (activationMutex) {
            return referenceCount > 0;
        }
    }
}