279 lines
7.1 KiB
C++
279 lines
7.1 KiB
C++
/*
|
|
* Copyright (C) 2006-2016 Jacek Sieka, arnetheduck on gmail point com
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 2 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software
|
|
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
*/
|
|
|
|
#include "adchpp.h"
|
|
|
|
#include "ManagedSocket.h"
|
|
|
|
#include "SocketManager.h"
|
|
|
|
namespace adchpp {
|
|
|
|
using namespace std;
|
|
|
|
using namespace boost::asio;
|
|
|
|
ManagedSocket::ManagedSocket(SocketManager &sm, const AsyncStreamPtr &sock_) :
|
|
sock(sock_),
|
|
overflow(time::not_a_date_time),
|
|
disc(time::not_a_date_time),
|
|
lastWrite(time::not_a_date_time),
|
|
sm(sm)
|
|
{ }
|
|
|
|
ManagedSocket::~ManagedSocket() throw() {
|
|
dcdebug("ManagedSocket deleted\n");
|
|
}
|
|
|
|
static size_t sum(const BufferList& l) {
|
|
size_t bytes = 0;
|
|
for(BufferList::const_iterator i = l.begin(); i != l.end(); ++i) {
|
|
bytes += (*i)->size();
|
|
}
|
|
|
|
return bytes;
|
|
}
|
|
|
|
size_t ManagedSocket::getQueuedBytes() const {
|
|
return sum(outBuf);
|
|
}
|
|
|
|
void ManagedSocket::write(const BufferPtr& buf, bool lowPrio /* = false */) throw() {
|
|
if(buf->size() == 0 || disconnecting())
|
|
return;
|
|
|
|
size_t queued = getQueuedBytes();
|
|
|
|
if(sm.getMaxBufferSize() > 0 && queued + buf->size() > sm.getMaxBufferSize()) {
|
|
if(lowPrio) {
|
|
return;
|
|
} else if(!overflow.is_not_a_date_time() && overflow + time::millisec(sm.getOverflowTimeout()) < time::now()) {
|
|
disconnect(Util::REASON_WRITE_OVERFLOW);
|
|
return;
|
|
} else {
|
|
overflow = time::now();
|
|
}
|
|
}
|
|
|
|
sm.getStats().queueBytes += buf->size();
|
|
sm.getStats().queueCalls++;
|
|
|
|
outBuf.push_back(buf);
|
|
|
|
prepareWrite();
|
|
}
|
|
|
|
// Simplified handlers to avoid bind complexity
|
|
namespace {
|
|
|
|
/** Keeper keeps a reference to the managed socket */
|
|
struct Keeper {
|
|
Keeper(const ManagedSocketPtr& ms_) : ms(ms_) { }
|
|
ManagedSocketPtr ms;
|
|
|
|
void operator()(const boost::system::error_code& ec, size_t bytes) { }
|
|
};
|
|
|
|
template<void (ManagedSocket::*F)(const boost::system::error_code&, size_t)>
|
|
struct Handler : Keeper {
|
|
Handler(const ManagedSocketPtr& ms) : Keeper(ms) { }
|
|
|
|
void operator()(const boost::system::error_code& ec, size_t bytes) {
|
|
(ms.get()->*F)(ec, bytes);
|
|
}
|
|
};
|
|
|
|
struct Disconnector {
|
|
Disconnector(const AsyncStreamPtr& stream_) : stream(stream_) { }
|
|
void operator()() { stream->close(); }
|
|
AsyncStreamPtr stream;
|
|
};
|
|
|
|
}
|
|
|
|
void ManagedSocket::prepareWrite() throw() {
|
|
if(!writing()) { // Not writing
|
|
if(!outBuf.empty()) {
|
|
lastWrite = time::now();
|
|
sock->write(outBuf, Handler<&ManagedSocket::completeWrite>(shared_from_this()));
|
|
}
|
|
} else if(time::now() > lastWrite + time::seconds(60)) {
|
|
disconnect(Util::REASON_WRITE_TIMEOUT);
|
|
}
|
|
}
|
|
|
|
void ManagedSocket::completeWrite(const boost::system::error_code& ec, size_t bytes) throw() {
|
|
lastWrite = time::not_a_date_time;
|
|
|
|
if(!ec) {
|
|
sm.getStats().sendBytes += bytes;
|
|
sm.getStats().sendCalls++;
|
|
|
|
while(bytes > 0) {
|
|
BufferPtr& p = *outBuf.begin();
|
|
if(p->size() <= bytes) {
|
|
bytes -= p->size();
|
|
outBuf.erase(outBuf.begin());
|
|
} else {
|
|
p = make_shared<Buffer>(p->data() + bytes, p->size() - bytes);
|
|
bytes = 0;
|
|
}
|
|
}
|
|
|
|
if(!overflow.is_not_a_date_time()) {
|
|
size_t left = getQueuedBytes();
|
|
if(left < sm.getMaxBufferSize()) {
|
|
overflow = time::not_a_date_time;
|
|
}
|
|
}
|
|
|
|
if(disconnecting() && outBuf.empty()) {
|
|
sock->shutdown(Keeper(shared_from_this()));
|
|
} else {
|
|
prepareWrite();
|
|
}
|
|
} else {
|
|
fail(Util::REASON_SOCKET_ERROR, ec.message());
|
|
}
|
|
}
|
|
|
|
void ManagedSocket::prepareRead() throw() {
|
|
// We first send in an empty buffer to get notification when there's data available
|
|
sock->prepareRead(BufferPtr(), Handler<&ManagedSocket::prepareRead2>(shared_from_this()));
|
|
}
|
|
|
|
void ManagedSocket::prepareRead2(const boost::system::error_code& ec, size_t) throw() {
|
|
if(!ec) {
|
|
// ADC commands are typically small - using a small buffer
|
|
// helps with fairness
|
|
// Calling available() on an ASIO socket seems to be terribly slow
|
|
// Also, we might end up here if the socket has been closed, in which
|
|
// case available would return 0 bytes...
|
|
// We can't make a synchronous receive here because when using SSL
|
|
// there might be data on the socket that won't translate into user data
|
|
// and thus read_some will block
|
|
// If there's no user data, this will effectively post a read operation
|
|
// with a buffer and waste memory...to be continued.
|
|
inBuf = make_shared<Buffer>(64);
|
|
|
|
sock->prepareRead(inBuf, Handler<&ManagedSocket::completeRead>(shared_from_this()));
|
|
} else {
|
|
fail(Util::REASON_SOCKET_ERROR, ec.message());
|
|
}
|
|
}
|
|
|
|
void ManagedSocket::completeRead(const boost::system::error_code& ec, size_t bytes) throw() {
|
|
if(!ec) {
|
|
try {
|
|
sm.getStats().recvBytes += bytes;
|
|
sm.getStats().recvCalls++;
|
|
|
|
inBuf->resize(bytes);
|
|
|
|
if(dataHandler) {
|
|
dataHandler(inBuf);
|
|
}
|
|
|
|
inBuf.reset();
|
|
prepareRead();
|
|
} catch(const boost::system::system_error& e) {
|
|
fail(Util::REASON_SOCKET_ERROR, e.code().message());
|
|
}
|
|
} else {
|
|
inBuf.reset();
|
|
fail(Util::REASON_SOCKET_ERROR, ec.message());
|
|
}
|
|
}
|
|
|
|
void ManagedSocket::completeAccept(const boost::system::error_code& ec) throw() {
|
|
if(!ec) {
|
|
if(connectedHandler)
|
|
connectedHandler();
|
|
|
|
sock->init(std::bind(&ManagedSocket::ready, shared_from_this()));
|
|
|
|
} else {
|
|
fail(Util::REASON_SOCKET_ERROR, ec.message());
|
|
}
|
|
}
|
|
|
|
void ManagedSocket::ready() throw() {
|
|
if(readyHandler)
|
|
readyHandler();
|
|
|
|
prepareRead();
|
|
}
|
|
|
|
void ManagedSocket::fail(Util::Reason reason, const std::string &info) throw() {
|
|
if(failedHandler) {
|
|
failedHandler(reason, info);
|
|
|
|
// using nullptr fails on older GCCs for which we're using nullptr.h; using 0 fails on VS...
|
|
#ifndef FAKE_NULLPTR
|
|
connectedHandler = nullptr;
|
|
readyHandler = nullptr;
|
|
dataHandler = nullptr;
|
|
failedHandler = nullptr;
|
|
#else
|
|
connectedHandler = 0;
|
|
readyHandler = 0;
|
|
dataHandler = 0;
|
|
failedHandler = 0;
|
|
#endif
|
|
}
|
|
}
|
|
|
|
struct Reporter {
|
|
Reporter(ManagedSocketPtr ms, void (ManagedSocket::*f)(Util::Reason reason, const std::string &info), Util::Reason reason, const std::string &info) :
|
|
ms(ms), f(f), reason(reason), info(info) { }
|
|
|
|
void operator()() { (ms.get()->*f)(reason, info); }
|
|
|
|
ManagedSocketPtr ms;
|
|
void (ManagedSocket::*f)(Util::Reason reason, const std::string &info);
|
|
|
|
Util::Reason reason;
|
|
std::string info;
|
|
};
|
|
|
|
void ManagedSocket::disconnect(Util::Reason reason, const std::string &info) throw() {
|
|
if(disconnecting()) {
|
|
return;
|
|
}
|
|
|
|
const auto timeout = sm.getDisconnectTimeout();
|
|
|
|
disc = time::now() + time::millisec(timeout);
|
|
|
|
sm.addJob(Reporter(shared_from_this(), &ManagedSocket::fail, reason, info));
|
|
|
|
if(!writing()) {
|
|
sock->shutdown(Keeper(shared_from_this()));
|
|
}
|
|
sm.addJob(timeout, Disconnector(sock));
|
|
}
|
|
|
|
bool ManagedSocket::disconnecting() const {
|
|
return !disc.is_not_a_date_time();
|
|
}
|
|
|
|
bool ManagedSocket::writing() const {
|
|
return !lastWrite.is_not_a_date_time();
|
|
}
|
|
}
|