From fbe08c19d181911cfa350acaf48d7966e61ed785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20S=C5=82omkowski?= Date: Fri, 4 Dec 2015 17:22:34 +0100 Subject: [PATCH] Add support for reconnecting after communication error. #4 --- include/mumlib/Transport.hpp | 2 ++ mumlib_example.cpp | 22 ++++++++++---- src/Transport.cpp | 57 ++++++++++++++++++++++++++++++------ src/mumlib.cpp | 9 +++++- 4 files changed, 74 insertions(+), 16 deletions(-) diff --git a/include/mumlib/Transport.hpp b/include/mumlib/Transport.hpp index 4f3679f..433106f 100644 --- a/include/mumlib/Transport.hpp +++ b/include/mumlib/Transport.hpp @@ -40,6 +40,8 @@ namespace mumlib { ProcessEncodedAudioPacketFunction processEncodedAudioPacketFunction, bool noUdp = false); + ~Transport(); + void connect(string host, int port, string user, diff --git a/mumlib_example.cpp b/mumlib_example.cpp index b4d3a45..b36383d 100644 --- a/mumlib_example.cpp +++ b/mumlib_example.cpp @@ -4,6 +4,10 @@ #include "log4cpp/FileAppender.hh" #include "log4cpp/OstreamAppender.hh" +#include +#include +#include + class MyCallback : public mumlib::BasicCallback { public: mumlib::Mumlib *mum; @@ -41,12 +45,18 @@ int main(int argc, char *argv[]) { } MyCallback myCallback; - mumlib::Mumlib mum(myCallback); - myCallback.mum = &mum; - mum.connect(argv[1], 64738, "mumlib_example", argv[2]); + while (true) { + try { + mumlib::Mumlib mum(myCallback); + myCallback.mum = &mum; + mum.connect(argv[1], 64738, "mumlib_example", argv[2]); + mum.run(); + } catch (mumlib::TransportException &exp) { + logger.error("TransportException: %s.", exp.what()); - mum.run(); - - return 0; + logger.notice("Attempting to reconnect in 5 s."); + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + } } \ No newline at end of file diff --git a/src/Transport.cpp b/src/Transport.cpp index 1909401..a37c678 100644 --- a/src/Transport.cpp +++ b/src/Transport.cpp @@ -45,6 +45,10 @@ mumlib::Transport::Transport( pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1)); } +mumlib::Transport::~Transport() { + disconnect(); +} + void mumlib::Transport::connect( std::string host, int port, @@ -87,12 +91,28 @@ void mumlib::Transport::connect( void mumlib::Transport::disconnect() { - state = ConnectionState::NOT_CONNECTED; + if (state != ConnectionState::NOT_CONNECTED) { + boost::system::error_code errorCode; - sslSocket.shutdown(); - sslSocket.lowest_layer().shutdown(tcp::socket::shutdown_both); + // todo perform different operations for each ConnectionState - udpSocket.shutdown(udp::socket::shutdown_both); + sslSocket.shutdown(errorCode); + if (errorCode) { + logger.warn("SSL socket shutdown returned an error: %s.", errorCode.message().c_str()); + } + + sslSocket.lowest_layer().shutdown(tcp::socket::shutdown_both, errorCode); + if (errorCode) { + logger.warn("SSL socket lowest layer shutdown returned an error: %s.", errorCode.message().c_str()); + } + + udpSocket.close(errorCode); + if (errorCode) { + logger.warn("UDP socket close returned error: %s.", errorCode.message().c_str()); + } + + state = ConnectionState::NOT_CONNECTED; + } } @@ -140,6 +160,10 @@ bool mumlib::Transport::isUdpActive() { } void mumlib::Transport::doReceiveUdp() { + if (state == ConnectionState::NOT_CONNECTED) { + return; + } + udpSocket.async_receive_from( buffer(udpIncomingBuffer, MAX_UDP_LENGTH), udpReceiverEndpoint, @@ -171,6 +195,8 @@ void mumlib::Transport::doReceiveUdp() { } doReceiveUdp(); + } else if (ec == boost::asio::error::operation_aborted) { + logger.debug("UDP receive function cancelled."); } else { throwTransportException("UDP receive failed: " + ec.message()); } @@ -220,10 +246,10 @@ void mumlib::Transport::pingTimerTick(const boost::system::error_code &e) { } } } - - pingTimer.expires_at(pingTimer.expires_at() + PING_INTERVAL); - pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1)); } + + pingTimer.expires_at(pingTimer.expires_at() + PING_INTERVAL); + pingTimer.async_wait(boost::bind(&Transport::pingTimerTick, this, _1)); } void mumlib::Transport::sendUdpAsync(uint8_t *buff, int length) { @@ -252,6 +278,10 @@ void mumlib::Transport::sendUdpAsync(uint8_t *buff, int length) { } void mumlib::Transport::doReceiveSsl() { + if (state == ConnectionState::NOT_CONNECTED) { + return; + } + async_read( sslSocket, boost::asio::buffer(sslIncomingBuffer, MAX_TCP_LENGTH), @@ -385,6 +415,11 @@ void mumlib::Transport::processMessageInternal(MessageType messageType, uint8_t } void mumlib::Transport::sendUdpPing() { + if (state == ConnectionState::NOT_CONNECTED) { + logger.debug("State changed to NOT_CONNECTED, skipping UDP ping."); + return; + } + logger.debug("Sending UDP ping."); vector message; @@ -404,7 +439,11 @@ void mumlib::Transport::sendSsl(uint8_t *buff, int length) { logger.debug("Sending %d bytes of data.", length); - write(sslSocket, boost::asio::buffer(buff, length)); + try { + write(sslSocket, boost::asio::buffer(buff, length)); + } catch (boost::system::system_error &err) { + throwTransportException(std::string("SSL send failed: ") + err.what()); + } } void mumlib::Transport::sendSslAsync(uint8_t *buff, int length) { @@ -428,7 +467,7 @@ void mumlib::Transport::sendSslAsync(uint8_t *buff, int length) { if (!ec and bytesTransferred > 0) { } else { - throwTransportException("send failed: " + ec.message()); + throwTransportException("async SSL send failed: " + ec.message()); } }); } diff --git a/src/mumlib.cpp b/src/mumlib.cpp index f140a81..0f19fad 100644 --- a/src/mumlib.cpp +++ b/src/mumlib.cpp @@ -335,6 +335,8 @@ namespace mumlib { : impl(new _Mumlib_Private(callback, ioService)) { } Mumlib::~Mumlib() { + disconnect(); + delete impl; } @@ -347,7 +349,12 @@ namespace mumlib { } void Mumlib::disconnect() { - impl->transport.disconnect(); + if (not impl->externalIoService) { + impl->ioService.reset(); + } + if (impl->transport.getConnectionState() != ConnectionState::NOT_CONNECTED) { + impl->transport.disconnect(); + } } void Mumlib::run() {