From 584a427c2e355af2fe4b251c3233561d7ca3e354 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 22 Oct 2013 20:08:55 +0800 Subject: [PATCH] dispatch video/audio/data to consumers --- trunk/src/core/srs_core_source.cpp | 99 +++++++++++++++++++++++++++++- trunk/src/core/srs_core_source.hpp | 8 +++ 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 0ad8c109b..c135713d9 100755 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include std::map SrsSource::pool; @@ -47,6 +48,12 @@ SrsConsumer::~SrsConsumer() { } +int SrsConsumer::enqueue(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + return ret; +} + int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count) { msgs = NULL; @@ -59,6 +66,7 @@ int SrsConsumer::get_packets(int max_count, SrsCommonMessage**& msgs, int& count SrsSource::SrsSource(std::string _stream_url) { stream_url = _stream_url; + cache_metadata = new SrsSharedPtrMessage(); } SrsSource::~SrsSource() @@ -69,6 +77,8 @@ SrsSource::~SrsSource() srs_freep(consumer); } consumers.clear(); + + srs_freep(cache_metadata); } int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) @@ -78,18 +88,105 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata metadata->metadata->set("server", new SrsAmf0String(RTMP_SIG_SRS_NAME""RTMP_SIG_SRS_VERSION)); + // encode the metadata to payload + int size = metadata->get_payload_length(); + if (size <= 0) { + srs_warn("ignore the invalid metadata. size=%d", size); + return ret; + } + srs_verbose("get metadata size success."); + + char* payload = new char[size]; + memset(payload, 0, size); + if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { + srs_error("encode metadata error. ret=%d", ret); + srs_freepa(payload); + return ret; + } + srs_verbose("encode metadata success."); + + // create a shared ptr message. + srs_freep(cache_metadata); + cache_metadata = new SrsSharedPtrMessage(); + + // dump message to shared ptr message. + if ((ret = cache_metadata->initialize(&msg->header, payload, size, msg->get_perfer_cid())) != ERROR_SUCCESS) { + srs_error("initialize the cache metadata failed. ret=%d", ret); + return ret; + } + srs_verbose("initialize shared ptr metadata success."); + + // copy to all consumer + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(cache_metadata->copy())) != ERROR_SUCCESS) { + srs_error("dispatch the metadata failed. ret=%d", ret); + return ret; + } + } + srs_trace("dispatch metadata success."); + return ret; } int SrsSource::on_audio(SrsCommonMessage* audio) { int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); + SrsAutoFree(SrsSharedPtrMessage, msg, false); + if ((ret = msg->initialize(&audio->header, (char*)audio->payload, audio->size, audio->get_perfer_cid())) != ERROR_SUCCESS) { + srs_error("initialize the audio failed. ret=%d", ret); + return ret; + } + srs_verbose("initialize shared ptr audio success."); + + // detach the original audio + audio->payload = NULL; + audio->size = 0; + + // copy to all consumer + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) { + srs_error("dispatch the audio failed. ret=%d", ret); + return ret; + } + } + srs_info("dispatch audio success."); + return ret; } -int SrsSource::on_video(SrsCommonMessage* audio) +int SrsSource::on_video(SrsCommonMessage* video) { int ret = ERROR_SUCCESS; + + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); + SrsAutoFree(SrsSharedPtrMessage, msg, false); + if ((ret = msg->initialize(&video->header, (char*)video->payload, video->size, video->get_perfer_cid())) != ERROR_SUCCESS) { + srs_error("initialize the video failed. ret=%d", ret); + return ret; + } + srs_verbose("initialize shared ptr video success."); + + // detach the original audio + video->payload = NULL; + video->size = 0; + + // copy to all consumer + std::vector::iterator it; + for (it = consumers.begin(); it != consumers.end(); ++it) { + SrsConsumer* consumer = *it; + if ((ret = consumer->enqueue(msg->copy())) != ERROR_SUCCESS) { + srs_error("dispatch the video failed. ret=%d", ret); + return ret; + } + } + srs_info("dispatch video success."); + return ret; } diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index a62807807..b5103be4c 100755 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -31,10 +31,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include class SrsCommonMessage; class SrsOnMetaDataPacket; +class SrsSharedPtrMessage; /** * the consumer for SrsSource, that is a play client. @@ -45,6 +47,10 @@ public: SrsConsumer(); virtual ~SrsConsumer(); public: + /** + * enqueue an shared ptr message. + */ + virtual int enqueue(SrsSharedPtrMessage* msg); /** * get packets in consumer queue. * @msgs SrsMessages*[], output the prt array. @@ -72,6 +78,8 @@ public: private: std::string stream_url; std::vector consumers; +private: + SrsSharedPtrMessage* cache_metadata; public: SrsSource(std::string _stream_url); virtual ~SrsSource();