001 /** 002 * 003 * Copyright 2004 Protique Ltd 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 * 017 **/ 018 package org.activemq.spring; 019 020 import java.util.ArrayList; 021 import java.util.List; 022 023 import javax.jms.Message; 024 import javax.jms.MessageListener; 025 026 /** 027 * A simple consumer which is useful for testing which can be used to wait until the consumer has received 028 * a specific number of messages. 029 * 030 * @author Mike Perham 031 * @version $Revision$ 032 */ 033 public class TestingConsumer implements MessageListener { 034 private List messages = new ArrayList(); 035 private Object semaphore; 036 037 public TestingConsumer() { 038 this(new Object()); 039 } 040 041 public TestingConsumer(Object semaphore) { 042 this.semaphore = semaphore; 043 } 044 045 /** 046 * @return all the messages on the list so far, clearing the buffer 047 */ 048 public synchronized List flushMessages() { 049 List answer = new ArrayList(messages); 050 messages.clear(); 051 return answer; 052 } 053 054 public synchronized void onMessage(Message message) { 055 messages.add(message); 056 synchronized (semaphore) { 057 semaphore.notifyAll(); 058 } 059 } 060 061 public void waitForMessageToArrive() { 062 waitForMessagesToArrive(1); 063 } 064 065 public void waitForMessagesToArrive(int messageCount) { 066 System.out.println("Waiting for message to arrive"); 067 068 long start = System.currentTimeMillis(); 069 070 while (System.currentTimeMillis() - start < 10000) { 071 try { 072 if (hasReceivedMessages(messageCount)) { 073 break; 074 } 075 synchronized (semaphore) { 076 semaphore.wait(1000); 077 } 078 } 079 catch (InterruptedException e) { 080 System.out.println("Caught: " + e); 081 } 082 } 083 long end = System.currentTimeMillis() - start; 084 085 System.out.println("End of wait for " + end + " millis"); 086 } 087 088 protected boolean hasReceivedMessage() { 089 return messages.isEmpty(); 090 } 091 092 protected synchronized boolean hasReceivedMessages(int messageCount) { 093 return messages.size() >= messageCount; 094 } 095 096 097 }