00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "CTCPSocket.h"
00016 #include "CNetworkAddress.h"
00017 #include "CSocketMultiplexer.h"
00018 #include "TSocketMultiplexerMethodJob.h"
00019 #include "XSocket.h"
00020 #include "CLock.h"
00021 #include "CLog.h"
00022 #include "IEventQueue.h"
00023 #include "IEventJob.h"
00024 #include "CArch.h"
00025 #include "XArch.h"
00026 #include <cstring>
00027 #include <cstdlib>
00028 #include <memory>
00029
00030
00031
00032
00033
00034 CTCPSocket::CTCPSocket() :
00035 m_mutex(),
00036 m_flushed(&m_mutex, true)
00037 {
00038 try {
00039 m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
00040 }
00041 catch (XArchNetwork& e) {
00042 throw XSocketCreate(e.what());
00043 }
00044
00045 init();
00046 }
00047
00048 CTCPSocket::CTCPSocket(CArchSocket socket) :
00049 m_mutex(),
00050 m_socket(socket),
00051 m_flushed(&m_mutex, true)
00052 {
00053 assert(m_socket != NULL);
00054
00055
00056 init();
00057 onConnected();
00058 setJob(newJob());
00059 }
00060
00061 CTCPSocket::~CTCPSocket()
00062 {
00063 try {
00064 close();
00065 }
00066 catch (...) {
00067
00068 }
00069 }
00070
00071 void
00072 CTCPSocket::bind(const CNetworkAddress& addr)
00073 {
00074 try {
00075 ARCH->bindSocket(m_socket, addr.getAddress());
00076 }
00077 catch (XArchNetworkAddressInUse& e) {
00078 throw XSocketAddressInUse(e.what());
00079 }
00080 catch (XArchNetwork& e) {
00081 throw XSocketBind(e.what());
00082 }
00083 }
00084
00085 void
00086 CTCPSocket::close()
00087 {
00088
00089 setJob(NULL);
00090
00091 CLock lock(&m_mutex);
00092
00093
00094 if (m_connected) {
00095 sendEvent(getDisconnectedEvent());
00096 }
00097 onDisconnected();
00098
00099
00100 if (m_socket != NULL) {
00101 CArchSocket socket = m_socket;
00102 m_socket = NULL;
00103 try {
00104 ARCH->closeSocket(socket);
00105 }
00106 catch (XArchNetwork& e) {
00107
00108 LOG((CLOG_WARN "error closing socket: %s", e.what().c_str()));
00109 }
00110 }
00111 }
00112
00113 void*
00114 CTCPSocket::getEventTarget() const
00115 {
00116 return const_cast<void*>(reinterpret_cast<const void*>(this));
00117 }
00118
00119 UInt32
00120 CTCPSocket::read(void* buffer, UInt32 n)
00121 {
00122
00123 CLock lock(&m_mutex);
00124 UInt32 size = m_inputBuffer.getSize();
00125 if (n > size) {
00126 n = size;
00127 }
00128 if (buffer != NULL && n != 0) {
00129 memcpy(buffer, m_inputBuffer.peek(n), n);
00130 }
00131 m_inputBuffer.pop(n);
00132
00133
00134 if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
00135 sendEvent(getDisconnectedEvent());
00136 m_connected = false;
00137 }
00138
00139 return n;
00140 }
00141
00142 void
00143 CTCPSocket::write(const void* buffer, UInt32 n)
00144 {
00145 bool wasEmpty;
00146 {
00147 CLock lock(&m_mutex);
00148
00149
00150 if (!m_writable) {
00151 sendEvent(getOutputErrorEvent());
00152 return;
00153 }
00154
00155
00156 if (n == 0) {
00157 return;
00158 }
00159
00160
00161 wasEmpty = (m_outputBuffer.getSize() == 0);
00162 m_outputBuffer.write(buffer, n);
00163
00164
00165 m_flushed = false;
00166 }
00167
00168
00169 if (wasEmpty) {
00170 setJob(newJob());
00171 }
00172 }
00173
00174 void
00175 CTCPSocket::flush()
00176 {
00177 CLock lock(&m_mutex);
00178 while (m_flushed == false) {
00179 m_flushed.wait();
00180 }
00181 }
00182
00183 void
00184 CTCPSocket::shutdownInput()
00185 {
00186 bool useNewJob = false;
00187 {
00188 CLock lock(&m_mutex);
00189
00190
00191 try {
00192 ARCH->closeSocketForRead(m_socket);
00193 }
00194 catch (XArchNetwork&) {
00195
00196 }
00197
00198
00199 if (m_readable) {
00200 sendEvent(getInputShutdownEvent());
00201 onInputShutdown();
00202 useNewJob = true;
00203 }
00204 }
00205 if (useNewJob) {
00206 setJob(newJob());
00207 }
00208 }
00209
00210 void
00211 CTCPSocket::shutdownOutput()
00212 {
00213 bool useNewJob = false;
00214 {
00215 CLock lock(&m_mutex);
00216
00217
00218 try {
00219 ARCH->closeSocketForWrite(m_socket);
00220 }
00221 catch (XArchNetwork&) {
00222
00223 }
00224
00225
00226 if (m_writable) {
00227 sendEvent(getOutputShutdownEvent());
00228 onOutputShutdown();
00229 useNewJob = true;
00230 }
00231 }
00232 if (useNewJob) {
00233 setJob(newJob());
00234 }
00235 }
00236
00237 bool
00238 CTCPSocket::isReady() const
00239 {
00240 CLock lock(&m_mutex);
00241 return (m_inputBuffer.getSize() > 0);
00242 }
00243
00244 UInt32
00245 CTCPSocket::getSize() const
00246 {
00247 CLock lock(&m_mutex);
00248 return m_inputBuffer.getSize();
00249 }
00250
00251 void
00252 CTCPSocket::connect(const CNetworkAddress& addr)
00253 {
00254 {
00255 CLock lock(&m_mutex);
00256
00257
00258 if (m_socket == NULL || m_connected) {
00259 sendConnectionFailedEvent("busy");
00260 return;
00261 }
00262
00263 try {
00264 if (ARCH->connectSocket(m_socket, addr.getAddress())) {
00265 sendEvent(getConnectedEvent());
00266 onConnected();
00267 }
00268 else {
00269
00270 m_writable = true;
00271 }
00272 }
00273 catch (XArchNetwork& e) {
00274 throw XSocketConnect(e.what());
00275 }
00276 }
00277 setJob(newJob());
00278 }
00279
00280 void
00281 CTCPSocket::init()
00282 {
00283
00284 m_connected = false;
00285 m_readable = false;
00286 m_writable = false;
00287
00288 try {
00289
00290
00291
00292 ARCH->setNoDelayOnSocket(m_socket, true);
00293 }
00294 catch (XArchNetwork& e) {
00295 try {
00296 ARCH->closeSocket(m_socket);
00297 m_socket = NULL;
00298 }
00299 catch (XArchNetwork&) {
00300
00301 }
00302 throw XSocketCreate(e.what());
00303 }
00304 }
00305
00306 void
00307 CTCPSocket::setJob(ISocketMultiplexerJob* job)
00308 {
00309
00310 if (job == NULL) {
00311 CSocketMultiplexer::getInstance()->removeSocket(this);
00312 }
00313 else {
00314 CSocketMultiplexer::getInstance()->addSocket(this, job);
00315 }
00316 }
00317
00318 ISocketMultiplexerJob*
00319 CTCPSocket::newJob()
00320 {
00321
00322
00323 if (m_socket == NULL) {
00324 return NULL;
00325 }
00326 else if (!m_connected) {
00327 assert(!m_readable);
00328 if (!(m_readable || m_writable)) {
00329 return NULL;
00330 }
00331 return new TSocketMultiplexerMethodJob<CTCPSocket>(
00332 this, &CTCPSocket::serviceConnecting,
00333 m_socket, m_readable, m_writable);
00334 }
00335 else {
00336 if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
00337 return NULL;
00338 }
00339 return new TSocketMultiplexerMethodJob<CTCPSocket>(
00340 this, &CTCPSocket::serviceConnected,
00341 m_socket, m_readable,
00342 m_writable && (m_outputBuffer.getSize() > 0));
00343 }
00344 }
00345
00346 void
00347 CTCPSocket::sendConnectionFailedEvent(const char* msg)
00348 {
00349 CConnectionFailedInfo* info = (CConnectionFailedInfo*)malloc(
00350 sizeof(CConnectionFailedInfo) + strlen(msg));
00351 strcpy(info->m_what, msg);
00352 EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
00353 getEventTarget(), info));
00354 }
00355
00356 void
00357 CTCPSocket::sendEvent(CEvent::Type type)
00358 {
00359 EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
00360 }
00361
00362 void
00363 CTCPSocket::onConnected()
00364 {
00365 m_connected = true;
00366 m_readable = true;
00367 m_writable = true;
00368 }
00369
00370 void
00371 CTCPSocket::onInputShutdown()
00372 {
00373 m_inputBuffer.pop(m_inputBuffer.getSize());
00374 m_readable = false;
00375 }
00376
00377 void
00378 CTCPSocket::onOutputShutdown()
00379 {
00380 m_outputBuffer.pop(m_outputBuffer.getSize());
00381 m_writable = false;
00382
00383
00384 m_flushed = true;
00385 m_flushed.broadcast();
00386 }
00387
00388 void
00389 CTCPSocket::onDisconnected()
00390 {
00391
00392 onInputShutdown();
00393 onOutputShutdown();
00394 m_connected = false;
00395 }
00396
00397 ISocketMultiplexerJob*
00398 CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
00399 bool, bool write, bool error)
00400 {
00401 CLock lock(&m_mutex);
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 if (error || true) {
00421 try {
00422
00423 ARCH->throwErrorOnSocket(m_socket);
00424 }
00425 catch (XArchNetwork& e) {
00426 sendConnectionFailedEvent(e.what().c_str());
00427 onDisconnected();
00428 return newJob();
00429 }
00430 }
00431
00432 if (write) {
00433 sendEvent(getConnectedEvent());
00434 onConnected();
00435 return newJob();
00436 }
00437
00438 return job;
00439 }
00440
00441 ISocketMultiplexerJob*
00442 CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
00443 bool read, bool write, bool error)
00444 {
00445 CLock lock(&m_mutex);
00446
00447 if (error) {
00448 sendEvent(getDisconnectedEvent());
00449 onDisconnected();
00450 return newJob();
00451 }
00452
00453 bool needNewJob = false;
00454
00455 if (write) {
00456 try {
00457
00458 UInt32 n = m_outputBuffer.getSize();
00459 const void* buffer = m_outputBuffer.peek(n);
00460 n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
00461
00462
00463 if (n > 0) {
00464 m_outputBuffer.pop(n);
00465 if (m_outputBuffer.getSize() == 0) {
00466 sendEvent(getOutputFlushedEvent());
00467 m_flushed = true;
00468 m_flushed.broadcast();
00469 needNewJob = true;
00470 }
00471 }
00472 }
00473 catch (XArchNetworkShutdown&) {
00474
00475
00476 onOutputShutdown();
00477 sendEvent(getOutputShutdownEvent());
00478 if (!m_readable && m_inputBuffer.getSize() == 0) {
00479 sendEvent(getDisconnectedEvent());
00480 m_connected = false;
00481 }
00482 needNewJob = true;
00483 }
00484 catch (XArchNetworkDisconnected&) {
00485
00486 onDisconnected();
00487 sendEvent(getDisconnectedEvent());
00488 needNewJob = true;
00489 }
00490 catch (XArchNetwork& e) {
00491
00492 LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
00493 onDisconnected();
00494 sendEvent(getOutputErrorEvent());
00495 sendEvent(getDisconnectedEvent());
00496 needNewJob = true;
00497 }
00498 }
00499
00500 if (read && m_readable) {
00501 try {
00502 UInt8 buffer[4096];
00503 size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00504 if (n > 0) {
00505 bool wasEmpty = (m_inputBuffer.getSize() == 0);
00506
00507
00508 do {
00509 m_inputBuffer.write(buffer, n);
00510 n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00511 } while (n > 0);
00512
00513
00514 if (wasEmpty) {
00515 sendEvent(getInputReadyEvent());
00516 }
00517 }
00518 else {
00519
00520
00521
00522 sendEvent(getInputShutdownEvent());
00523 if (!m_writable && m_inputBuffer.getSize() == 0) {
00524 sendEvent(getDisconnectedEvent());
00525 m_connected = false;
00526 }
00527 m_readable = false;
00528 needNewJob = true;
00529 }
00530 }
00531 catch (XArchNetworkDisconnected&) {
00532
00533 sendEvent(getDisconnectedEvent());
00534 onDisconnected();
00535 needNewJob = true;
00536 }
00537 catch (XArchNetwork& e) {
00538
00539 LOG((CLOG_WARN "error reading socket: %s", e.what().c_str()));
00540 }
00541 }
00542
00543 return needNewJob ? newJob() : job;
00544 }