00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "CPacketStreamFilter.h"
00016 #include "IEventQueue.h"
00017 #include "CLock.h"
00018 #include "TMethodEventJob.h"
00019 #include <cstring>
00020 #include <memory>
00021
00022
00023
00024
00025
00026 CPacketStreamFilter::CPacketStreamFilter(IStream* stream, bool adoptStream) :
00027 CStreamFilter(stream, adoptStream),
00028 m_size(0),
00029 m_inputShutdown(false)
00030 {
00031
00032 }
00033
00034 CPacketStreamFilter::~CPacketStreamFilter()
00035 {
00036
00037 }
00038
00039 void
00040 CPacketStreamFilter::close()
00041 {
00042 CLock lock(&m_mutex);
00043 m_size = 0;
00044 m_buffer.pop(m_buffer.getSize());
00045 CStreamFilter::close();
00046 }
00047
00048 UInt32
00049 CPacketStreamFilter::read(void* buffer, UInt32 n)
00050 {
00051 if (n == 0) {
00052 return 0;
00053 }
00054
00055 CLock lock(&m_mutex);
00056
00057
00058 if (!isReadyNoLock()) {
00059 return 0;
00060 }
00061
00062
00063 if (n > m_size) {
00064 n = m_size;
00065 }
00066
00067
00068 if (buffer != NULL) {
00069 memcpy(buffer, m_buffer.peek(n), n);
00070 }
00071 m_buffer.pop(n);
00072 m_size -= n;
00073
00074
00075
00076 readPacketSize();
00077
00078 if (m_inputShutdown && m_size == 0) {
00079 EVENTQUEUE->addEvent(CEvent(getInputShutdownEvent(),
00080 getEventTarget(), NULL));
00081 }
00082
00083 return n;
00084 }
00085
00086 void
00087 CPacketStreamFilter::write(const void* buffer, UInt32 count)
00088 {
00089
00090 UInt8 length[4];
00091 length[0] = (UInt8)((count >> 24) & 0xff);
00092 length[1] = (UInt8)((count >> 16) & 0xff);
00093 length[2] = (UInt8)((count >> 8) & 0xff);
00094 length[3] = (UInt8)( count & 0xff);
00095 getStream()->write(length, sizeof(length));
00096
00097
00098 getStream()->write(buffer, count);
00099 }
00100
00101 void
00102 CPacketStreamFilter::shutdownInput()
00103 {
00104 CLock lock(&m_mutex);
00105 m_size = 0;
00106 m_buffer.pop(m_buffer.getSize());
00107 CStreamFilter::shutdownInput();
00108 }
00109
00110 bool
00111 CPacketStreamFilter::isReady() const
00112 {
00113 CLock lock(&m_mutex);
00114 return isReadyNoLock();
00115 }
00116
00117 UInt32
00118 CPacketStreamFilter::getSize() const
00119 {
00120 CLock lock(&m_mutex);
00121 return isReadyNoLock() ? m_size : 0;
00122 }
00123
00124 bool
00125 CPacketStreamFilter::isReadyNoLock() const
00126 {
00127 return (m_size != 0 && m_buffer.getSize() >= m_size);
00128 }
00129
00130 void
00131 CPacketStreamFilter::readPacketSize()
00132 {
00133
00134
00135 if (m_size == 0 && m_buffer.getSize() >= 4) {
00136 UInt8 buffer[4];
00137 memcpy(buffer, m_buffer.peek(sizeof(buffer)), sizeof(buffer));
00138 m_buffer.pop(sizeof(buffer));
00139 m_size = ((UInt32)buffer[0] << 24) |
00140 ((UInt32)buffer[1] << 16) |
00141 ((UInt32)buffer[2] << 8) |
00142 (UInt32)buffer[3];
00143 }
00144 }
00145
00146 bool
00147 CPacketStreamFilter::readMore()
00148 {
00149
00150 bool wasReady = isReadyNoLock();
00151
00152
00153 char buffer[4096];
00154 UInt32 n = getStream()->read(buffer, sizeof(buffer));
00155 while (n > 0) {
00156 m_buffer.write(buffer, n);
00157 n = getStream()->read(buffer, sizeof(buffer));
00158 }
00159
00160
00161
00162 readPacketSize();
00163
00164
00165 bool isReady = isReadyNoLock();
00166
00167
00168
00169 return (wasReady != isReady);
00170 }
00171
00172 void
00173 CPacketStreamFilter::filterEvent(const CEvent& event)
00174 {
00175 if (event.getType() == getInputReadyEvent()) {
00176 CLock lock(&m_mutex);
00177 if (!readMore()) {
00178 return;
00179 }
00180 }
00181 else if (event.getType() == getInputShutdownEvent()) {
00182
00183 CLock lock(&m_mutex);
00184 m_inputShutdown = true;
00185 if (m_size != 0) {
00186 return;
00187 }
00188 }
00189
00190
00191 CStreamFilter::filterEvent(event);
00192 }