Refactor current Mumble->SIP audio stream to separate AudioFramesMixer class. #1

This commit is contained in:
Michał Słomkowski 2015-11-18 00:17:25 +01:00
parent 0548fca0dd
commit 45f8536e09
9 changed files with 156 additions and 66 deletions

61
AudioFramesMixer.cpp Normal file
View File

@ -0,0 +1,61 @@
#include "AudioFramesMixer.hpp"
//// Reset mix buffer.
//Arrays.fill(tempMix, 0);
//
//// Sum the buffers.
//for (final AudioUser user : mix) {
//for (int i = 0; i < tempMix.length; i++) {
//tempMix[i] += user.lastFrame[i];
//}
//}
//
//// Clip buffer for real output.
//for (int i = 0; i < MumbleProtocol.FRAME_SIZE; i++) {
//clipOut[i] = (short) (Short.MAX_VALUE * (tempMix[i] < -1.0f ? -1.0f
//: (tempMix[i] > 1.0f ? 1.0f : tempMix[i])));
//}
mixer::AudioFramesMixer::AudioFramesMixer(pj_pool_factory &poolFactory)
: logger(log4cpp::Category::getInstance("AudioFramesMixer")) {
pool = pj_pool_create(&poolFactory, "media", 32768, 8192, nullptr);
if (!pool) {
throw mixer::Exception("error when creating memory pool");
}
// todo calculate sizes
pj_status_t status = pjmedia_circ_buf_create(pool, 960 * 10, &inputBuff);
if (status != PJ_SUCCESS) {
throw mixer::Exception("error when creating circular buffer", status);
}
}
mixer::AudioFramesMixer::~AudioFramesMixer() {
if (pool != nullptr) {
pj_pool_release(pool);
}
}
void mixer::AudioFramesMixer::addFrameToBuffer(int sessionId, int sequenceNumber, int16_t *samples, int samplesLength) {
std::unique_lock<std::mutex> lock(inBuffAccessMutex);
logger.debug("Pushing %d samples to in-buff.", samplesLength);
pjmedia_circ_buf_write(inputBuff, samples, samplesLength);
}
int mixer::AudioFramesMixer::getMixedSamples(int16_t *mixedSamples, int requestedLength) {
std::unique_lock<std::mutex> lock(inBuffAccessMutex);
int availableSamples = pjmedia_circ_buf_get_len(inputBuff);
const int samplesToRead = std::min(requestedLength, availableSamples);
logger.debug("Pulling %d samples from in-buff.", samplesToRead);
pjmedia_circ_buf_read(inputBuff, mixedSamples, samplesToRead);
return samplesToRead;
}
void mixer::AudioFramesMixer::clean() {
}

57
AudioFramesMixer.hpp Normal file
View File

@ -0,0 +1,57 @@
#pragma once
#include <pjmedia.h>
#include <log4cpp/Category.hh>
#include <boost/noncopyable.hpp>
#include <mutex>
namespace mixer {
class Exception : public std::runtime_error {
public:
Exception(const char *title) : std::runtime_error(title) {
mesg += title;
}
Exception(const char *title, pj_status_t status) : std::runtime_error(title) {
char errorMsgBuffer[500];
pj_strerror(status, errorMsgBuffer, sizeof(errorMsgBuffer));
mesg += title;
mesg += ": ";
mesg += errorMsgBuffer;
}
virtual const char *what() const throw() override {
return mesg.c_str();
}
private:
std::string mesg;
};
class AudioFramesMixer : boost::noncopyable {
public:
AudioFramesMixer(pj_pool_factory &poolFactory);
virtual ~AudioFramesMixer();
void addFrameToBuffer(int sessionId, int sequenceNumber, int16_t *samples, int samplesLength);
int getMixedSamples(int16_t *mixedSamples, int requestedLength);
void clean();
private:
log4cpp::Category &logger;
pj_pool_t *pool = nullptr;
pjmedia_circ_buf *inputBuff;
std::mutex inBuffAccessMutex;
};
}

View File

@ -24,11 +24,12 @@ set(SOURCE_FILES
PjsuaCommunicator.hpp PjsuaCommunicator.hpp
MumbleCommunicator.cpp MumbleCommunicator.cpp
MumbleCommunicator.hpp MumbleCommunicator.hpp
ICommunicator.hpp
Configuration.cpp Configuration.cpp
Configuration.hpp Configuration.hpp
IncomingConnectionValidator.cpp IncomingConnectionValidator.cpp
IncomingConnectionValidator.hpp) IncomingConnectionValidator.hpp
AudioFramesMixer.hpp
AudioFramesMixer.cpp)
add_executable(mumsi ${SOURCE_FILES} main.cpp) add_executable(mumsi ${SOURCE_FILES} main.cpp)
target_link_libraries(mumsi ${PJSIP_LIBRARIES}) target_link_libraries(mumsi ${PJSIP_LIBRARIES})

