diff --git a/trunk/src/core/srs_core_hls.cpp b/trunk/src/core/srs_core_hls.cpp index b9c954523..e83bfb219 100644 --- a/trunk/src/core/srs_core_hls.cpp +++ b/trunk/src/core/srs_core_hls.cpp @@ -41,6 +41,7 @@ SrsHLS::SrsHLS() codec = new SrsCodec(); sample = new SrsCodecSample(); muxer = NULL; + jitter = new SrsRtmpJitter(); } SrsHLS::~SrsHLS() @@ -48,6 +49,7 @@ SrsHLS::~SrsHLS() srs_freep(codec); srs_freep(sample); srs_freep(muxer); + srs_freep(jitter); } int SrsHLS::on_publish(std::string _vhost) diff --git a/trunk/src/core/srs_core_hls.hpp b/trunk/src/core/srs_core_hls.hpp index e23183f97..3e1387aa3 100644 --- a/trunk/src/core/srs_core_hls.hpp +++ b/trunk/src/core/srs_core_hls.hpp @@ -38,6 +38,7 @@ class SrsCodecBuffer; class SrsMpegtsFrame; class SrsTSMuxer; class SrsCodec; +class SrsRtmpJitter; class SrsHLS { @@ -47,6 +48,7 @@ private: SrsCodec* codec; SrsCodecSample* sample; SrsTSMuxer* muxer; + SrsRtmpJitter* jitter; public: SrsHLS(); virtual ~SrsHLS(); diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp index 29b36a6a1..30320b0aa 100644 --- a/trunk/src/core/srs_core_source.cpp +++ b/trunk/src/core/srs_core_source.cpp @@ -36,23 +36,72 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define DEFAULT_FRAME_TIME_MS 10 #define PAUSED_SHRINK_SIZE 250 -std::map SrsSource::pool; +SrsRtmpJitter::SrsRtmpJitter() +{ + last_pkt_correct_time = last_pkt_time = 0; +} -SrsSource* SrsSource::find(std::string stream_url) +SrsRtmpJitter::~SrsRtmpJitter() { - if (pool.find(stream_url) == pool.end()) { - pool[stream_url] = new SrsSource(stream_url); - srs_verbose("create new source for url=%s", stream_url.c_str()); +} + +int SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate) +{ + int ret = ERROR_SUCCESS; + + /** + * we use a very simple time jitter detect/correct algorithm: + * 1. delta: ensure the delta is positive and valid, + * we set the delta to DEFAULT_FRAME_TIME_MS, + * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS. + * 2. last_pkt_time: specifies the original packet time, + * is used to detect next jitter. + * 3. last_pkt_correct_time: simply add the positive delta, + * and enforce the time monotonically. + */ + u_int32_t time = msg->header.timestamp; + int32_t delta = time - last_pkt_time; + + // if jitter detected, reset the delta. + if (delta < 0 || delta > CONST_MAX_JITTER_MS) { + // calc the right diff by audio sample rate + if (msg->header.is_audio() && audio_sample_rate > 0) { + delta = (int32_t)(delta * 1000.0 / audio_sample_rate); + } else if (msg->header.is_video() && video_frame_rate > 0) { + delta = (int32_t)(delta * 1.0 / video_frame_rate); + } else { + delta = DEFAULT_FRAME_TIME_MS; + } + + // sometimes, the time is absolute time, so correct it again. + if (delta < 0 || delta > CONST_MAX_JITTER_MS) { + delta = DEFAULT_FRAME_TIME_MS; + } + + srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d", + last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta); + } else { + srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d", + time, last_pkt_time, last_pkt_correct_time + delta); } - return pool[stream_url]; + last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta); + msg->header.timestamp = last_pkt_correct_time; + last_pkt_time = time; + + return ret; +} + +int SrsRtmpJitter::get_time() +{ + return (int)last_pkt_correct_time; } SrsConsumer::SrsConsumer(SrsSource* _source) { source = _source; - last_pkt_correct_time = last_pkt_time = 0; paused = false; + jitter = new SrsRtmpJitter(); } SrsConsumer::~SrsConsumer() @@ -60,18 +109,19 @@ SrsConsumer::~SrsConsumer() clear(); source->on_consumer_destroy(this); + srs_freep(jitter); } int SrsConsumer::get_time() { - return (int)last_pkt_correct_time; + return jitter->get_time(); } int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate) { int ret = ERROR_SUCCESS; - if ((ret = jitter_correct(msg, audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { + if ((ret = jitter->correct(msg, audio_sample_rate, video_frame_rate)) != ERROR_SUCCESS) { return ret; } @@ -178,53 +228,6 @@ void SrsConsumer::shrink() } } -int SrsConsumer::jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate) -{ - int ret = ERROR_SUCCESS; - - /** - * we use a very simple time jitter detect/correct algorithm: - * 1. delta: ensure the delta is positive and valid, - * we set the delta to DEFAULT_FRAME_TIME_MS, - * if the delta of time is nagative or greater than CONST_MAX_JITTER_MS. - * 2. last_pkt_time: specifies the original packet time, - * is used to detect next jitter. - * 3. last_pkt_correct_time: simply add the positive delta, - * and enforce the time monotonically. - */ - u_int32_t time = msg->header.timestamp; - int32_t delta = time - last_pkt_time; - - // if jitter detected, reset the delta. - if (delta < 0 || delta > CONST_MAX_JITTER_MS) { - // calc the right diff by audio sample rate - if (msg->header.is_audio() && audio_sample_rate > 0) { - delta = (int32_t)(delta * 1000.0 / audio_sample_rate); - } else if (msg->header.is_video() && video_frame_rate > 0) { - delta = (int32_t)(delta * 1.0 / video_frame_rate); - } else { - delta = DEFAULT_FRAME_TIME_MS; - } - - // sometimes, the time is absolute time, so correct it again. - if (delta < 0 || delta > CONST_MAX_JITTER_MS) { - delta = DEFAULT_FRAME_TIME_MS; - } - - srs_info("jitter detected, last_pts=%d, pts=%d, diff=%d, last_time=%d, time=%d, diff=%d", - last_pkt_time, time, time - last_pkt_time, last_pkt_correct_time, last_pkt_correct_time + delta, delta); - } else { - srs_verbose("timestamp no jitter. time=%d, last_pkt=%d, correct_to=%d", - time, last_pkt_time, last_pkt_correct_time + delta); - } - - last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta); - msg->header.timestamp = last_pkt_correct_time; - last_pkt_time = time; - - return ret; -} - void SrsConsumer::clear() { std::vector::iterator it; @@ -235,6 +238,18 @@ void SrsConsumer::clear() msgs.clear(); } +std::map SrsSource::pool; + +SrsSource* SrsSource::find(std::string stream_url) +{ + if (pool.find(stream_url) == pool.end()) { + pool[stream_url] = new SrsSource(stream_url); + srs_verbose("create new source for url=%s", stream_url.c_str()); + } + + return pool[stream_url]; +} + SrsSource::SrsSource(std::string _stream_url) { stream_url = _stream_url; diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp index d0bad024f..386c85ebb 100644 --- a/trunk/src/core/srs_core_source.hpp +++ b/trunk/src/core/srs_core_source.hpp @@ -41,13 +41,35 @@ class SrsSharedPtrMessage; class SrsHLS; /** -* the consumer for SrsSource, that is a play client. +* time jitter detect and correct, +* to ensure the rtmp stream is monotonically. */ -class SrsConsumer +class SrsRtmpJitter { private: u_int32_t last_pkt_time; u_int32_t last_pkt_correct_time; +public: + SrsRtmpJitter(); + virtual ~SrsRtmpJitter(); +public: + /** + * detect the time jitter and correct it. + */ + virtual int correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate); + /** + * get current client time, the last packet time. + */ + virtual int get_time(); +}; + +/** +* the consumer for SrsSource, that is a play client. +*/ +class SrsConsumer +{ +private: + SrsRtmpJitter* jitter; SrsSource* source; std::vector msgs; bool paused; @@ -82,10 +104,6 @@ private: * remove to cache only one gop. */ virtual void shrink(); - /** - * detect the time jitter and correct it. - */ - virtual int jitter_correct(SrsSharedPtrMessage* msg, int audio_sample_rate, int video_frame_rate); virtual void clear(); };