Add support for reconnecting after communication error. #4

This commit is contained in:
Michał Słomkowski 2015-12-04 17:22:34 +01:00
parent f01b693eed
commit fbe08c19d1
4 changed files with 74 additions and 16 deletions

View File

@ -40,6 +40,8 @@ namespace mumlib {
ProcessEncodedAudioPacketFunction processEncodedAudioPacketFunction, ProcessEncodedAudioPacketFunction processEncodedAudioPacketFunction,
bool noUdp = false); bool noUdp = false);
~Transport();
void connect(string host, void connect(string host,
int port, int port,
string user, string user,

View File

@ -4,6 +4,10 @@
#include "log4cpp/FileAppender.hh" #include "log4cpp/FileAppender.hh"
#include "log4cpp/OstreamAppender.hh" #include "log4cpp/OstreamAppender.hh"
#include <chrono>
#include <thread>
#include <mumlib/Transport.hpp>
class MyCallback : public mumlib::BasicCallback { class MyCallback : public mumlib::BasicCallback {
public: public:
mumlib::Mumlib *mum; mumlib::Mumlib *mum;
@ -41,12 +45,18 @@ int main(int argc, char *argv[]) {
} }
MyCallback myCallback; MyCallback myCallback;
while (true) {
try {
mumlib::Mumlib mum(myCallback); mumlib::Mumlib mum(myCallback);
myCallback.mum = &mum; myCallback.mum = &mum;
mum.connect(argv[1], 64738, "mumlib_example", argv[2]); mum.connect(argv[1], 64738, "mumlib_example", argv[2]);
mum.run(); mum.run();
} catch (mumlib::TransportException &exp) {
logger.error("TransportException: %s.", exp.what());
return 0; logger.notice("Attempting to reconnect in 5 s.");
std::this_thread::sleep_for(std::chrono::seconds(5));
}
}
} }

View File

@ -45,6 +45,10 @@ mumlib::Transport::Transport(
pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1)); pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1));
} }
mumlib::Transport::~Transport() {
disconnect();
}
void mumlib::Transport::connect( void mumlib::Transport::connect(
std::string host, std::string host,
int port, int port,
@ -87,12 +91,28 @@ void mumlib::Transport::connect(
void mumlib::Transport::disconnect() { void mumlib::Transport::disconnect() {
if (state != ConnectionState::NOT_CONNECTED) {
boost::system::error_code errorCode;
// todo perform different operations for each ConnectionState
sslSocket.shutdown(errorCode);
if (errorCode) {
logger.warn("SSL socket shutdown returned an error: %s.", errorCode.message().c_str());
}
sslSocket.lowest_layer().shutdown(tcp::socket::shutdown_both, errorCode);
if (errorCode) {
logger.warn("SSL socket lowest layer shutdown returned an error: %s.", errorCode.message().c_str());
}
udpSocket.close(errorCode);
if (errorCode) {
logger.warn("UDP socket close returned error: %s.", errorCode.message().c_str());
}
state = ConnectionState::NOT_CONNECTED; state = ConnectionState::NOT_CONNECTED;
}
sslSocket.shutdown();
sslSocket.lowest_layer().shutdown(tcp::socket::shutdown_both);
udpSocket.shutdown(udp::socket::shutdown_both);
} }
@ -140,6 +160,10 @@ bool mumlib::Transport::isUdpActive() {
} }
void mumlib::Transport::doReceiveUdp() { void mumlib::Transport::doReceiveUdp() {
if (state == ConnectionState::NOT_CONNECTED) {
return;
}
udpSocket.async_receive_from( udpSocket.async_receive_from(
buffer(udpIncomingBuffer, MAX_UDP_LENGTH), buffer(udpIncomingBuffer, MAX_UDP_LENGTH),
udpReceiverEndpoint, udpReceiverEndpoint,
@ -171,6 +195,8 @@ void mumlib::Transport::doReceiveUdp() {
} }
doReceiveUdp(); doReceiveUdp();
} else if (ec == boost::asio::error::operation_aborted) {
logger.debug("UDP receive function cancelled.");
} else { } else {
throwTransportException("UDP receive failed: " + ec.message()); throwTransportException("UDP receive failed: " + ec.message());
} }
@ -220,11 +246,11 @@ void mumlib::Transport::pingTimerTick(const boost::system::error_code &e) {
} }
} }
} }
}
pingTimer.expires_at(pingTimer.expires_at() + PING_INTERVAL); pingTimer.expires_at(pingTimer.expires_at() + PING_INTERVAL);
pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1)); pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1));
} }
}
void mumlib::Transport::sendUdpAsync(uint8_t *buff, int length) { void mumlib::Transport::sendUdpAsync(uint8_t *buff, int length) {
if (length > MAX_UDP_LENGTH - 4) { if (length > MAX_UDP_LENGTH - 4) {
@ -252,6 +278,10 @@ void mumlib::Transport::sendUdpAsync(uint8_t *buff, int length) {
} }
void mumlib::Transport::doReceiveSsl() { void mumlib::Transport::doReceiveSsl() {
if (state == ConnectionState::NOT_CONNECTED) {
return;
}
async_read( async_read(
sslSocket, sslSocket,
boost::asio::buffer(sslIncomingBuffer, MAX_TCP_LENGTH), boost::asio::buffer(sslIncomingBuffer, MAX_TCP_LENGTH),
@ -385,6 +415,11 @@ void mumlib::Transport::processMessageInternal(MessageType messageType, uint8_t
} }
void mumlib::Transport::sendUdpPing() { void mumlib::Transport::sendUdpPing() {
if (state == ConnectionState::NOT_CONNECTED) {
logger.debug("State changed to NOT_CONNECTED, skipping UDP ping.");
return;
}
logger.debug("Sending UDP ping."); logger.debug("Sending UDP ping.");
vector<uint8_t> message; vector<uint8_t> message;
@ -404,7 +439,11 @@ void mumlib::Transport::sendSsl(uint8_t *buff, int length) {
logger.debug("Sending %d bytes of data.", length); logger.debug("Sending %d bytes of data.", length);
try {
write(sslSocket, boost::asio::buffer(buff, length)); write(sslSocket, boost::asio::buffer(buff, length));
} catch (boost::system::system_error &err) {
throwTransportException(std::string("SSL send failed: ") + err.what());
}
} }
void mumlib::Transport::sendSslAsync(uint8_t *buff, int length) { void mumlib::Transport::sendSslAsync(uint8_t *buff, int length) {
@ -428,7 +467,7 @@ void mumlib::Transport::sendSslAsync(uint8_t *buff, int length) {
if (!ec and bytesTransferred > 0) { if (!ec and bytesTransferred > 0) {
} else { } else {
throwTransportException("send failed: " + ec.message()); throwTransportException("async SSL send failed: " + ec.message());
} }
}); });
} }

View File

@ -335,6 +335,8 @@ namespace mumlib {
: impl(new _Mumlib_Private(callback, ioService)) { } : impl(new _Mumlib_Private(callback, ioService)) { }
Mumlib::~Mumlib() { Mumlib::~Mumlib() {
disconnect();
delete impl; delete impl;
} }
@ -347,8 +349,13 @@ namespace mumlib {
} }
void Mumlib::disconnect() { void Mumlib::disconnect() {
if (not impl->externalIoService) {
impl->ioService.reset();
}
if (impl->transport.getConnectionState() != ConnectionState::NOT_CONNECTED) {
impl->transport.disconnect(); impl->transport.disconnect();
} }
}
void Mumlib::run() { void Mumlib::run() {
if (impl->externalIoService) { if (impl->externalIoService) {