View File

@ -1,17 +0,0 @@
#pragma once
#include <functional>
#include <stdint.h>
class ICommunicator {
public:
/**
* Send samples through the Communicator.
*/
virtual void sendPcmSamples(int16_t *samples, unsigned int length) = 0;
/**
* This callback is called when Communicator has received samples.
*/
std::function<void(int16_t *, int)> onIncomingPcmSamples;
};

View File

@ -15,7 +15,7 @@ namespace mumble {
int sequenceNumber, int sequenceNumber,
int16_t *pcm_data, int16_t *pcm_data,
uint32_t pcm_data_size) override { uint32_t pcm_data_size) override {
communicator->onIncomingPcmSamples(pcm_data, pcm_data_size); communicator->onIncomingPcmSamples(sessionId, sequenceNumber, pcm_data, pcm_data_size);
} }
}; };
} }

View File

@ -1,7 +1,5 @@
#pragma once #pragma once
#include "ICommunicator.hpp"
#include <mumlib.hpp> #include <mumlib.hpp>
#include <log4cpp/Category.hh> #include <log4cpp/Category.hh>
@ -19,7 +17,7 @@ namespace mumble {
class MumlibCallback; class MumlibCallback;
class MumbleCommunicator : public ICommunicator, boost::noncopyable { class MumbleCommunicator : boost::noncopyable {
public: public:
MumbleCommunicator( MumbleCommunicator(
boost::asio::io_service &ioService); boost::asio::io_service &ioService);
@ -30,9 +28,15 @@ namespace mumble {
std::string host, std::string host,
int port = 0); int port = 0);
~MumbleCommunicator(); virtual ~MumbleCommunicator();
virtual void sendPcmSamples(int16_t *samples, unsigned int length) override; void sendPcmSamples(int16_t *samples, unsigned int length);
/**
* This callback is called when communicator has received samples.
* Arguments: session ID, sequence number, PCM samples, length of samples
*/
std::function<void(int, int, int16_t *, int)> onIncomingPcmSamples;
void sendTextMessage(std::string message); void sendTextMessage(std::string message);

View File

@ -226,19 +226,9 @@ sip::PjsuaCommunicator::PjsuaCommunicator(IncomingConnectionValidator &validator
endpoint.libInit(endpointConfig); endpoint.libInit(endpointConfig);
pj_status_t status;
pj_caching_pool cachingPool;
pj_caching_pool_init(&cachingPool, &pj_pool_factory_default_policy, 0); pj_caching_pool_init(&cachingPool, &pj_pool_factory_default_policy, 0);
pool = pj_pool_create(&cachingPool.factory, "media", 32768, 8192, nullptr);
if (!pool) {
throw sip::Exception("error when creating memory pool", status);
}
// todo calculate sizes mixer.reset(new mixer::AudioFramesMixer(cachingPool.factory));
status = pjmedia_circ_buf_create(pool, 960 * 10, &inputBuff);
if (status != PJ_SUCCESS) {
throw sip::Exception("error when creating circular buffer", status);
}
media.reset(new _MumlibAudioMedia(*this)); media.reset(new _MumlibAudioMedia(*this));
} }
@ -269,25 +259,22 @@ sip::PjsuaCommunicator::~PjsuaCommunicator() {
endpoint.libDestroy(); endpoint.libDestroy();
} }
void sip::PjsuaCommunicator::sendPcmSamples(int sessionId, int sequenceNumber, int16_t *samples, unsigned int length) {
mixer->addFrameToBuffer(sessionId, sequenceNumber, samples, length);
}
pj_status_t sip::PjsuaCommunicator::mediaPortGetFrame(pjmedia_port *port, pjmedia_frame *frame) { pj_status_t sip::PjsuaCommunicator::mediaPortGetFrame(pjmedia_port *port, pjmedia_frame *frame) {
std::unique_lock<std::mutex> lock(inBuffAccessMutex);
frame->type = PJMEDIA_FRAME_TYPE_AUDIO; frame->type = PJMEDIA_FRAME_TYPE_AUDIO;
pj_int16_t *samples = static_cast<pj_int16_t *>(frame->buf); pj_int16_t *samples = static_cast<pj_int16_t *>(frame->buf);
pj_size_t count = frame->size / 2 / PJMEDIA_PIA_CCNT(&(port->info)); pj_size_t count = frame->size / 2 / PJMEDIA_PIA_CCNT(&(port->info));
pj_size_t availableSamples = pjmedia_circ_buf_get_len(inputBuff); const int readSamples = mixer->getMixedSamples(samples, count);
const int samplesToRead = std::min(count, availableSamples);
pjsuaLogger.debug("Pulling %d samples from in-buff.", samplesToRead); if (readSamples < count) {
pjmedia_circ_buf_read(inputBuff, samples, samplesToRead); pjsuaLogger.debug("Requested %d samples, available %d, filling remaining with zeros.",
count, readSamples);
if (availableSamples < count) { for (int i = readSamples; i < count; ++i) {
pjsuaLogger.debug("Requested %d samples, available %d, filling remaining with zeros.", count,
availableSamples);
for (int i = samplesToRead; i < count; ++i) {
samples[i] = 0; samples[i] = 0;
} }
} }
@ -322,9 +309,3 @@ void sip::PjsuaCommunicator::registerAccount(string host, string user, string pa
account.reset(new _Account(*this)); account.reset(new _Account(*this));
account->create(accountConfig); account->create(accountConfig);
} }
void sip::PjsuaCommunicator::sendPcmSamples(int16_t *samples, unsigned int length) {
std::unique_lock<std::mutex> lock(inBuffAccessMutex);
pjsuaLogger.debug("Pushing %d samples to in-buff.", length);
pjmedia_circ_buf_write(inputBuff, samples, length);
}

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include "ICommunicator.hpp"
#include "IncomingConnectionValidator.hpp" #include "IncomingConnectionValidator.hpp"
#include "AudioFramesMixer.hpp"
#include <pjmedia.h> #include <pjmedia.h>
#include <pjsua-lib/pjsua.h> #include <pjsua-lib/pjsua.h>
@ -15,7 +15,6 @@
#include <string> #include <string>
#include <stdexcept> #include <stdexcept>
#include <mutex>
#include <climits> #include <climits>
#include <bits/unique_ptr.h> #include <bits/unique_ptr.h>
@ -55,7 +54,7 @@ namespace sip {
class _MumlibAudioMedia; class _MumlibAudioMedia;
class PjsuaCommunicator : public ICommunicator, boost::noncopyable { class PjsuaCommunicator : boost::noncopyable {
public: public:
PjsuaCommunicator(IncomingConnectionValidator &validator); PjsuaCommunicator(IncomingConnectionValidator &validator);
@ -65,9 +64,15 @@ namespace sip {
std::string password, std::string password,
unsigned int port = DEFAULT_PORT); unsigned int port = DEFAULT_PORT);
~PjsuaCommunicator(); virtual ~PjsuaCommunicator();
virtual void sendPcmSamples(int16_t *samples, unsigned int length) override; void sendPcmSamples(
int sessionId,
int sequenceNumber,
int16_t *samples,
unsigned int length);
std::function<void(int16_t *, int)> onIncomingPcmSamples;
std::function<void(std::string)> onStateChange; std::function<void(std::string)> onStateChange;
@ -79,18 +84,16 @@ namespace sip {
log4cpp::Category &logger; log4cpp::Category &logger;
log4cpp::Category &pjsuaLogger; log4cpp::Category &pjsuaLogger;
std::unique_ptr<mixer::AudioFramesMixer> mixer;
std::unique_ptr<_LogWriter> logWriter; std::unique_ptr<_LogWriter> logWriter;
std::unique_ptr<_Account> account; std::unique_ptr<_Account> account;
std::unique_ptr<_MumlibAudioMedia> media; std::unique_ptr<_MumlibAudioMedia> media;
pj_caching_pool cachingPool;
pj::Endpoint endpoint; pj::Endpoint endpoint;
pj_pool_t *pool = nullptr;
pjmedia_circ_buf *inputBuff;
std::mutex inBuffAccessMutex;
IncomingConnectionValidator &uriValidator; IncomingConnectionValidator &uriValidator;
void registerAccount(std::string host, void registerAccount(std::string host,

View File

@ -45,7 +45,7 @@ int main(int argc, char *argv[]) {
mumbleCommunicator.onIncomingPcmSamples = std::bind( mumbleCommunicator.onIncomingPcmSamples = std::bind(
&sip::PjsuaCommunicator::sendPcmSamples, &sip::PjsuaCommunicator::sendPcmSamples,
&pjsuaCommunicator, &pjsuaCommunicator,
_1, _2); _1, _2, _3, _4);
mumbleCommunicator.connect( mumbleCommunicator.connect(
conf.getString("mumble.user"), conf.getString("mumble.user"),