From 659e173e159d66063804947ac709923db3c03326 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 1 May 2021 16:41:42 +0800 Subject: [PATCH 1/8] RTC: Refine for writing doc. 4.0.92 --- README.md | 12 ++-- trunk/conf/full.conf | 92 +++++++++++++-------------- trunk/conf/rtc.conf | 9 ++- trunk/src/app/srs_app_config.cpp | 32 +++------- trunk/src/app/srs_app_http_stream.hpp | 1 + trunk/src/app/srs_app_source.hpp | 3 +- trunk/src/core/srs_core_version4.hpp | 2 +- 7 files changed, 67 insertions(+), 84 deletions(-) diff --git a/README.md b/README.md index b40276857..c8eb328e9 100755 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ docker run --rm -p 1935:1935 -p 1985:1985 -p 8080:8080 \ ossrs/srs:v4.0.85 ``` -> To enable WebRTC, user MUST set the env `CANDIDATE`, see [#307](https://github.com/ossrs/srs/issues/307#issue-76908382). +> To enable WebRTC, user MUST set the env `CANDIDATE`, see [wiki](https://github.com/ossrs/srs/wiki/v4_CN_WebRTC#config-candidate). Open [http://localhost:8080/](http://localhost:8080/) to check it, then publish [stream](https://github.com/ossrs/srs/blob/3.0release/trunk/doc/source.flv) by: @@ -67,8 +67,7 @@ Fast index for Wikis: * How to deliver HTTP-FLV streaming?([CN][v4_CN_SampleHttpFlv], [EN][v4_EN_SampleHttpFlv]) * How to deliver HLS streaming?([CN][v4_CN_SampleHLS], [EN][v4_EN_SampleHLS]) * How to deliver low-latency streaming?([CN][v4_CN_SampleRealtime], [EN][v4_EN_SampleRealtime]) -* Usage: How to play WebRTC from SRS? [#307](https://github.com/ossrs/srs/issues/307) -* Usage: How to publish WebRTC to SRS? [#307](https://github.com/ossrs/srs/issues/307) +* How to use WebRTC? ([CN][v4_CN_WebRTC], [EN][v4_EN_WebRTC]) Other important wiki: @@ -1258,8 +1257,8 @@ Maintainers of SRS project: * [Winlin](https://github.com/winlinvip): All areas of streaming server and documents. * [Wenjie](https://github.com/wenjiegit): The focus of his work is on the [HDS](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_DeliveryHDS) module. * [Runner365](https://github.com/runner365): The focus of his work is on the [SRT](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_SRTWiki) module. -* [John](https://github.com/xiaozhihong): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_RTCWiki) module. -* [B.P.Y(Bepartofyou)](https://github.com/Bepartofyou): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_RTCWiki) module. +* [John](https://github.com/xiaozhihong): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_WebRTC) module. +* [B.P.Y(Bepartofyou)](https://github.com/Bepartofyou): Focus on [WebRTC](https://github.com/simple-rtmp-server/srs/wiki/v4_CN_WebRTC) module. * [Lixin](https://github.com/xialixin): Focus on [GB28181](https://github.com/ossrs/srs/issues/1500) module. 
A big THANK YOU goes to: @@ -1382,6 +1381,8 @@ Winlin [v4_EN_SampleForward]: https://github.com/ossrs/srs/wiki/v4_EN_SampleForward [v4_CN_SampleRealtime]: https://github.com/ossrs/srs/wiki/v4_CN_SampleRealtime [v4_EN_SampleRealtime]: https://github.com/ossrs/srs/wiki/v4_EN_SampleRealtime +[v4_CN_WebRTC]: https://github.com/ossrs/srs/wiki/v4_CN_WebRTC +[v4_EN_WebRTC]: https://github.com/ossrs/srs/wiki/v4_EN_WebRTC [v4_CN_SampleARM]: https://github.com/ossrs/srs/wiki/v4_CN_SampleARM [v4_EN_SampleARM]: https://github.com/ossrs/srs/wiki/v4_EN_SampleARM [v4_CN_SampleIngest]: https://github.com/ossrs/srs/wiki/v4_CN_SampleIngest @@ -1812,7 +1813,6 @@ Winlin [bug #1543]: https://github.com/ossrs/srs/issues/1543 [bug #1509]: https://github.com/ossrs/srs/issues/1509 [bug #1575]: https://github.com/ossrs/srs/issues/1575 -[bug #307]: https://github.com/ossrs/srs/issues/307 [bug #1070]: https://github.com/ossrs/srs/issues/1070 [bug #1580]: https://github.com/ossrs/srs/issues/1580 [bug #1547]: https://github.com/ossrs/srs/issues/1547 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index d33092cd4..978b237d0 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -446,14 +446,14 @@ srt_server { rtc_server { # Whether enable WebRTC server. # default: off - enabled on; + enabled on; # The udp listen port, we will reuse it for connections. # default: 8000 - listen 8000; + listen 8000; # The exposed candidate IPs, response in SDP candidate line. It can be: # * Retrieve server IP automatically, from all network interfaces. # eth0 Retrieve server IP by specified network interface name. # TODO: Implements it. - # $CANDIDATE Read the IP from ENV variable, use * if not set, see https://github.com/ossrs/srs/issues/307#issuecomment-599028124 + # $CANDIDATE Read the IP from ENV variable, use * if not set. # x.x.x.x A specified IP address or DNS name, which can be access by client such as Chrome. # You can specific more than one interface name: # eth0 eth1 Use network interface eth0 and eth1. # TODO: Implements it. @@ -461,37 +461,38 @@ rtc_server { # 192.168.1.3 10.1.2.3 rtc.me # TODO: Implements it. # And by multiple ENV variables: # $CANDIDATE $EIP # TODO: Implements it. - # @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name, see also https://github.com/ossrs/srs/pull/1998#issuecomment-726442999 + # @remark For Firefox, the candidate MUST be IP, MUST NOT be DNS name. + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate # default: * - candidate *; - # The IP family filter for candidate, it can be: + candidate *; + # The IP family filter for auto discover candidate, it can be: # ipv4 Filter IP v4 candidates. # ipv6 Filter IP v6 candidates. # all Filter all IP v4 or v6 candidates. # For example, if set to ipv4, we only use the IPv4 address as candidate. # default: ipv4 - ip_family ipv4; + ip_family ipv4; # Whether use ECDSA certificate. # If not, use RSA certificate. # default: on - ecdsa on; + ecdsa on; # Whether encrypt RTP packet by SRTP. # @remark Should always turn it on, or Chrome will fail. # default: on - encrypt on; + encrypt on; # We listen multiple times at the same port, by REUSEPORT, to increase the UDP queue. # Note that you can set to 1 and increase the system UDP buffer size by net.core.rmem_max # and net.core.rmem_default or just increase this to get larger UDP recv and send buffer. # default: 1 - reuseport 1; + reuseport 1; # Whether merge multiple NALUs into one. 
# @see https://github.com/ossrs/srs/issues/307#issuecomment-612806318 # default: off - merge_nalus off; + merge_nalus off; # Whether enable the perf stat at http://localhost:1985/api/v1/perf # TODO: FIXME: We should enable it when refined. # default: off - perf_stat off; + perf_stat off; # For RTP packet and its payload cache. rtp_cache { # Whether enable the RTP packet cache. @@ -533,61 +534,58 @@ vhost rtc.vhost.srs.com { rtc { # Whether enable WebRTC server. # default: off - enabled on; - # The strategy for bframe. - # keep Keep bframe, which may make browser with playing problems. - # discard Discard bframe, maybe cause browser with little problems. - # default: keep - bframe discard; - # The strategy for aac audio. - # transcode Transcode aac to opus. - # discard Discard aac audio packet. - # default: transcode - aac transcode; + enabled on; + # Whether support NACK. + # default: on + nack on; + # Whether directly use the packet, avoid copy. + # default: on + nack_no_copy on; + # Whether support TWCC. + # default: on + twcc on; # The timeout in seconds for session timeout. # Client will send ping(STUN binding request) to server, we use it as heartbeat. # default: 30 - stun_timeout 30; - # The strick check when process stun. + stun_timeout 30; + # The strict check when process stun. # default: off stun_strict_check on; # The role of dtls when peer is actpass: passive or active # default: passive - dtls_role passive; + dtls_role passive; # The version of dtls, support dtls1.0, dtls1.2, and auto # default: auto dtls_version auto; # Drop the packet with the pt(payload type), 0 never drop. # default: 0 drop_for_pt 0; + ############################################################### + # For transmuxing RTMP to RTC, the strategy for bframe. + # keep Keep bframe, which may make browser with playing problems. + # discard Discard bframe, maybe cause browser with little problems. + # default: discard + bframe discard; + # For transmuxing RTMP to RTC, the strategy for aac audio. + # transcode Transcode aac to opus. + # discard Discard aac audio packet. + # default: transcode + aac transcode; } - # whether enable min delay mode for vhost. + ############################################################### + # For transmuxing RTMP to RTC, it will impact the default values if RTC is on. + # Whether enable min delay mode for vhost. # default: on, for RTC. - min_latency on; + min_latency on; play { # set the MW(merged-write) latency in ms. # @remark For WebRTC, we enable pass-timestamp mode, so we ignore this config. # default: 0 (For WebRTC) - mw_latency 0; + mw_latency 0; # Set the MW(merged-write) min messages. - # default: 0 (For Real-Time, min_latency on) - # default: 1 (For WebRTC, min_latency off) - mw_msgs 0; - } - # For NACK. - nack { - # Whether support NACK. - # default: on - enabled on; - # Whether directly use the packet, avoid copy. - # default: on - no_copy on; - } - # For TWCC. - twcc { - # Whether support TWCC. - # default: on - enabled on; + # default: 0 (For Real-Time, that is min_latency on) + # default: 1 (For WebRTC, that is min_latency off) + mw_msgs 0; } } diff --git a/trunk/conf/rtc.conf b/trunk/conf/rtc.conf index b5a70eb07..0a3360c9f 100644 --- a/trunk/conf/rtc.conf +++ b/trunk/conf/rtc.conf @@ -18,21 +18,20 @@ stats { network 0; } rtc_server { - enabled on; + enabled on; # Listen at udp://8000 - listen 8000; + listen 8000; # # The $CANDIDATE means fetch from env, if not configed, use * as default. 
# # The * means retrieving server IP automatically, from all network interfaces, - # @see https://github.com/ossrs/srs/issues/307#issuecomment-599028124 - candidate $CANDIDATE; + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate + candidate $CANDIDATE; } vhost __defaultVhost__ { rtc { enabled on; - bframe discard; } http_remux { enabled on; diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index e69f0772e..142881496 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3789,8 +3789,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "play" && n != "publish" && n != "cluster" && n != "security" && n != "http_remux" && n != "dash" && n != "http_static" && n != "hds" && n != "exec" - && n != "in_ack_size" && n != "out_ack_size" && n != "rtc" && n != "nack" - && n != "twcc") { + && n != "in_ack_size" && n != "out_ack_size" && n != "rtc") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.%s", n.c_str()); } // for each sub directives of vhost. @@ -5104,7 +5103,7 @@ bool SrsConfig::get_rtc_enabled(string vhost) bool SrsConfig::get_rtc_bframe_discard(string vhost) { - static bool DEFAULT = false; + static bool DEFAULT = true; SrsConfDirective* conf = get_rtc(vhost); @@ -5117,7 +5116,7 @@ bool SrsConfig::get_rtc_bframe_discard(string vhost) return DEFAULT; } - return conf->arg0() == "discard"; + return conf->arg0() != "keep"; } bool SrsConfig::get_rtc_aac_discard(string vhost) @@ -5214,7 +5213,7 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) { static int DEFAULT = 0; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5231,7 +5230,7 @@ bool SrsConfig::get_rtc_nack_enabled(string vhost) { static bool DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5241,11 +5240,6 @@ bool SrsConfig::get_rtc_nack_enabled(string vhost) return DEFAULT; } - conf = conf->get("enabled"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } - return SRS_CONF_PERFER_TRUE(conf->arg0()); } @@ -5253,17 +5247,12 @@ bool SrsConfig::get_rtc_nack_no_copy(string vhost) { static bool DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); - if (!conf) { - return DEFAULT; - } - - conf = conf->get("nack"); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } - conf = conf->get("no_copy"); + conf = conf->get("nack_no_copy"); if (!conf || conf->arg0().empty()) { return DEFAULT; } @@ -5275,7 +5264,7 @@ bool SrsConfig::get_rtc_twcc_enabled(string vhost) { static bool DEFAULT = true; - SrsConfDirective* conf = get_vhost(vhost); + SrsConfDirective* conf = get_rtc(vhost); if (!conf) { return DEFAULT; } @@ -5285,11 +5274,6 @@ bool SrsConfig::get_rtc_twcc_enabled(string vhost) return DEFAULT; } - conf = conf->get("enabled"); - if (!conf || conf->arg0().empty()) { - return DEFAULT; - } - return SRS_CONF_PERFER_TRUE(conf->arg0()); } diff --git a/trunk/src/app/srs_app_http_stream.hpp b/trunk/src/app/srs_app_http_stream.hpp index 0e56a63c1..1ba6c1e5a 100755 --- a/trunk/src/app/srs_app_http_stream.hpp +++ b/trunk/src/app/srs_app_http_stream.hpp @@ -179,6 +179,7 @@ public: }; // HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format. 
+// TODO: FIXME: Rename to SrsHttpLive class SrsLiveStream : public ISrsHttpHandler { private: diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 8a35b5155..84474fbd8 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -500,7 +500,8 @@ public: virtual void on_unpublish() = 0; }; -// live streaming source. +// The live streaming source. +// TODO: FIXME: Rename to SrsLiveStream. class SrsSource : public ISrsReloadHandler { friend class SrsOriginHub; diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 441c0fa7d..a4c0ba3bf 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 91 +#define VERSION_REVISION 92 #endif From 8747dd6630bca70cc78d3d7f41bb15473478415f Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Apr 2021 11:04:05 +0800 Subject: [PATCH 2/8] Timer: Extract shared FastTimer to use one timer for all connections --- trunk/src/app/srs_app_hourglass.cpp | 144 ++++++++++++++++++++++++++++ trunk/src/app/srs_app_hourglass.hpp | 47 ++++++++- trunk/src/app/srs_app_hybrid.cpp | 79 +++++---------- trunk/src/app/srs_app_hybrid.hpp | 11 ++- 4 files changed, 218 insertions(+), 63 deletions(-) diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 58d7dee72..fe9360475 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -27,11 +27,22 @@ using namespace std; #include #include +#include #include SrsPps* _srs_pps_timer = new SrsPps(); +extern SrsPps* _srs_pps_clock_15ms; +extern SrsPps* _srs_pps_clock_20ms; +extern SrsPps* _srs_pps_clock_25ms; +extern SrsPps* _srs_pps_clock_30ms; +extern SrsPps* _srs_pps_clock_35ms; +extern SrsPps* _srs_pps_clock_40ms; +extern SrsPps* _srs_pps_clock_80ms; +extern SrsPps* _srs_pps_clock_160ms; +extern SrsPps* _srs_pps_timer_s; + ISrsHourGlass::ISrsHourGlass() { } @@ -89,6 +100,14 @@ srs_error_t SrsHourGlass::tick(int event, srs_utime_t interval) return err; } +void SrsHourGlass::untick(int event) +{ + map::iterator it = ticks.find(event); + if (it != ticks.end()) { + ticks.erase(it); + } +} + srs_error_t SrsHourGlass::cycle() { srs_error_t err = srs_success; @@ -119,3 +138,128 @@ srs_error_t SrsHourGlass::cycle() return err; } + +ISrsFastTimer::ISrsFastTimer() +{ +} + +ISrsFastTimer::~ISrsFastTimer() +{ +} + +SrsFastTimer::SrsFastTimer(std::string label, srs_utime_t resolution) +{ + timer_ = new SrsHourGlass(label, this, resolution); +} + +SrsFastTimer::~SrsFastTimer() +{ + srs_freep(timer_); +} + +srs_error_t SrsFastTimer::start() +{ + srs_error_t err = srs_success; + + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "start timer"); + } + + return err; +} + +void SrsFastTimer::subscribe(srs_utime_t interval, ISrsFastTimer* timer) +{ + static int g_event = 0; + + int event = g_event++; + + // TODO: FIXME: Error leak. Change tick to void in future. 
+ timer_->tick(event, interval); + + handlers_[event] = timer; +} + +void SrsFastTimer::unsubscribe(ISrsFastTimer* timer) +{ + for (map::iterator it = handlers_.begin(); it != handlers_.end();) { + if (it->second != timer) { + ++it; + continue; + } + + handlers_.erase(it++); + + int event = it->first; + timer_->untick(event); + } +} + +srs_error_t SrsFastTimer::notify(int event, srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + for (map::iterator it = handlers_.begin(); it != handlers_.end(); ++it) { + ISrsFastTimer* timer = it->second; + + if (event != it->first) { + continue; + } + + if ((err = timer->on_timer(interval, tick)) != srs_success) { + return srs_error_wrap(err, "tick for event=%d, interval=%dms, tick=%dms", + event, srsu2msi(interval), srsu2msi(tick)); + } + + break; + } + + return err; +} + +SrsClockWallMonitor::SrsClockWallMonitor() +{ +} + +SrsClockWallMonitor::~SrsClockWallMonitor() +{ +} + +srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + static srs_utime_t clock = 0; + + srs_utime_t now = srs_update_system_time(); + if (!clock) { + clock = now; + return err; + } + + srs_utime_t elapsed = now - clock; + clock = now; + + if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_15ms->sugar; + } else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_20ms->sugar; + } else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_25ms->sugar; + } else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_30ms->sugar; + } else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_35ms->sugar; + } else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_40ms->sugar; + } else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_80ms->sugar; + } else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) { + ++_srs_pps_clock_160ms->sugar; + } else { + ++_srs_pps_timer_s->sugar; + } + + return err; +} + diff --git a/trunk/src/app/srs_app_hourglass.hpp b/trunk/src/app/srs_app_hourglass.hpp index 5404f89aa..d46bd1c6b 100644 --- a/trunk/src/app/srs_app_hourglass.hpp +++ b/trunk/src/app/srs_app_hourglass.hpp @@ -68,7 +68,7 @@ public: // // // The hg will create a thread for timer. // hg->start(); -class SrsHourGlass : virtual public ISrsCoroutineHandler +class SrsHourGlass : public ISrsCoroutineHandler { private: std::string label_; @@ -96,10 +96,55 @@ public: // @param interval the interval in srs_utime_t of tick. virtual srs_error_t tick(srs_utime_t interval); virtual srs_error_t tick(int event, srs_utime_t interval); + // Remove the tick by event. + void untick(int event); public: // Cycle the hourglass, which will sleep resolution every time. // and call handler when ticked. virtual srs_error_t cycle(); }; +// The handler for fast timer. +class ISrsFastTimer +{ +public: + ISrsFastTimer(); + virtual ~ISrsFastTimer(); +public: + // Tick when timer is active. + virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) = 0; +}; + +// The fast timer, shared by objects, for high performance. +// For example, we should never start a timer for each connection or publisher or player, +// instead, we should start only one fast timer in server. 
+class SrsFastTimer : public ISrsHourGlass +{ +private: + SrsHourGlass* timer_; + std::map handlers_; +public: + SrsFastTimer(std::string label, srs_utime_t resolution); + virtual ~SrsFastTimer(); +public: + srs_error_t start(); +public: + void subscribe(srs_utime_t interval, ISrsFastTimer* timer); + void unsubscribe(ISrsFastTimer* timer); +// Interface ISrsHourGlass +private: + virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); +}; + +// To monitor the system wall clock timer deviation. +class SrsClockWallMonitor : public ISrsFastTimer +{ +public: + SrsClockWallMonitor(); + virtual ~SrsClockWallMonitor(); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); +}; + #endif diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index bedd5b816..a4ed6f539 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -208,11 +208,18 @@ SrsServer* SrsServerAdapter::instance() SrsHybridServer::SrsHybridServer() { + // Note that the timer depends on other global variables, + // so we MUST never create it in constructor. timer_ = NULL; + + clock_monitor_ = new SrsClockWallMonitor(); } SrsHybridServer::~SrsHybridServer() { + srs_freep(clock_monitor_); + srs_freep(timer_); + vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; @@ -235,10 +242,20 @@ srs_error_t SrsHybridServer::initialize() return srs_error_wrap(err, "initialize st failed"); } - if ((err = setup_ticks()) != srs_success) { - return srs_error_wrap(err, "tick"); + // Create global shared timer. + timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS); + + // Start the timer first. + if ((err = timer_->start()) != srs_success) { + return srs_error_wrap(err, "start timer"); } + // The hybrid server start a timer, do routines of hybrid server. + timer_->subscribe(5 * SRS_UTIME_SECONDS, this); + + // A monitor to check the clock wall deviation, per clock tick. + timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_); + vector::iterator it; for (it = servers.begin(); it != servers.end(); ++it) { ISrsHybridServer* server = *it; @@ -289,67 +306,15 @@ SrsServerAdapter* SrsHybridServer::srs() return NULL; } -srs_error_t SrsHybridServer::setup_ticks() +SrsFastTimer* SrsHybridServer::timer() { - srs_error_t err = srs_success; - - // Start timer for system global works. - timer_ = new SrsHourGlass("hybrid", this, 20 * SRS_UTIME_MILLISECONDS); - - if ((err = timer_->tick(1, 20 * SRS_UTIME_MILLISECONDS)) != srs_success) { - return srs_error_wrap(err, "tick"); - } - - if ((err = timer_->tick(2, 5 * SRS_UTIME_SECONDS)) != srs_success) { - return srs_error_wrap(err, "tick"); - } - - if ((err = timer_->start()) != srs_success) { - return srs_error_wrap(err, "start"); - } - - return err; + return timer_; } -srs_error_t SrsHybridServer::notify(int event, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; - // Update system wall clock. 
- if (event == 1) { - static srs_utime_t clock = 0; - - srs_utime_t now = srs_update_system_time(); - if (!clock) { - clock = now; - return err; - } - - srs_utime_t elapsed = now - clock; - clock = now; - - if (elapsed <= 15 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_15ms->sugar; - } else if (elapsed <= 21 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_20ms->sugar; - } else if (elapsed <= 25 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_25ms->sugar; - } else if (elapsed <= 30 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_30ms->sugar; - } else if (elapsed <= 35 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_35ms->sugar; - } else if (elapsed <= 40 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_40ms->sugar; - } else if (elapsed <= 80 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_80ms->sugar; - } else if (elapsed <= 160 * SRS_UTIME_MILLISECONDS) { - ++_srs_pps_clock_160ms->sugar; - } else { - ++_srs_pps_timer_s->sugar; - } - return err; - } - // Show statistics for RTC server. SrsProcSelfStat* u = srs_get_self_proc_stat(); // Resident Set Size: number of pages the process has in real memory. diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp index b13c381e6..5c6599bda 100644 --- a/trunk/src/app/srs_app_hybrid.hpp +++ b/trunk/src/app/srs_app_hybrid.hpp @@ -64,11 +64,12 @@ public: }; // The hybrid server manager. -class SrsHybridServer : public ISrsHourGlass +class SrsHybridServer : public ISrsFastTimer { private: std::vector servers; - SrsHourGlass* timer_; + SrsFastTimer* timer_; + SrsClockWallMonitor* clock_monitor_; public: SrsHybridServer(); virtual ~SrsHybridServer(); @@ -80,10 +81,10 @@ public: virtual void stop(); public: virtual SrsServerAdapter* srs(); -// interface ISrsHourGlass + SrsFastTimer* timer(); +// interface ISrsFastTimer private: - virtual srs_error_t setup_ticks(); - virtual srs_error_t notify(int event, srs_utime_t interval, srs_utime_t tick); + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); }; extern SrsHybridServer* _srs_hybrid; From 00c192ede1955ad376697faa9f60fa2cfe2207a9 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Apr 2021 11:17:58 +0800 Subject: [PATCH 3/8] Timer: Apply shared FastTimer to RTC server. 4.0.93 --- README.md | 1 + trunk/src/app/srs_app_rtc_server.cpp | 15 ++++----------- trunk/src/app/srs_app_rtc_server.hpp | 9 ++++----- trunk/src/core/srs_core_version4.hpp | 2 +- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index c8eb328e9..89aa0abb5 100755 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Other important wiki: ## V4 changes +* v4.0, 2021-05-01, Timer: Extract and apply shared FastTimer. 4.0.93 * v4.0, 2021-04-29, RTC: Support AV1 for Chrome M90. 4.0.91 * v4.0, 2021-04-24, Change push-RTSP as deprecated feature. * v4.0, 2021-04-24, Player: Change the default from RTMP to HTTP-FLV. 
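The next diff applies this to SrsRtcServer, which drops its private SrsHourGlass and registers with the shared timer owned by SrsHybridServer. Any other component follows the same pattern: implement ISrsFastTimer and subscribe for the interval it needs. A minimal sketch, assuming the SRS headers introduced above; the class name SrsUsageReporter is illustrative and not part of these patches:

class SrsUsageReporter : public ISrsFastTimer
{
public:
    SrsUsageReporter() {
    }
    virtual ~SrsUsageReporter() {
        // Stop receiving ticks before the handler is destroyed.
        _srs_hybrid->timer()->unsubscribe(this);
    }
    srs_error_t initialize() {
        // Tick every 5 seconds, driven by the single shared 20ms hybrid timer.
        _srs_hybrid->timer()->subscribe(5 * SRS_UTIME_SECONDS, this);
        return srs_success;
    }
// Interface ISrsFastTimer
private:
    virtual srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick) {
        // Do the periodic routine here.
        return srs_success;
    }
};

Because every subscriber shares that single 20ms-resolution timer, on_timer() handlers should return quickly and must not block, otherwise they delay all other subscribers.
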
diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index af00cbd0c..0f83bc9a6 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -259,7 +259,6 @@ SrsRtcServer::SrsRtcServer() { handler = NULL; hijacker = NULL; - timer = new SrsHourGlass("server", this, 1 * SRS_UTIME_SECONDS); _srs_config->subscribe(this); } @@ -268,8 +267,6 @@ SrsRtcServer::~SrsRtcServer() { _srs_config->unsubscribe(this); - srs_freep(timer); - if (true) { vector::iterator it; for (it = listeners.begin(); it != listeners.end(); ++it) { @@ -283,14 +280,10 @@ srs_error_t SrsRtcServer::initialize() { srs_error_t err = srs_success; - if ((err = timer->tick(5 * SRS_UTIME_SECONDS)) != srs_success) { - return srs_error_wrap(err, "hourglass tick"); - } - - if ((err = timer->start()) != srs_success) { - return srs_error_wrap(err, "start timer"); - } + // The RTC server start a timer, do routines of RTC server. + _srs_hybrid->timer()->subscribe(5 * SRS_UTIME_SECONDS, this); + // Initialize the black hole. if ((err = _srs_blackhole->initialize()) != srs_success) { return srs_error_wrap(err, "black hole"); } @@ -640,7 +633,7 @@ SrsRtcConnection* SrsRtcServer::find_session_by_username(const std::string& user return dynamic_cast(conn); } -srs_error_t SrsRtcServer::notify(int type, srs_utime_t interval, srs_utime_t tick) +srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 988d65115..ef3064dbb 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -105,10 +105,9 @@ public: }; // The RTC server instance, listen UDP port, handle UDP packet, manage RTC connections. -class SrsRtcServer : virtual public ISrsUdpMuxHandler, virtual public ISrsHourGlass, virtual public ISrsReloadHandler +class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrsReloadHandler { private: - SrsHourGlass* timer; std::vector listeners; ISrsRtcServerHandler* handler; ISrsRtcServerHijacker* hijacker; @@ -137,9 +136,9 @@ private: srs_error_t do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local_sdp, SrsRtcConnection* session); public: SrsRtcConnection* find_session_by_username(const std::string& ufrag); -// interface ISrsHourGlass -public: - virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); }; // The RTC server adapter. diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index a4c0ba3bf..25dcf6fdb 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 92 +#define VERSION_REVISION 93 #endif From c10232b4e2cd7a3a78233ddf96704b9fe781ac92 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 20 Apr 2021 19:57:21 +0800 Subject: [PATCH 4/8] Bridger: Refine transcoder to support aac2opus and opus2aac. 
4.0.94 --- README.md | 1 + trunk/src/app/srs_app_rtc_codec.cpp | 645 +++++++++++--------------- trunk/src/app/srs_app_rtc_codec.hpp | 123 ++--- trunk/src/app/srs_app_rtc_source.cpp | 87 ++-- trunk/src/app/srs_app_rtc_source.hpp | 9 +- trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/kernel/srs_kernel_codec.hpp | 15 +- 7 files changed, 346 insertions(+), 536 deletions(-) diff --git a/README.md b/README.md index 89aa0abb5..c273502fb 100755 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Other important wiki: ## V4 changes +* v5.0, 2021-04-20, Refine transcoder to support aac2opus and opus2aac. 4.0.94 * v4.0, 2021-05-01, Timer: Extract and apply shared FastTimer. 4.0.93 * v4.0, 2021-04-29, RTC: Support AV1 for Chrome M90. 4.0.91 * v4.0, 2021-04-24, Change push-RTSP as deprecated feature. diff --git a/trunk/src/app/srs_app_rtc_codec.cpp b/trunk/src/app/srs_app_rtc_codec.cpp index 2c9ebd2a9..eb2d66613 100644 --- a/trunk/src/app/srs_app_rtc_codec.cpp +++ b/trunk/src/app/srs_app_rtc_codec.cpp @@ -1,4 +1,3 @@ - /** * The MIT License (MIT) * @@ -22,14 +21,13 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#include -#include #include -static const int kFrameBufMax = 40960; -static const int kPacketBufMax = 8192; +#include +#include +#include -static const char* id2codec_name(SrsAudioCodecId id) +static const char* id2codec_name(SrsAudioCodecId id) { switch (id) { case SrsAudioCodecIdAAC: @@ -41,506 +39,379 @@ static const char* id2codec_name(SrsAudioCodecId id) } } -SrsAudioDecoder::SrsAudioDecoder(SrsAudioCodecId codec) - : codec_id_(codec) +SrsAudioTranscoder::SrsAudioTranscoder() { - frame_ = NULL; - packet_ = NULL; - codec_ctx_ = NULL; + dec_ = NULL; + dec_frame_ = NULL; + dec_packet_ = NULL; + enc_ = NULL; + enc_frame_ = NULL; + enc_packet_ = NULL; + swr_ = NULL; + swr_data_ = NULL; + fifo_ = NULL; + new_pkt_pts_ = AV_NOPTS_VALUE; + next_out_pts_ = AV_NOPTS_VALUE; } -SrsAudioDecoder::~SrsAudioDecoder() +SrsAudioTranscoder::~SrsAudioTranscoder() { - if (codec_ctx_) { - avcodec_free_context(&codec_ctx_); - codec_ctx_ = NULL; - } - if (frame_) { - av_frame_free(&frame_); - frame_ = NULL; - } - if (packet_) { - av_packet_free(&packet_); - packet_ = NULL; + if (dec_) { + avcodec_free_context(&dec_); } -} - -srs_error_t SrsAudioDecoder::initialize() -{ - srs_error_t err = srs_success; - //check codec name,only support "aac","opus" - if (codec_id_ != SrsAudioCodecIdAAC && codec_id_ != SrsAudioCodecIdOpus) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Invalid codec name %d", codec_id_); + if (dec_frame_) { + av_frame_free(&dec_frame_); } - const char* codec_name = id2codec_name(codec_id_); - const AVCodec *codec = avcodec_find_decoder_by_name(codec_name); - if (!codec) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name %d(%s)", codec_id_, codec_name); + if (dec_packet_) { + av_packet_free(&dec_packet_); } - codec_ctx_ = avcodec_alloc_context3(codec); - if (!codec_ctx_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context"); + if (swr_) { + swr_free(&swr_); } - if (avcodec_open2(codec_ctx_, codec, NULL) < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not open codec"); + free_swr_samples(); + + if (enc_) { + avcodec_free_context(&enc_); } - frame_ = av_frame_alloc(); - if (!frame_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio frame"); + if (enc_frame_) { + av_frame_free(&enc_frame_); } - packet_ = av_packet_alloc(); - if (!packet_) { - return 
srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio packet"); + if (enc_packet_) { + av_packet_free(&enc_packet_); } - return err; + if (fifo_) { + av_audio_fifo_free(fifo_); + fifo_ = NULL; + } } -srs_error_t SrsAudioDecoder::decode(SrsSample *pkt, char *buf, int &size) +srs_error_t SrsAudioTranscoder::initialize(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec, int dst_channels, int dst_samplerate, int dst_bit_rate) { srs_error_t err = srs_success; - packet_->data = (uint8_t *)pkt->bytes; - packet_->size = pkt->size; + if ((err = init_dec(src_codec)) != srs_success) { + return srs_error_wrap(err, "dec init codec:%d", src_codec); + } - int ret = avcodec_send_packet(codec_ctx_, packet_); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error submitting the packet to the decoder"); + if ((err = init_enc(dst_codec, dst_channels, dst_samplerate, dst_bit_rate)) != srs_success) { + return srs_error_wrap(err, "enc init codec:%d, channels:%d, samplerate:%d, bitrate:%d", + dst_codec, dst_channels, dst_samplerate, dst_bit_rate); } - int max = size; - size = 0; + if ((err = init_fifo()) != srs_success) { + return srs_error_wrap(err, "fifo init"); + } - while (ret >= 0) { - ret = avcodec_receive_frame(codec_ctx_, frame_); - if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { - return err; - } else if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding"); - } + return err; +} - int pcm_size = av_get_bytes_per_sample(codec_ctx_->sample_fmt); - if (pcm_size < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Failed to calculate data size"); - } +srs_error_t SrsAudioTranscoder::transcode(SrsAudioFrame *in_pkt, std::vector& out_pkts) +{ + srs_error_t err = srs_success; - // @see https://github.com/ossrs/srs/pull/2011/files - for (int i = 0; i < codec_ctx_->channels; i++) { - if (size + pcm_size * frame_->nb_samples <= max) { - memcpy(buf + size,frame_->data[i],pcm_size * frame_->nb_samples); - size += pcm_size * frame_->nb_samples; - } - } + if ((err = decode_and_resample(in_pkt)) != srs_success) { + return srs_error_wrap(err, "decode and resample"); + } + + if ((err = encode(out_pkts)) != srs_success) { + return srs_error_wrap(err, "encode"); } return err; } -AVCodecContext* SrsAudioDecoder::codec_ctx() +void SrsAudioTranscoder::free_frames(std::vector& frames) { - return codec_ctx_; + for (std::vector::iterator it = frames.begin(); it != frames.end(); ++it) { + SrsAudioFrame* p = *it; + + for (int i = 0; i < p->nb_samples; i++) { + char* pa = p->samples[i].bytes; + srs_freepa(pa); + } + + srs_freep(p); + } } -SrsAudioEncoder::SrsAudioEncoder(SrsAudioCodecId codec, int samplerate, int channels) - : channels_(channels), - sampling_rate_(samplerate), - codec_id_(codec), - want_bytes_(0) +void SrsAudioTranscoder::aac_codec_header(uint8_t **data, int *len) { - codec_ctx_ = NULL; + //srs_assert(dst_codec == SrsAudioCodecIdAAC); + *len = enc_->extradata_size; + *data = enc_->extradata; } -SrsAudioEncoder::~SrsAudioEncoder() +srs_error_t SrsAudioTranscoder::init_dec(SrsAudioCodecId src_codec) { - if (codec_ctx_) { - avcodec_free_context(&codec_ctx_); + const char* codec_name = id2codec_name(src_codec); + const AVCodec *codec = avcodec_find_decoder_by_name(codec_name); + if (!codec) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name(%d,%s)", src_codec, codec_name); } - if (frame_) { - av_frame_free(&frame_); + dec_ = avcodec_alloc_context3(codec); + if (!dec_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec 
context"); } - -} -srs_error_t SrsAudioEncoder::initialize() -{ - srs_error_t err = srs_success; + if (avcodec_open2(dec_, codec, NULL) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not open codec"); + } - if (codec_id_ != SrsAudioCodecIdAAC && codec_id_ != SrsAudioCodecIdOpus) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Invalid codec name %d", codec_id_); + dec_frame_ = av_frame_alloc(); + if (!dec_frame_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio decode out frame"); } - frame_ = av_frame_alloc(); - if (!frame_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio frame"); + dec_packet_ = av_packet_alloc(); + if (!dec_packet_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio decode in packet"); } - const char* codec_name = id2codec_name(codec_id_); + new_pkt_pts_ = AV_NOPTS_VALUE; + return srs_success; +} + +srs_error_t SrsAudioTranscoder::init_enc(SrsAudioCodecId dst_codec, int dst_channels, int dst_samplerate, int dst_bit_rate) +{ + const char* codec_name = id2codec_name(dst_codec); const AVCodec *codec = avcodec_find_encoder_by_name(codec_name); if (!codec) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name %d(%s)", codec_id_, codec_name); + return srs_error_new(ERROR_RTC_RTP_MUXER, "Codec not found by name(%d,%s)", dst_codec, codec_name); } - codec_ctx_ = avcodec_alloc_context3(codec); - if (!codec_ctx_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context"); + enc_ = avcodec_alloc_context3(codec); + if (!enc_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio codec context(%d,%s)", dst_codec, codec_name); } - codec_ctx_->sample_rate = sampling_rate_; - codec_ctx_->channels = channels_; - codec_ctx_->channel_layout = av_get_default_channel_layout(channels_); - codec_ctx_->bit_rate = 48000; - if (codec_id_ == SrsAudioCodecIdOpus) { - codec_ctx_->sample_fmt = AV_SAMPLE_FMT_S16; + enc_->sample_rate = dst_samplerate; + enc_->channels = dst_channels; + enc_->channel_layout = av_get_default_channel_layout(dst_channels); + enc_->bit_rate = dst_bit_rate; + enc_->sample_fmt = codec->sample_fmts[0]; + enc_->time_base = {1, 1000}; + if (dst_codec == SrsAudioCodecIdOpus) { //TODO: for more level setting - codec_ctx_->compression_level = 1; - } else if (codec_id_ == SrsAudioCodecIdAAC) { - codec_ctx_->sample_fmt = AV_SAMPLE_FMT_FLTP; + enc_->compression_level = 1; + } else if (dst_codec == SrsAudioCodecIdAAC) { + enc_->strict_std_compliance = FF_COMPLIANCE_EXPERIMENTAL; } // TODO: FIXME: Show detail error. 
- if (avcodec_open2(codec_ctx_, codec, NULL) < 0) { + if (avcodec_open2(enc_, codec, NULL) < 0) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not open codec"); } - want_bytes_ = codec_ctx_->channels * codec_ctx_->frame_size * av_get_bytes_per_sample(codec_ctx_->sample_fmt); + enc_frame_ = av_frame_alloc(); + if (!enc_frame_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio encode in frame"); + } - frame_->format = codec_ctx_->sample_fmt; - frame_->nb_samples = codec_ctx_->frame_size; - frame_->channel_layout = codec_ctx_->channel_layout; + enc_frame_->format = enc_->sample_fmt; + enc_frame_->nb_samples = enc_->frame_size; + enc_frame_->channel_layout = enc_->channel_layout; - if (av_frame_get_buffer(frame_, 0) < 0) { + if (av_frame_get_buffer(enc_frame_, 0) < 0) { return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not get audio frame buffer"); } - return err; -} + enc_packet_ = av_packet_alloc(); + if (!enc_packet_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate audio encode out packet"); + } -int SrsAudioEncoder::want_bytes() -{ - return want_bytes_; + next_out_pts_ = AV_NOPTS_VALUE; + return srs_success; } -srs_error_t SrsAudioEncoder::encode(SrsSample *frame, char *buf, int &size) +srs_error_t SrsAudioTranscoder::init_swr(AVCodecContext* decoder) { - srs_error_t err = srs_success; - - if (want_bytes_ > 0 && frame->size != want_bytes_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "invalid frame size %d, should be %d", frame->size, want_bytes_); + swr_ = swr_alloc_set_opts(NULL, enc_->channel_layout, enc_->sample_fmt, enc_->sample_rate, + decoder->channel_layout, decoder->sample_fmt, decoder->sample_rate, 0, NULL); + if (!swr_) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr"); } - // TODO: Directly use frame? - memcpy(frame_->data[0], frame->bytes, frame->size); - - /* send the frame for encoding */ - int r0 = avcodec_send_frame(codec_ctx_, frame_); - if (r0 < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error sending the frame to the encoder, %d", r0); + int error; + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; + if ((error = swr_init(swr_)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "open swr(%d:%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - AVPacket pkt; - av_init_packet(&pkt); - pkt.data = NULL; - pkt.size = 0; - - /* read all the available output packets (in general there may be any - * number of them */ - size = 0; - while (r0 >= 0) { - r0 = avcodec_receive_packet(codec_ctx_, &pkt); - if (r0 == AVERROR(EAGAIN) || r0 == AVERROR_EOF) { - break; - } else if (r0 < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding %d", r0); - } - - //TODO: fit encoder out more pkt - memcpy(buf, pkt.data, pkt.size); - size = pkt.size; - av_packet_unref(&pkt); - - // TODO: FIXME: Refine api, got more than one packets. + /* Allocate as many pointers as there are audio channels. + * Each pointer will later point to the audio samples of the corresponding + * channels (although it may be NULL for interleaved formats). + */ + if (!(swr_data_ = (uint8_t **)calloc(enc_->channels, sizeof(*swr_data_)))) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr buffer"); } - return err; -} - -AVCodecContext* SrsAudioEncoder::codec_ctx() -{ - return codec_ctx_; -} + /* Allocate memory for the samples of all channels in one consecutive + * block for convenience. 
*/ + if ((error = av_samples_alloc(swr_data_, NULL, enc_->channels, enc_->frame_size, enc_->sample_fmt, 0)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc swr buffer(%d:%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } -SrsAudioResample::SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt, - int src_nb, int dst_rate, int dst_layout, AVSampleFormat dst_fmt) - : src_rate_(src_rate), - src_ch_layout_(src_layout), - src_sample_fmt_(src_fmt), - src_nb_samples_(src_nb), - dst_rate_(dst_rate), - dst_ch_layout_(dst_layout), - dst_sample_fmt_(dst_fmt) -{ - src_nb_channels_ = 0; - dst_nb_channels_ = 0; - src_linesize_ = 0; - dst_linesize_ = 0; - dst_nb_samples_ = 0; - src_data_ = NULL; - dst_data_ = 0; - - max_dst_nb_samples_ = 0; - swr_ctx_ = NULL; + return srs_success; } -SrsAudioResample::~SrsAudioResample() +srs_error_t SrsAudioTranscoder::init_fifo() { - if (src_data_) { - av_freep(&src_data_[0]); - av_freep(&src_data_); - src_data_ = NULL; - } - if (dst_data_) { - av_freep(&dst_data_[0]); - av_freep(&dst_data_); - dst_data_ = NULL; - } - if (swr_ctx_) { - swr_free(&swr_ctx_); - swr_ctx_ = NULL; + if (!(fifo_ = av_audio_fifo_alloc(enc_->sample_fmt, enc_->channels, 1))) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate FIFO"); } + return srs_success; } -srs_error_t SrsAudioResample::initialize() +srs_error_t SrsAudioTranscoder::decode_and_resample(SrsAudioFrame *pkt) { srs_error_t err = srs_success; - swr_ctx_ = swr_alloc(); - if (!swr_ctx_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate resampler context"); - } - - av_opt_set_int(swr_ctx_, "in_channel_layout", src_ch_layout_, 0); - av_opt_set_int(swr_ctx_, "in_sample_rate", src_rate_, 0); - av_opt_set_sample_fmt(swr_ctx_, "in_sample_fmt", src_sample_fmt_, 0); + dec_packet_->data = (uint8_t *)pkt->samples[0].bytes; + dec_packet_->size = pkt->samples[0].size; - av_opt_set_int(swr_ctx_, "out_channel_layout", dst_ch_layout_, 0); - av_opt_set_int(swr_ctx_, "out_sample_rate", dst_rate_, 0); - av_opt_set_sample_fmt(swr_ctx_, "out_sample_fmt", dst_sample_fmt_, 0); + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; - int ret; - if ((ret = swr_init(swr_ctx_)) < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Failed to initialize the resampling context"); + int error = avcodec_send_packet(dec_, dec_packet_); + if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "submit to dec(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - src_nb_channels_ = av_get_channel_layout_nb_channels(src_ch_layout_); - ret = av_samples_alloc_array_and_samples(&src_data_, &src_linesize_, src_nb_channels_, - src_nb_samples_, src_sample_fmt_, 0); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate source samples"); - } + new_pkt_pts_ = pkt->dts + pkt->cts; + while (error >= 0) { + error = avcodec_receive_frame(dec_, dec_frame_); + if (error == AVERROR(EAGAIN) || error == AVERROR_EOF) { + return err; + } else if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } + + // Decoder is OK now, try to init swr if not initialized. 
+ if (!swr_ && (err = init_swr(dec_)) != srs_success) { + return srs_error_wrap(err, "resample init"); + } - max_dst_nb_samples_ = dst_nb_samples_ = - av_rescale_rnd(src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP); + int in_samples = dec_frame_->nb_samples; + const uint8_t **in_data = (const uint8_t**)dec_frame_->extended_data; + do { + /* Convert the samples using the resampler. */ + int frame_size = swr_convert(swr_, swr_data_, enc_->frame_size, in_data, in_samples); + if ((error = frame_size) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not convert input samples(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } - dst_nb_channels_ = av_get_channel_layout_nb_channels(dst_ch_layout_); - ret = av_samples_alloc_array_and_samples(&dst_data_, &dst_linesize_, dst_nb_channels_, - dst_nb_samples_, dst_sample_fmt_, 0); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not allocate destination samples"); + in_data = NULL; in_samples = 0; + if ((err = add_samples_to_fifo(swr_data_, frame_size)) != srs_success) { + return srs_error_wrap(err, "write samples"); + } + } while (swr_get_out_samples(swr_, in_samples) >= enc_->frame_size); } return err; } -srs_error_t SrsAudioResample::resample(SrsSample *pcm, char *buf, int &size) +srs_error_t SrsAudioTranscoder::encode(std::vector &pkts) { - srs_error_t err = srs_success; - - int ret, plane = 1; - if (src_sample_fmt_ == AV_SAMPLE_FMT_FLTP) { - plane = 2; - } - if (src_linesize_ * plane < pcm->size || pcm->size < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "size not ok"); - } - memcpy(src_data_[0], pcm->bytes, pcm->size); - - dst_nb_samples_ = av_rescale_rnd(swr_get_delay(swr_ctx_, src_rate_) + - src_nb_samples_, dst_rate_, src_rate_, AV_ROUND_UP); - if (dst_nb_samples_ > max_dst_nb_samples_) { - av_freep(&dst_data_[0]); - ret = av_samples_alloc(dst_data_, &dst_linesize_, dst_nb_channels_, - dst_nb_samples_, dst_sample_fmt_, 1); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "alloc error"); + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; + + if (next_out_pts_ == AV_NOPTS_VALUE) { + next_out_pts_ = new_pkt_pts_; + } else { + int64_t diff = llabs(new_pkt_pts_ - next_out_pts_); + if (diff > 1000) { + srs_trace("time diff to large=%lld, next out=%lld, new pkt=%lld, set to new pkt", + diff, next_out_pts_, new_pkt_pts_); + next_out_pts_ = new_pkt_pts_; } - max_dst_nb_samples_ = dst_nb_samples_; } - ret = swr_convert(swr_ctx_, dst_data_, dst_nb_samples_, (const uint8_t **)src_data_, src_nb_samples_); - if (ret < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Error while converting"); - } + int frame_cnt = 0; + while (av_audio_fifo_size(fifo_) >= enc_->frame_size) { + /* Read as many samples from the FIFO buffer as required to fill the frame. + * The samples are stored in the frame temporarily. 
*/ + if (av_audio_fifo_read(fifo_, (void **)enc_frame_->data, enc_->frame_size) < enc_->frame_size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not read data from FIFO"); + } + /* send the frame for encoding */ + enc_frame_->pts = next_out_pts_ + av_rescale(enc_->frame_size * frame_cnt, 1000, enc_->sample_rate); + ++frame_cnt; + int error = avcodec_send_frame(enc_, enc_frame_); + if (error < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error sending the frame to the encoder(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } - int dst_bufsize = av_samples_get_buffer_size(&dst_linesize_, dst_nb_channels_, - ret, dst_sample_fmt_, 1); - if (dst_bufsize < 0) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not get sample buffer size"); - } + av_init_packet(enc_packet_); + enc_packet_->data = NULL; + enc_packet_->size = 0; + /* read all the available output packets (in general there may be any + * number of them */ + while (error >= 0) { + error = avcodec_receive_packet(enc_, enc_packet_); + if (error == AVERROR(EAGAIN) || error == AVERROR_EOF) { + break; + } else if (error < 0) { + free_frames(pkts); + return srs_error_new(ERROR_RTC_RTP_MUXER, "Error during decoding(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); + } - int max = size; - size = 0; - if (max >= dst_bufsize) { - memcpy(buf, dst_data_[0], dst_bufsize); - size = dst_bufsize; + SrsAudioFrame *out_frame = new SrsAudioFrame; + char *buf = new char[enc_packet_->size]; + memcpy(buf, enc_packet_->data, enc_packet_->size); + out_frame->add_sample(buf, enc_packet_->size); + out_frame->dts = enc_packet_->dts; + out_frame->cts = enc_packet_->pts - enc_packet_->dts; + pkts.push_back(out_frame); + } } - return err; -} - -SrsAudioRecode::SrsAudioRecode(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec,int channels, int samplerate) - : dst_channels_(channels), - dst_samplerate_(samplerate), - src_codec_(src_codec), - dst_codec_(dst_codec) -{ - size_ = 0; - data_ = NULL; + next_out_pts_ += av_rescale(enc_->frame_size * frame_cnt, 1000, enc_->sample_rate); - dec_ = NULL; - enc_ = NULL; - resample_ = NULL; + return srs_success; } -SrsAudioRecode::~SrsAudioRecode() +srs_error_t SrsAudioTranscoder::add_samples_to_fifo(uint8_t **samples, int frame_size) { - srs_freep(dec_); - srs_freep(enc_); - srs_freep(resample_); - srs_freepa(data_); -} + char err_buf[AV_ERROR_MAX_STRING_SIZE] = {0}; -srs_error_t SrsAudioRecode::initialize() -{ - srs_error_t err = srs_success; + int error; - dec_ = new SrsAudioDecoder(src_codec_); - if ((err = dec_->initialize()) != srs_success) { - return srs_error_wrap(err, "dec init"); + /* Make the FIFO as large as it needs to be to hold both, + * the old and the new samples. */ + if ((error = av_audio_fifo_realloc(fifo_, av_audio_fifo_size(fifo_) + frame_size)) < 0) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not reallocate FIFO(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - enc_ = new SrsAudioEncoder(dst_codec_, dst_samplerate_, dst_channels_); - if ((err = enc_->initialize()) != srs_success) { - return srs_error_wrap(err, "enc init"); - } - - enc_want_bytes_ = enc_->want_bytes(); - if (enc_want_bytes_ > 0) { - data_ = new char[enc_want_bytes_]; - srs_assert(data_); + /* Store the new samples in the FIFO buffer. 
*/ + if ((error = av_audio_fifo_write(fifo_, (void **)samples, frame_size)) < frame_size) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "Could not write data to FIFO(%d,%s)", error, + av_make_error_string(err_buf, AV_ERROR_MAX_STRING_SIZE, error)); } - return err; + return srs_success; } -srs_error_t SrsAudioRecode::transcode(SrsSample *pkt, char **buf, int *buf_len, int &n) +void SrsAudioTranscoder::free_swr_samples() { - srs_error_t err = srs_success; - - if (!dec_) { - return srs_error_new(ERROR_RTC_RTP_MUXER, "dec_ nullptr"); + if (swr_data_) { + av_freep(&swr_data_[0]); + free(swr_data_); + swr_data_ = NULL; } - - int decode_len = kPacketBufMax; - static char decode_buffer[kPacketBufMax]; - if ((err = dec_->decode(pkt, decode_buffer, decode_len)) != srs_success) { - return srs_error_wrap(err, "decode error"); - } - - if (!resample_) { - int channel_layout = av_get_default_channel_layout(dst_channels_); - AVCodecContext *codec_ctx = dec_->codec_ctx(); - resample_ = new SrsAudioResample(codec_ctx->sample_rate, (int)codec_ctx->channel_layout, \ - codec_ctx->sample_fmt, codec_ctx->frame_size, dst_samplerate_, channel_layout, \ - enc_->codec_ctx()->sample_fmt); - if ((err = resample_->initialize()) != srs_success) { - return srs_error_wrap(err, "init resample"); - } - } - - SrsSample pcm; - pcm.bytes = decode_buffer; - pcm.size = decode_len; - int resample_len = kFrameBufMax; - static char resample_buffer[kFrameBufMax]; - static char encode_buffer[kPacketBufMax]; - if ((err = resample_->resample(&pcm, resample_buffer, resample_len)) != srs_success) { - return srs_error_wrap(err, "resample error"); - } - - n = 0; - - // We can encode it in one time. - if (enc_want_bytes_ <= 0) { - int encode_len; - pcm.bytes = (char *)data_; - pcm.size = size_; - if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) { - return srs_error_wrap(err, "encode error"); - } - - memcpy(buf[n], encode_buffer, encode_len); - buf_len[n] = encode_len; - n++; - - return err; - } - - // Need to refill the sample to data, because the frame size is not matched to encoder. 
- int data_left = resample_len; - if (size_ + data_left < enc_want_bytes_) { - memcpy(data_ + size_, resample_buffer, data_left); - size_ += data_left; - return err; - } - - int index = 0; - while (1) { - data_left = data_left - (enc_want_bytes_ - size_); - memcpy(data_ + size_, resample_buffer + index, enc_want_bytes_ - size_); - index += enc_want_bytes_ - size_; - size_ += enc_want_bytes_ - size_; - - int encode_len; - pcm.bytes = (char *)data_; - pcm.size = size_; - if ((err = enc_->encode(&pcm, encode_buffer, encode_len)) != srs_success) { - return srs_error_wrap(err, "encode error"); - } - - if (encode_len > 0) { - memcpy(buf[n], encode_buffer, encode_len); - buf_len[n] = encode_len; - n++; - } - - size_ = 0; - if(!data_left) { - break; - } - - if(data_left < enc_want_bytes_) { - memcpy(data_ + size_, resample_buffer + index, data_left); - size_ += data_left; - break; - } - } - - return err; } + diff --git a/trunk/src/app/srs_app_rtc_codec.hpp b/trunk/src/app/srs_app_rtc_codec.hpp index b9da7daf5..e38f184a5 100644 --- a/trunk/src/app/srs_app_rtc_codec.hpp +++ b/trunk/src/app/srs_app_rtc_codec.hpp @@ -26,6 +26,8 @@ #include +#include + #include #ifdef __cplusplus @@ -39,98 +41,59 @@ extern "C" { #include #include #include +#include #ifdef __cplusplus } #endif -class SrsSample; - -class SrsAudioDecoder -{ -private: - AVFrame* frame_; - AVPacket* packet_; - AVCodecContext* codec_ctx_; - SrsAudioCodecId codec_id_; -public: - //Only support "aac","opus" - SrsAudioDecoder(SrsAudioCodecId codec); - virtual ~SrsAudioDecoder(); - srs_error_t initialize(); - virtual srs_error_t decode(SrsSample *pkt, char *buf, int &size); - AVCodecContext* codec_ctx(); -}; - -class SrsAudioEncoder +class SrsAudioTranscoder { private: - int channels_; - int sampling_rate_; - AVCodecContext* codec_ctx_; - SrsAudioCodecId codec_id_; - int want_bytes_; - AVFrame* frame_; -public: - //Only support "aac","opus" - SrsAudioEncoder(SrsAudioCodecId codec, int samplerate, int channelsy); - virtual ~SrsAudioEncoder(); - srs_error_t initialize(); - //The encoder wanted bytes to call encode, if > 0, caller must feed the same bytes - //Call after initialize successed - int want_bytes(); - virtual srs_error_t encode(SrsSample *frame, char *buf, int &size); - AVCodecContext* codec_ctx(); -}; + AVCodecContext *dec_; + AVFrame *dec_frame_; + AVPacket *dec_packet_; -class SrsAudioResample -{ -private: - int src_rate_; - int src_ch_layout_; - int src_nb_channels_; - enum AVSampleFormat src_sample_fmt_; - int src_linesize_; - int src_nb_samples_; - uint8_t **src_data_; + AVCodecContext *enc_; + AVFrame *enc_frame_; + AVPacket *enc_packet_; - int dst_rate_; - int dst_ch_layout_; - int dst_nb_channels_; - enum AVSampleFormat dst_sample_fmt_; - int dst_linesize_; - int dst_nb_samples_; - uint8_t **dst_data_; + SwrContext *swr_; + //buffer for swr out put + uint8_t **swr_data_; + AVAudioFifo *fifo_; - int max_dst_nb_samples_; - struct SwrContext *swr_ctx_; + int64_t new_pkt_pts_; + int64_t next_out_pts_; public: - SrsAudioResample(int src_rate, int src_layout, enum AVSampleFormat src_fmt, - int src_nb, int dst_rate, int dst_layout, enum AVSampleFormat dst_fmt); - virtual ~SrsAudioResample(); - srs_error_t initialize(); - virtual srs_error_t resample(SrsSample *pcm, char *buf, int &size); -}; - -// TODO: FIXME: Rename to Transcoder. 
-class SrsAudioRecode -{ -private: - SrsAudioDecoder *dec_; - SrsAudioEncoder *enc_; - SrsAudioResample *resample_; - int dst_channels_; - int dst_samplerate_; - int size_; - char *data_; - SrsAudioCodecId src_codec_; - SrsAudioCodecId dst_codec_; - int enc_want_bytes_; + SrsAudioTranscoder(); + virtual ~SrsAudioTranscoder(); public: - SrsAudioRecode(SrsAudioCodecId src_codec, SrsAudioCodecId dst_codec,int channels, int samplerate); - virtual ~SrsAudioRecode(); - srs_error_t initialize(); - virtual srs_error_t transcode(SrsSample *pkt, char **buf, int *buf_len, int &n); + // Initialize the transcoder, transcode from codec as to codec. + // The channels specifies the number of output channels for encoder, for example, 2. + // The sample_rate specifies the sample rate of encoder, for example, 48000. + // The bit_rate specifies the bitrate of encoder, for example, 48000. + srs_error_t initialize(SrsAudioCodecId from, SrsAudioCodecId to, int channels, int sample_rate, int bit_rate); + // Transcode the input audio frame in, as output audio frames outs. + virtual srs_error_t transcode(SrsAudioFrame* in, std::vector& outs); + // Free the generated audio frames by transcode. + void free_frames(std::vector& frames); +public: + // Get the aac codec header, for example, FLV sequence header. + // @remark User should never free the data, it's managed by this transcoder. + void aac_codec_header(uint8_t** data, int* len); +private: + srs_error_t init_dec(SrsAudioCodecId from); + srs_error_t init_enc(SrsAudioCodecId to, int channels, int samplerate, int bit_rate); + srs_error_t init_swr(AVCodecContext* decoder); + srs_error_t init_fifo(); + + srs_error_t decode_and_resample(SrsAudioFrame* pkt); + srs_error_t encode(std::vector &pkts); + + srs_error_t add_samples_to_fifo(uint8_t** samples, int frame_size); + void free_swr_samples(); }; #endif /* SRS_APP_AUDIO_RECODE_HPP */ + diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index a1a3d699d..4e2108d9d 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -70,11 +70,6 @@ const int kAudioSamplerate = 48000; const int kVideoPayloadType = 102; const int kVideoSamplerate = 90000; -// An AAC packet may be transcoded to many OPUS packets. -const int kMaxOpusPackets = 8; -// The max size for each OPUS packet. -const int kMaxOpusPacketSize = 4096; - // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings // For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, @@ -632,12 +627,11 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) req = NULL; source_ = source; format = new SrsRtmpFormat(); - codec = new SrsAudioRecode(SrsAudioCodecIdAAC, SrsAudioCodecIdOpus, kAudioChannel, kAudioSamplerate); + codec_ = new SrsAudioTranscoder(); discard_aac = false; discard_bframe = false; merge_nalus = false; meta = new SrsMetaCache(); - audio_timestamp = 0; audio_sequence = 0; video_sequence = 0; @@ -687,7 +681,7 @@ SrsRtcFromRtmpBridger::SrsRtcFromRtmpBridger(SrsRtcStream* source) SrsRtcFromRtmpBridger::~SrsRtcFromRtmpBridger() { srs_freep(format); - srs_freep(codec); + srs_freep(codec_); srs_freep(meta); } @@ -701,7 +695,8 @@ srs_error_t SrsRtcFromRtmpBridger::initialize(SrsRequest* r) return srs_error_wrap(err, "format initialize"); } - if ((err = codec->initialize()) != srs_success) { + int bitrate = 48000; // The output bitrate in bps. 
+ if ((err = codec_->initialize(SrsAudioCodecIdAAC, SrsAudioCodecIdOpus, kAudioChannel, kAudioSamplerate, bitrate)) != srs_success) { return srs_error_wrap(err, "init codec"); } @@ -779,72 +774,58 @@ srs_error_t SrsRtcFromRtmpBridger::on_audio(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "aac append header"); } - if (adts_audio) { - err = transcode(adts_audio, nn_adts_audio); - srs_freep(adts_audio); + if (!adts_audio) { + return err; } + SrsAudioFrame aac; + aac.dts = format->audio->dts; + aac.cts = format->audio->cts; + if ((err = aac.add_sample(adts_audio, nn_adts_audio)) == srs_success) { + // If OK, transcode the AAC to Opus and consume it. + err = transcode(&aac); + } + + srs_freepa(adts_audio); + return err; } -srs_error_t SrsRtcFromRtmpBridger::transcode(char* adts_audio, int nn_adts_audio) +srs_error_t SrsRtcFromRtmpBridger::transcode(SrsAudioFrame* pkt) { srs_error_t err = srs_success; - // Opus packet cache. - static char* opus_payloads[kMaxOpusPackets]; - - static bool initialized = false; - if (!initialized) { - initialized = true; - - static char opus_packets_cache[kMaxOpusPackets][kMaxOpusPacketSize]; - opus_payloads[0] = &opus_packets_cache[0][0]; - for (int i = 1; i < kMaxOpusPackets; i++) { - opus_payloads[i] = opus_packets_cache[i]; - } - } - - // Transcode an aac packet to many opus packets. - SrsSample aac; - aac.bytes = adts_audio; - aac.size = nn_adts_audio; - - int nn_opus_packets = 0; - int opus_sizes[kMaxOpusPackets]; - if ((err = codec->transcode(&aac, opus_payloads, opus_sizes, nn_opus_packets)) != srs_success) { + std::vector out_pkts; + if ((err = codec_->transcode(pkt, out_pkts)) != srs_success) { return srs_error_wrap(err, "recode error"); } // Save OPUS packets in shared message. - if (nn_opus_packets <= 0) { + if (out_pkts.empty()) { return err; } - int nn_max_extra_payload = 0; - for (int i = 0; i < nn_opus_packets; i++) { - char* data = (char*)opus_payloads[i]; - int size = (int)opus_sizes[i]; - - // TODO: FIXME: Use it to padding audios. - nn_max_extra_payload = srs_max(nn_max_extra_payload, size); - + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { SrsRtpPacketCacheHelper* helper = new SrsRtpPacketCacheHelper(); SrsAutoFree(SrsRtpPacketCacheHelper, helper); - if ((err = package_opus(data, size, helper)) != srs_success) { - return srs_error_wrap(err, "package opus"); + if ((err = package_opus(*it, helper)) != srs_success) { + err = srs_error_wrap(err, "package opus"); + break; } if ((err = source_->on_rtp(helper->pkt)) != srs_success) { - return srs_error_wrap(err, "consume opus"); + err = srs_error_wrap(err, "consume opus"); + break; } } + codec_->free_frames(out_pkts); + return err; } -srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper) +srs_error_t SrsRtcFromRtmpBridger::package_opus(SrsAudioFrame* audio, SrsRtpPacketCacheHelper* helper) { srs_error_t err = srs_success; @@ -854,16 +835,14 @@ srs_error_t SrsRtcFromRtmpBridger::package_opus(char* data, int size, SrsRtpPack pkt->frame_type = SrsFrameTypeAudio; pkt->header.set_marker(true); pkt->header.set_sequence(audio_sequence++); - pkt->header.set_timestamp(audio_timestamp); - - // TODO: FIXME: Why 960? Need Refactoring? 
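The timestamp handling in the next hunk deserves a note. Opus over RTP always runs on a 48 kHz clock and the encoder emits 20 ms frames, which is where the magic 960 came from (48000 × 0.02 = 960 samples). Stepping by a fixed 960 per packet drifts whenever transcoding does not produce exactly one packet per 20 ms, so the new code derives the RTP timestamp from the frame dts instead (dts_ms below stands for SrsAudioFrame::dts, in milliseconds):

```cpp
// 48 ticks per millisecond on the 48 kHz Opus RTP clock.
uint32_t rtp_ts = (uint32_t)(dts_ms * 48);   // e.g. 20 ms -> 960 ticks, 1000 ms -> 48000 ticks
```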
- audio_timestamp += 960; + pkt->header.set_timestamp(audio->dts * 48); SrsRtpRawPayload* raw = _srs_rtp_raw_cache->allocate(); pkt->set_payload(raw, SrsRtpPacketPayloadTypeRaw); - raw->payload = pkt->wrap(data, size); - raw->nn_payload = size; + srs_assert(audio->nb_samples == 1); + raw->payload = pkt->wrap(audio->samples[0].bytes, audio->samples[0].size); + raw->nn_payload = audio->samples[0].size; return err; } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index e29711feb..1495f3eaf 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -45,7 +45,7 @@ class SrsCommonMessage; class SrsMessageArray; class SrsRtcStream; class SrsRtcFromRtmpBridger; -class SrsAudioRecode; +class SrsAudioTranscoder; class SrsRtpPacket2; class SrsRtpPacketCacheHelper; class SrsSample; @@ -263,10 +263,9 @@ private: SrsMetaCache* meta; private: bool discard_aac; - SrsAudioRecode* codec; + SrsAudioTranscoder* codec_; bool discard_bframe; bool merge_nalus; - uint32_t audio_timestamp; uint16_t audio_sequence; uint16_t video_sequence; uint32_t audio_ssrc; @@ -280,8 +279,8 @@ public: virtual void on_unpublish(); virtual srs_error_t on_audio(SrsSharedPtrMessage* msg); private: - srs_error_t transcode(char* adts_audio, int nn_adts_audio); - srs_error_t package_opus(char* data, int size, SrsRtpPacketCacheHelper* helper); + srs_error_t transcode(SrsAudioFrame* audio); + srs_error_t package_opus(SrsAudioFrame* audio, SrsRtpPacketCacheHelper* helper); public: virtual srs_error_t on_video(SrsSharedPtrMessage* msg); private: diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 25dcf6fdb..a4c3312f9 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 93 +#define VERSION_REVISION 94 #endif diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp index 56b0bf1d5..a4c0b3284 100644 --- a/trunk/src/kernel/srs_kernel_codec.hpp +++ b/trunk/src/kernel/srs_kernel_codec.hpp @@ -650,9 +650,8 @@ public: virtual bool is_avc_codec_ok(); }; -/** - * A frame, consists of a codec and a group of samples. - */ +// A frame, consists of a codec and a group of samples. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. class SrsFrame { public: @@ -677,9 +676,8 @@ public: virtual srs_error_t add_sample(char* bytes, int size); }; -/** - * A audio frame, besides a frame, contains the audio frame info, such as frame type. - */ +// A audio frame, besides a frame, contains the audio frame info, such as frame type. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. class SrsAudioFrame : public SrsFrame { public: @@ -691,9 +689,8 @@ public: virtual SrsAudioCodecConfig* acodec(); }; -/** - * A video frame, besides a frame, contains the video frame info, such as frame type. - */ +// A video frame, besides a frame, contains the video frame info, such as frame type. +// TODO: FIXME: Rename to packet to follow names of FFmpeg, which means before decoding or after decoding. 
class SrsVideoFrame : public SrsFrame { public: From c770e6d7bcb3c104e6b7709273aab4be34f13805 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 21 Apr 2021 19:17:56 +0800 Subject: [PATCH 5/8] Bridger: Start RTMP2RTC bridger in RTMP publisher --- trunk/src/app/srs_app_rtc_source.cpp | 72 ---------------------------- trunk/src/app/srs_app_rtc_source.hpp | 24 ---------- trunk/src/app/srs_app_rtmp_conn.cpp | 47 +++++++++++++----- trunk/src/app/srs_app_source.cpp | 40 +++++----------- trunk/src/app/srs_app_source.hpp | 4 +- 5 files changed, 49 insertions(+), 138 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 4e2108d9d..8570252be 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -337,7 +337,6 @@ SrsRtcStream::SrsRtcStream() stream_desc_ = NULL; req = NULL; - bridger_ = new SrsRtcDummyBridger(this); } SrsRtcStream::~SrsRtcStream() @@ -347,7 +346,6 @@ SrsRtcStream::~SrsRtcStream() consumers.clear(); srs_freep(req); - srs_freep(bridger_); srs_freep(stream_desc_); } @@ -408,11 +406,6 @@ SrsContextId SrsRtcStream::pre_source_id() return _pre_source_id; } -ISrsSourceBridger* SrsRtcStream::bridger() -{ - return bridger_; -} - srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -477,16 +470,6 @@ srs_error_t SrsRtcStream::on_publish() is_created_ = true; is_delivering_packets_ = true; - // Create a new bridger, because it's been disposed when unpublish. -#ifdef SRS_FFMPEG_FIT - SrsRtcFromRtmpBridger* impl = new SrsRtcFromRtmpBridger(this); - if ((err = impl->initialize(req)) != srs_success) { - return srs_error_wrap(err, "bridge initialize"); - } - - bridger_->setup(impl); -#endif - // Notify the consumers about stream change event. if ((err = on_source_changed()) != srs_success) { return srs_error_wrap(err, "source id change"); @@ -522,11 +505,6 @@ void SrsRtcStream::on_unpublish() // release unpublish stream description. set_stream_desc(NULL); - // Dispose the impl of bridger, to free memory. -#ifdef SRS_FFMPEG_FIT - bridger_->setup(NULL); -#endif - // TODO: FIXME: Handle by statistic. } @@ -1196,56 +1174,6 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vectoron_publish(); - } - return rtc_->on_publish(); -} - -srs_error_t SrsRtcDummyBridger::on_audio(SrsSharedPtrMessage* audio) -{ - if (impl_) { - return impl_->on_audio(audio); - } - return srs_success; -} - -srs_error_t SrsRtcDummyBridger::on_video(SrsSharedPtrMessage* video) -{ - if (impl_) { - return impl_->on_video(video); - } - return srs_success; -} - -void SrsRtcDummyBridger::on_unpublish() -{ - if (impl_) { - impl_->on_unpublish(); - return; - } - rtc_->on_unpublish(); -} - -void SrsRtcDummyBridger::setup(ISrsSourceBridger* impl) -{ - srs_freep(impl_); - impl_ = impl; -} - SrsCodecPayload::SrsCodecPayload() { pt_of_publisher_ = pt_ = 0; diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 1495f3eaf..a7eb4abfc 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -56,7 +56,6 @@ class SrsRtpRingBuffer; class SrsRtpNackForReceiver; class SrsJsonObject; class SrsErrorPithyPrint; -class SrsRtcDummyBridger; class SrsNtp { @@ -177,8 +176,6 @@ private: SrsContextId _pre_source_id; SrsRequest* req; ISrsRtcPublishStream* publish_stream_; - // Transmux RTMP to RTC. - SrsRtcDummyBridger* bridger_; // Steam description for this steam. 
SrsRtcStreamDescription* stream_desc_; private: @@ -204,8 +201,6 @@ public: // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); - // Get the bridger. - ISrsSourceBridger* bridger(); public: // Create consumer // @param consumer, output the create consumer. @@ -293,25 +288,6 @@ private: }; #endif -class SrsRtcDummyBridger : public ISrsSourceBridger -{ -private: - SrsRtcStream* rtc_; - // The optional implementation bridger, ignore if NULL. - ISrsSourceBridger* impl_; -public: - SrsRtcDummyBridger(SrsRtcStream* s); - virtual ~SrsRtcDummyBridger(); -public: - virtual srs_error_t on_publish(); - virtual srs_error_t on_audio(SrsSharedPtrMessage* audio); - virtual srs_error_t on_video(SrsSharedPtrMessage* video); - virtual void on_unpublish(); -public: - // Setup a new implementation bridger, which might be NULL to free previous one. - void setup(ISrsSourceBridger* impl); -}; - // TODO: FIXME: Rename it. class SrsCodecPayload { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e82ca3fbb..4cf58610f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -55,6 +55,7 @@ using namespace std; #include #include #include +#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. @@ -959,23 +960,47 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source) srs_error_t err = srs_success; SrsRequest* req = info->req; - + + // Check whether RTC stream is busy. +#ifdef SRS_RTC + SrsRtcStream *rtc = NULL; + bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); + bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); + if (rtc_server_enabled && rtc_enabled && !info->edge) { + if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if (!rtc->can_publish()) { + return srs_error_new(ERROR_RTC_SOURCE_BUSY, "rtc stream %s busy", req->get_stream_url().c_str()); + } + } +#endif + + // Check whether RTMP stream is busy. if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } - - // when edge, ignore the publish event, directly proxy it. - if (info->edge) { - if ((err = source->on_edge_start_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: edge start publish"); + + // Bridge to RTC streaming. +#ifdef SRS_RTC + if (rtc) { + SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc); + if ((err = bridger->initialize(req)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "bridger init"); } + + source->set_bridger(bridger); + } +#endif + + // Start publisher now. + if (info->edge) { + return source->on_edge_start_publish(); } else { - if ((err = source->on_publish()) != srs_success) { - return srs_error_wrap(err, "rtmp: source publish"); - } + return source->on_publish(); } - - return err; } void SrsRtmpConn::release_publish(SrsSource* source) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 1a9553d41..ce09e6614 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1728,19 +1728,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. 
srs_assert (pool.find(stream_url) == pool.end()); -#ifdef SRS_RTC - bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); - bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost); - - // Get the RTC source and bridger. - SrsRtcStream* rtc = NULL; - if (rtc_server_enabled && rtc_enabled) { - if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) { - err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str()); - goto failed; - } - } -#endif srs_trace("new source, stream_url=%s", stream_url.c_str()); source = new SrsSource(); @@ -1748,14 +1735,6 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); goto failed; } - -#ifdef SRS_RTC - // If rtc enabled, bridge RTMP source to RTC, - // all RTMP packets will be forwarded to RTC source. - if (source && rtc) { - source->bridge_to(rtc->bridger()); - } -#endif pool[stream_url] = source; *pps = source; @@ -1883,7 +1862,7 @@ SrsSource::SrsSource() die_at = 0; handler = NULL; - bridger = NULL; + bridger_ = NULL; play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); @@ -1915,6 +1894,7 @@ SrsSource::~SrsSource() srs_freep(gop_cache); srs_freep(req); + srs_freep(bridger_); } void SrsSource::dispose() @@ -1990,9 +1970,10 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h) return err; } -void SrsSource::bridge_to(ISrsSourceBridger* v) +void SrsSource::set_bridger(ISrsSourceBridger* v) { - bridger = v; + srs_freep(bridger_); + bridger_ = v; } srs_error_t SrsSource::on_reload_vhost_play(string vhost) @@ -2245,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_audio(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume audio"); } @@ -2375,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) } // For bridger to consume the message. - if (bridger && (err = bridger->on_video(msg)) != srs_success) { + if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume video"); } @@ -2539,7 +2520,7 @@ srs_error_t SrsSource::on_publish() return srs_error_wrap(err, "handle publish"); } - if (bridger && (err = bridger->on_publish()) != srs_success) { + if (bridger_ && (err = bridger_->on_publish()) != srs_success) { return srs_error_wrap(err, "bridger publish"); } @@ -2584,8 +2565,9 @@ void SrsSource::on_unpublish() handler->on_unpublish(this, req); - if (bridger) { - bridger->on_unpublish(); + if (bridger_) { + bridger_->on_unpublish(); + srs_freep(bridger_); } // no consumer, stream is die. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 84474fbd8..5244bbbfd 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -535,7 +535,7 @@ private: // The event handler. ISrsSourceHandler* handler; // The source bridger for other source. - ISrsSourceBridger* bridger; + ISrsSourceBridger* bridger_; // The edge control service SrsPlayEdge* play_edge; SrsPublishEdge* publish_edge; @@ -563,7 +563,7 @@ public: // Initialize the hls with handlers. virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h); // Bridge to other source, forward packets to it. 
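With the dummy bridger removed, any downstream consumer can now be attached to an SrsSource at publish time via set_bridger(); the source takes ownership, forwards every audio and video message plus the publish/unpublish events, and frees the bridger on unpublish. A minimal custom bridger might look like this sketch (illustrative only; SrsMyBridger is an invented name):

```cpp
// Illustrative sketch of a custom bridger. SrsSource owns the object passed to
// set_bridger() and frees it itself, so it must be heap-allocated.
#include <srs_app_source.hpp>

class SrsMyBridger : public ISrsSourceBridger
{
public:
    virtual srs_error_t on_publish() {
        // Prepare the downstream consumer (open outputs, reset state).
        return srs_success;
    }
    virtual srs_error_t on_audio(SrsSharedPtrMessage* audio) {
        // Called for every RTMP audio message the source receives.
        return srs_success;
    }
    virtual srs_error_t on_video(SrsSharedPtrMessage* video) {
        // Called for every RTMP video message the source receives.
        return srs_success;
    }
    virtual void on_unpublish() {
        // Tear down; the source will srs_freep() this bridger afterwards.
    }
};

// Attached at publish time, as SrsRtmpConn::acquire_publish() does:
//     source->set_bridger(new SrsMyBridger());
```

The same pattern is reused in the opposite direction later in this series, where SrsRtcStream gets its own ISrsRtcSourceBridger that receives RTP packets instead of RTMP messages.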
- void bridge_to(ISrsSourceBridger* v); + void set_bridger(ISrsSourceBridger* v); // Interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); From 3d225973efd6fcc04d26e3822478423b18ec6f95 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 23 Apr 2021 10:42:15 +0800 Subject: [PATCH 6/8] Bridger: Support RTC2RTMP bridger and shared FastTimer. 4.0.95 --- README.md | 1 + trunk/conf/full.conf | 9 + trunk/src/app/srs_app_config.cpp | 43 +- trunk/src/app/srs_app_config.hpp | 2 + trunk/src/app/srs_app_listener.cpp | 4 + trunk/src/app/srs_app_rtc_conn.cpp | 22 + trunk/src/app/srs_app_rtc_source.cpp | 523 ++++++++++++++++++++++++ trunk/src/app/srs_app_rtc_source.hpp | 67 ++- trunk/src/app/srs_app_source.hpp | 2 +- trunk/src/core/srs_core_version4.hpp | 2 +- trunk/src/kernel/srs_kernel_rtc_rtp.cpp | 27 ++ trunk/src/kernel/srs_kernel_rtc_rtp.hpp | 2 + 12 files changed, 700 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c273502fb..2b5606684 100755 --- a/README.md +++ b/README.md @@ -157,6 +157,7 @@ Other important wiki: ## V4 changes +* v5.0, 2021-04-20, Support RTC2RTMP bridger and shared FastTimer. 4.0.95 * v5.0, 2021-04-20, Refine transcoder to support aac2opus and opus2aac. 4.0.94 * v4.0, 2021-05-01, Timer: Extract and apply shared FastTimer. 4.0.93 * v4.0, 2021-04-29, RTC: Support AV1 for Chrome M90. 4.0.91 diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 978b237d0..d894ed62e 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -571,6 +571,15 @@ vhost rtc.vhost.srs.com { # discard Discard aac audio packet. # default: transcode aac transcode; + ############################################################### + # For transmuxing RTC to RTMP. + # Whether trans-mux RTC to RTMP streaming. + # Default: off + rtc_to_rtmp off; + # The PLI interval in seconds, for RTC to RTMP. + # Note the available range is [0.5, 30] + # Default: 6.0 + pli_for_rtmp 6.0; } ############################################################### # For transmuxing RTMP to RTC, it will impact the default values if RTC is on. 
diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 142881496..8944954f5 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3940,7 +3940,8 @@ srs_error_t SrsConfig::check_normal_config() for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; if (m != "enabled" && m != "bframe" && m != "aac" && m != "stun_timeout" && m != "stun_strict_check" - && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt") { + && m != "dtls_role" && m != "dtls_version" && m != "drop_for_pt" && m != "rtc_to_rtmp" + && m != "pli_for_rtmp") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.rtc.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -5226,6 +5227,46 @@ int SrsConfig::get_rtc_drop_for_pt(string vhost) return ::atoi(conf->arg0().c_str()); } +bool SrsConfig::get_rtc_to_rtmp(string vhost) +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("rtc_to_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +srs_utime_t SrsConfig::get_rtc_pli_for_rtmp(string vhost) +{ + static srs_utime_t DEFAULT = 6 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = get_rtc(vhost); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("pli_for_rtmp"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + srs_utime_t v = (srs_utime_t)(::atof(conf->arg0().c_str()) * SRS_UTIME_SECONDS); + if (v < 500 * SRS_UTIME_MILLISECONDS || v > 30 * SRS_UTIME_SECONDS) { + srs_warn("Reset pli %dms to %dms", srsu2msi(v), srsu2msi(DEFAULT)); + return DEFAULT; + } + + return v; +} + bool SrsConfig::get_rtc_nack_enabled(string vhost) { static bool DEFAULT = true; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 0e31bcb6d..95d507bd7 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -562,6 +562,8 @@ public: std::string get_rtc_dtls_role(std::string vhost); std::string get_rtc_dtls_version(std::string vhost); int get_rtc_drop_for_pt(std::string vhost); + bool get_rtc_to_rtmp(std::string vhost); + srs_utime_t get_rtc_pli_for_rtmp(std::string vhost); bool get_rtc_nack_enabled(std::string vhost); bool get_rtc_nack_no_copy(std::string vhost); bool get_rtc_twcc_enabled(std::string vhost); diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c38609c9f..0037a6d9c 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -536,6 +536,10 @@ srs_error_t SrsUdpMuxListener::listen() srs_freep(trd); trd = new SrsSTCoroutine("udp", this, cid); + + //change stack size to 256K, fix crash when call some 3rd-part api. 
+ ((SrsSTCoroutine*)trd)->set_stack_size(1 << 18); + if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start thread"); } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index c220f0f83..547ef0aa6 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -998,6 +998,28 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti } source->set_publish_stream(this); + // Bridge to rtmp + bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost); + if (rtc_to_rtmp) { + SrsSource *rtmp = NULL; + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // TODO: FIMXE: Check it in SrsRtcConnection::add_publisher? + if (!rtmp->can_publish(false)) { + return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str()); + } + + SrsRtmpFromRtcBridger *bridger = new SrsRtmpFromRtcBridger(rtmp); + if ((err = bridger->initialize(r)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } + + source->set_bridger(bridger); + } + return err; } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 8570252be..344d42168 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -328,6 +328,14 @@ ISrsRtcStreamEventHandler::~ISrsRtcStreamEventHandler() { } +ISrsRtcSourceBridger::ISrsRtcSourceBridger() +{ +} + +ISrsRtcSourceBridger::~ISrsRtcSourceBridger() +{ +} + SrsRtcStream::SrsRtcStream() { is_created_ = false; @@ -337,6 +345,7 @@ SrsRtcStream::SrsRtcStream() stream_desc_ = NULL; req = NULL; + bridger_ = NULL; } SrsRtcStream::~SrsRtcStream() @@ -346,6 +355,7 @@ SrsRtcStream::~SrsRtcStream() consumers.clear(); srs_freep(req); + srs_freep(bridger_); srs_freep(stream_desc_); } @@ -406,6 +416,12 @@ SrsContextId SrsRtcStream::pre_source_id() return _pre_source_id; } +void SrsRtcStream::set_bridger(ISrsRtcSourceBridger *bridger) +{ + srs_freep(bridger_); + bridger_ = bridger; +} + srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer) { srs_error_t err = srs_success; @@ -475,6 +491,17 @@ srs_error_t SrsRtcStream::on_publish() return srs_error_wrap(err, "source id change"); } + // If bridge to other source, handle event and start timer to request PLI. + if (bridger_) { + if ((err = bridger_->on_publish()) != srs_success) { + return srs_error_wrap(err, "bridger on publish"); + } + + // For SrsRtcStream::on_timer() + srs_utime_t pli_for_rtmp = _srs_config->get_rtc_pli_for_rtmp(req->vhost); + _srs_hybrid->timer()->subscribe(pli_for_rtmp, this); + } + // TODO: FIXME: Handle by statistic. return err; @@ -502,6 +529,15 @@ void SrsRtcStream::on_unpublish() h->on_unpublish(); } + //free bridger resource + if (bridger_) { + // For SrsRtcStream::on_timer() + _srs_hybrid->timer()->unsubscribe(this); + + bridger_->on_unpublish(); + srs_freep(bridger_); + } + // release unpublish stream description. 
set_stream_desc(NULL); @@ -545,6 +581,10 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) } } + if (bridger_ && (err = bridger_->on_rtp(pkt)) != srs_success) { + return srs_error_wrap(err, "bridger consume message"); + } + return err; } @@ -586,6 +626,22 @@ std::vector SrsRtcStream::get_track_desc(std::string ty return track_descs; } +srs_error_t SrsRtcStream::on_timer(srs_utime_t interval, srs_utime_t tick) +{ + srs_error_t err = srs_success; + + if (!publish_stream_) { + return err; + } + + for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) { + SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i); + publish_stream_->request_keyframe(desc->ssrc_); + } + + return err; +} + SrsRtpPacketCacheHelper::SrsRtpPacketCacheHelper() { pkt = _srs_rtp_cache->allocate(); @@ -1172,6 +1228,473 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vectorinitialize(from, to, channels, sample_rate, bitrate)) != srs_success) { + return srs_error_wrap(err, "bridge initialize"); + } + + if ((err = format->initialize()) != srs_success) { + return srs_error_wrap(err, "format initialize"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_publish() +{ + srs_error_t err = srs_success; + + is_first_audio = true; + is_first_video = true; + + // TODO: FIXME: Should sync with bridger? + if ((err = source_->on_publish()) != srs_success) { + return srs_error_wrap(err, "source publish"); + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::on_rtp(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + if (!pkt->payload()) { + return err; + } + + if (pkt->is_audio()) { + err = trancode_audio(pkt); + } else { + err = packet_video(pkt); + } + + return err; +} + +void SrsRtmpFromRtcBridger::on_unpublish() +{ + // TODO: FIXME: Should sync with bridger? + source_->on_unpublish(); +} + +srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket2 *pkt) +{ + srs_error_t err = srs_success; + + // to common message. 
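Two constants in trancode_audio() and packet_aac() below deserve a note: the incoming RTP timestamp is on the 48 kHz Opus clock, so dividing by 48000/1000 converts it to the millisecond timestamps RTMP expects, and the leading flag byte of every FLV/RTMP AAC audio tag works out to the familiar 0xAF (rtp_ts below stands for pkt->header.get_timestamp()):

```cpp
// RTP (48 kHz clock) -> RTMP (milliseconds):
uint32_t ms = rtp_ts / (48000 / 1000);   // e.g. 96000 ticks -> 2000 ms

// FLV audio tag flag byte written by packet_aac():
//   (AAC = 10) << 4 | (44.1 kHz index = 3) << 2 | (16-bit = 1) << 1 | (stereo = 1)
//   =   0xA0        |        0x0C             |       0x02        |     0x01      = 0xAF
// For AAC these rate/bits/channel bits are informational only; the real parameters
// come from the AudioSpecificConfig carried in the sequence header.
```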
+ uint32_t ts = pkt->header.get_timestamp()/(48000/1000); + if (is_first_audio) { + int header_len = 0; + uint8_t* header = NULL; + codec_->aac_codec_header(&header, &header_len); + + SrsCommonMessage out_rtmp; + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "source on audio"); + } + + is_first_audio = false; + } + + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + + SrsAudioFrame frame; + frame.add_sample(payload->payload, payload->nn_payload); + frame.dts = ts; + frame.cts = 0; + + err = codec_->transcode(&frame, out_pkts); + if (err != srs_success) { + return err; + } + + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + SrsCommonMessage out_rtmp; + out_rtmp.header.timestamp = (*it)->dts*(48000/1000); + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + + if ((err = source_->on_audio(&out_rtmp)) != srs_success) { + err = srs_error_wrap(err, "source on audio"); + break; + } + } + codec_->free_frames(out_pkts); + + return err; +} + +void SrsRtmpFromRtcBridger::packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header) +{ + int rtmp_len = len + 2; + audio->header.initialize_audio(rtmp_len, pts, 1); + audio->create_payload(rtmp_len); + SrsBuffer stream(audio->payload, rtmp_len); + uint8_t aac_flag = (SrsAudioCodecIdAAC << 4) | (SrsAudioSampleRate44100 << 2) | (SrsAudioSampleBits16bit << 1) | SrsAudioChannelsStereo; + stream.write_1bytes(aac_flag); + if (is_header) { + stream.write_1bytes(0); + } else { + stream.write_1bytes(1); + } + stream.write_bytes(data, len); + audio->size = rtmp_len; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video(SrsRtpPacket2* src) +{ + srs_error_t err = srs_success; + + // TODO: Only copy when need + SrsRtpPacket2* pkt = src->copy(); + + if (pkt->is_keyframe()) { + return packet_video_key_frame(pkt); + } + + // store in cache + int index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + // check whether to recovery lost packet and can construct a video frame + if (lost_sn_ == pkt->header.get_sequence()) { + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(lost_sn_, tail_sn); + if (-1 == sn ) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to pack video frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket2* pkt) +{ + srs_error_t err = srs_success; + + // TODO: handle sps and pps in 2 rtp packets + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + SrsSample* sps = stap_payload->get_sps(); + SrsSample* pps = stap_payload->get_pps(); + if (NULL == sps || NULL == pps) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "no sps or pps in stap-a rtp. 
sps: %p, pps:%p", sps, pps); + } else { + //type_codec1 + avc_type + composition time + fix header + count of sps + len of sps + sps + count of pps + len of pps + pps + int nb_payload = 1 + 1 + 3 + 5 + 1 + 2 + sps->size + 1 + 2 + pps->size; + SrsCommonMessage rtmp; + rtmp.header.initialize_video(nb_payload, pkt->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + //TODO: call api + payload.write_1bytes(0x17);// type(4 bits): key frame; code(4bits): avc + payload.write_1bytes(0x0); // avc_type: sequence header + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + payload.write_1bytes(0x01); // version + payload.write_1bytes(sps->bytes[1]); + payload.write_1bytes(sps->bytes[2]); + payload.write_1bytes(sps->bytes[3]); + payload.write_1bytes(0xff); + payload.write_1bytes(0xe1); + payload.write_2bytes(sps->size); + payload.write_bytes(sps->bytes, sps->size); + payload.write_1bytes(0x01); + payload.write_2bytes(pps->size); + payload.write_bytes(pps->bytes, pps->size); + if ((err = source_->on_video(&rtmp)) != srs_success) { + return err; + } + } + } + + if (-1 == key_frame_ts_) { + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + // Received key frame and clean cache of old p frame pkts + clear_cached_video(); + srs_trace("set ts=%lld, header=%hu, lost=%hu", key_frame_ts_, header_sn_, lost_sn_); + } else if (key_frame_ts_ != pkt->header.get_timestamp()) { + //new key frame, clean cache + int64_t old_ts = key_frame_ts_; + uint16_t old_header_sn = header_sn_; + uint16_t old_lost_sn = lost_sn_; + key_frame_ts_ = pkt->header.get_timestamp(); + header_sn_ = pkt->header.get_sequence(); + lost_sn_ = header_sn_ + 1; + clear_cached_video(); + srs_trace("drop old ts=%lld, header=%hu, lost=%hu, set new ts=%lld, header=%hu, lost=%hu", + old_ts, old_header_sn, old_lost_sn, key_frame_ts_, header_sn_, lost_sn_); + } + + uint16_t index = cache_index(pkt->header.get_sequence()); + cache_video_pkts_[index].in_use = true; + cache_video_pkts_[index].pkt = pkt; + cache_video_pkts_[index].sn = pkt->header.get_sequence(); + cache_video_pkts_[index].ts = pkt->header.get_timestamp(); + + int32_t sn = lost_sn_; + uint16_t tail_sn = 0; + if (srs_rtp_seq_distance(header_sn_, pkt->header.get_sequence()) < 0){ + // When receive previous pkt in the same frame, update header sn; + header_sn_ = pkt->header.get_sequence(); + sn = find_next_lost_sn(header_sn_, tail_sn); + } else if (lost_sn_ == pkt->header.get_sequence()) { + sn = find_next_lost_sn(lost_sn_, tail_sn); + } + + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + if ((err = packet_video_rtmp(header_sn_, tail_sn)) != srs_success) { + err = srs_error_wrap(err, "fail to packet key frame"); + } + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = (uint16_t)sn; + } + + return err; +} + +srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const uint16_t end) +{ + srs_error_t err = srs_success; + + //type_codec1 + avc_type + composition time + nalu size + nalu + int nb_payload = 1 + 1 + 3; + uint16_t cnt = end - start + 1; + + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t sn = start + i; + uint16_t index = cache_index(sn); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + // calculate nalu len + SrsRtpFUAPayload2* fua_payload = 
dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nb_payload += 1 + 4; + } + nb_payload += fua_payload->size; + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + nb_payload += 4 + sample->size; + } + continue; + } + + SrsRtpRawPayload* raw_payload = dynamic_cast(pkt->payload()); + if (raw_payload) { + nb_payload += 4 + raw_payload->nn_payload; + continue; + } + } + + SrsCommonMessage rtmp; + SrsRtpPacket2* header = cache_video_pkts_[cache_index(start)].pkt; + rtmp.header.initialize_video(nb_payload, header->header.get_timestamp() / 90, 1); + rtmp.create_payload(nb_payload); + rtmp.size = nb_payload; + SrsBuffer payload(rtmp.payload, rtmp.size); + if (header->is_keyframe()) { + payload.write_1bytes(0x17); // type(4 bits): key frame; code(4bits): avc + key_frame_ts_ = -1; + } else { + payload.write_1bytes(0x27); // type(4 bits): inter frame; code(4bits): avc + } + payload.write_1bytes(0x01); // avc_type: nalu + payload.write_1bytes(0x0); // composition time + payload.write_1bytes(0x0); + payload.write_1bytes(0x0); + + int nalu_len = 0; + for (uint16_t i = 0; i < cnt; ++i) { + uint16_t index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + cache_video_pkts_[index].in_use = false; + cache_video_pkts_[index].pkt = NULL; + cache_video_pkts_[index].ts = 0; + cache_video_pkts_[index].sn = 0; + + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + nalu_len = fua_payload->size + 1; + //skip 4 bytes to write nalu_len future + payload.skip(4); + payload.write_1bytes(fua_payload->nri | fua_payload->nalu_type); + payload.write_bytes(fua_payload->payload, fua_payload->size); + } else { + nalu_len += fua_payload->size; + payload.write_bytes(fua_payload->payload, fua_payload->size); + if (fua_payload->end) { + //write nalu_len back + payload.skip(-(4 + nalu_len)); + payload.write_4bytes(nalu_len); + payload.skip(nalu_len); + } + } + srs_freep(pkt); + continue; + } + + SrsRtpSTAPPayload* stap_payload = dynamic_cast(pkt->payload()); + if (stap_payload) { + for (int j = 0; j < stap_payload->nalus.size(); ++j) { + SrsSample* sample = stap_payload->nalus.at(j); + payload.write_4bytes(sample->size); + payload.write_bytes(sample->bytes, sample->size); + } + srs_freep(pkt); + continue; + } + + SrsRtpRawPayload* raw_payload = dynamic_cast(pkt->payload()); + if (raw_payload) { + payload.write_4bytes(raw_payload->nn_payload); + payload.write_bytes(raw_payload->payload, raw_payload->nn_payload); + srs_freep(pkt); + continue; + } + + srs_freep(pkt); + } + + if ((err = source_->on_video(&rtmp)) != srs_success) { + srs_warn("fail to pack video frame"); + } + + header_sn_ = end + 1; + uint16_t tail_sn = 0; + int sn = find_next_lost_sn(header_sn_, tail_sn); + if (-1 == sn) { + if (check_frame_complete(header_sn_, tail_sn)) { + err = packet_video_rtmp(header_sn_, tail_sn); + } + } else if (-2 == sn) { + return srs_error_new(ERROR_RTC_RTP_MUXER, "video cache is overflow"); + } else { + lost_sn_ = sn; + } + + return err; +} + +int32_t SrsRtmpFromRtcBridger::find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn) +{ + uint32_t last_ts = cache_video_pkts_[cache_index(header_sn_)].ts; + for (int i = 0; i < s_cache_size; ++i) { + uint16_t lost_sn = current_sn + i; + int index = cache_index(lost_sn); + + if (!cache_video_pkts_[index].in_use) { + 
return lost_sn; + } + //check time first, avoid two small frame mixed case decode fail + if (last_ts != cache_video_pkts_[index].ts) { + end_sn = lost_sn - 1; + return -1; + } + + if (cache_video_pkts_[index].pkt->header.get_marker()) { + end_sn = lost_sn; + return -1; + } + } + + srs_error("the cache is mess. the packet count of video frame is more than %u", s_cache_size); + return -2; +} + +void SrsRtmpFromRtcBridger::clear_cached_video() +{ + for (size_t i = 0; i < s_cache_size; i++) + { + if (cache_video_pkts_[i].in_use) { + srs_freep(cache_video_pkts_[i].pkt); + cache_video_pkts_[i].sn = 0; + cache_video_pkts_[i].ts = 0; + cache_video_pkts_[i].in_use = false; + } + } +} + +bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uint16_t end) +{ + uint16_t cnt = (end - start + 1); + uint16_t fu_s_c = 0; + uint16_t fu_e_c = 0; + for (uint16_t i = 0; i < cnt; ++i) { + int index = cache_index((start + i)); + SrsRtpPacket2* pkt = cache_video_pkts_[index].pkt; + SrsRtpFUAPayload2* fua_payload = dynamic_cast(pkt->payload()); + if (fua_payload) { + if (fua_payload->start) { + ++fu_s_c; + } + + if (fua_payload->end) { + ++fu_e_c; + } + } + } + + return fu_s_c == fu_e_c; +} #endif SrsCodecPayload::SrsCodecPayload() diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index a7eb4abfc..2543daa6f 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -163,8 +163,20 @@ public: virtual void on_consumers_finished() = 0; }; +// SrsRtcStream bridge to SrsSource +class ISrsRtcSourceBridger +{ +public: + ISrsRtcSourceBridger(); + virtual ~ISrsRtcSourceBridger(); +public: + virtual srs_error_t on_publish() = 0; + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt) = 0; + virtual void on_unpublish() = 0; +}; + // A Source is a stream, to publish and to play with, binding to SrsRtcPublishStream and SrsRtcPlayStream. -class SrsRtcStream +class SrsRtcStream : public ISrsFastTimer { private: // For publish, it's the publish client id. @@ -178,6 +190,8 @@ private: ISrsRtcPublishStream* publish_stream_; // Steam description for this steam. SrsRtcStreamDescription* stream_desc_; + // The Source bridger, bridger stream to other source. + ISrsRtcSourceBridger* bridger_; private: // To delivery stream to clients. std::vector consumers; @@ -201,6 +215,8 @@ public: // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); +public: + void set_bridger(ISrsRtcSourceBridger *bridger); public: // Create consumer // @param consumer, output the create consumer. @@ -234,6 +250,9 @@ public: bool has_stream_desc(); void set_stream_desc(SrsRtcStreamDescription* stream_desc); std::vector get_track_desc(std::string type, std::string media_type); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick); }; // A helper class, to release the packet to cache. @@ -286,6 +305,52 @@ private: srs_error_t package_fu_a(SrsSharedPtrMessage* msg, SrsSample* sample, int fu_payload_size, std::vector& helpers); srs_error_t consume_packets(std::vector& helpers); }; + +class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger +{ +private: + SrsSource *source_; + SrsAudioTranscoder *codec_; + bool is_first_audio; + bool is_first_video; + // The format, codec information. 
+ SrsRtmpFormat* format; + + //TODO:use SrsRtpRingBuffer + //TODO:jitter buffer class + struct RtcPacketCache { + bool in_use; + uint16_t sn; + uint32_t ts; + SrsRtpPacket2* pkt; + }; + const static uint16_t s_cache_size = 512; + RtcPacketCache cache_video_pkts_[s_cache_size]; + uint16_t header_sn_; + uint16_t lost_sn_; + int64_t key_frame_ts_; +public: + SrsRtmpFromRtcBridger(SrsSource *src); + virtual ~SrsRtmpFromRtcBridger(); +public: + srs_error_t initialize(SrsRequest* r); +public: + virtual srs_error_t on_publish(); + virtual srs_error_t on_rtp(SrsRtpPacket2 *pkt); + virtual void on_unpublish(); +private: + srs_error_t trancode_audio(SrsRtpPacket2 *pkt); + void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); + srs_error_t packet_video(SrsRtpPacket2* pkt); + srs_error_t packet_video_key_frame(SrsRtpPacket2* pkt); + srs_error_t packet_video_rtmp(const uint16_t start, const uint16_t end); + int32_t find_next_lost_sn(uint16_t current_sn, uint16_t& end_sn); + void clear_cached_video(); + inline uint16_t cache_index(uint16_t current_sn) { + return current_sn%s_cache_size; + } + bool check_frame_complete(const uint16_t start, const uint16_t end); +}; #endif // TODO: FIXME: Rename it. diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 5244bbbfd..4a5905895 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -487,7 +487,7 @@ public: // Global singleton instance. extern SrsSourceManager* _srs_sources; -// For two sources to bridge with each other. +// For RTMP2RTC, bridge SrsSource to SrsRtcStream class ISrsSourceBridger { public: diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index a4c3312f9..64f88dea1 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 94 +#define VERSION_REVISION 95 #endif diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp index bcc04d766..5474d9106 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.cpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.cpp @@ -1054,6 +1054,33 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf) return err; } +bool SrsRtpPacket2::is_keyframe() +{ + // False if audio packet + if(SrsFrameTypeAudio == frame_type) { + return false; + } + + // It's normal H264 video rtp packet + if (nalu_type == kStapA) { + SrsRtpSTAPPayload* stap_payload = dynamic_cast(payload_); + if(NULL != stap_payload->get_sps() || NULL != stap_payload->get_pps()) { + return true; + } + } else if (nalu_type == kFuA) { + SrsRtpFUAPayload2* fua_payload = dynamic_cast(payload_); + if(SrsAvcNaluTypeIDR == fua_payload->nalu_type) { + return true; + } + } else { + if((SrsAvcNaluTypeIDR == nalu_type) || (SrsAvcNaluTypeSPS == nalu_type) || (SrsAvcNaluTypePPS == nalu_type)) { + return true; + } + } + + return false; +} + SrsRtpObjectCacheManager* _srs_rtp_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpPacket2)); SrsRtpObjectCacheManager* _srs_rtp_raw_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpRawPayload)); SrsRtpObjectCacheManager* _srs_rtp_fua_cache = new SrsRtpObjectCacheManager(sizeof(SrsRtpFUAPayload2)); diff --git a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp index c0da94f21..b00135e85 100644 --- a/trunk/src/kernel/srs_kernel_rtc_rtp.hpp +++ b/trunk/src/kernel/srs_kernel_rtc_rtp.hpp @@ -353,6 +353,8 @@ public: virtual 
uint64_t nb_bytes(); virtual srs_error_t encode(SrsBuffer* buf); virtual srs_error_t decode(SrsBuffer* buf); +public: + bool is_keyframe(); }; // For object cache manager to stat the object dropped. From 8a30cc86d9ffa2f1dea41942915ac47ad63d45a6 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 25 Apr 2021 15:52:34 +0800 Subject: [PATCH 7/8] Bridger: Fix build fail if disable rtc and ffmpeg-fit --- trunk/src/app/srs_app_rtc_codec.cpp | 2 +- trunk/src/app/srs_app_rtc_conn.cpp | 2 ++ trunk/src/app/srs_app_rtmp_conn.cpp | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_codec.cpp b/trunk/src/app/srs_app_rtc_codec.cpp index eb2d66613..c923deb5a 100644 --- a/trunk/src/app/srs_app_rtc_codec.cpp +++ b/trunk/src/app/srs_app_rtc_codec.cpp @@ -197,7 +197,7 @@ srs_error_t SrsAudioTranscoder::init_enc(SrsAudioCodecId dst_codec, int dst_chan enc_->channel_layout = av_get_default_channel_layout(dst_channels); enc_->bit_rate = dst_bit_rate; enc_->sample_fmt = codec->sample_fmts[0]; - enc_->time_base = {1, 1000}; + enc_->time_base.num = 1; enc_->time_base.den = 1000; // {1, 1000} if (dst_codec == SrsAudioCodecIdOpus) { //TODO: for more level setting enc_->compression_level = 1; diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 547ef0aa6..8b5d97d07 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -999,6 +999,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti source->set_publish_stream(this); // Bridge to rtmp +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req->vhost); if (rtc_to_rtmp) { SrsSource *rtmp = NULL; @@ -1019,6 +1020,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcStreamDescripti source->set_bridger(bridger); } +#endif return err; } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 4cf58610f..d6fe9d630 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -983,7 +983,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source) } // Bridge to RTC streaming. -#ifdef SRS_RTC +#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) if (rtc) { SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc); if ((err = bridger->initialize(req)) != srs_success) { From f7473c90bce0698020c9c6d5ebe09cc2e5112154 Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 1 May 2021 18:19:17 +0800 Subject: [PATCH 8/8] For #2303: Add conf for RTC2RTMP --- trunk/conf/rtc2rtmp.conf | 42 +++++++++++++++++++++++++++++++++++ trunk/conf/rtc_live.conf | 47 ---------------------------------------- 2 files changed, 42 insertions(+), 47 deletions(-) create mode 100644 trunk/conf/rtc2rtmp.conf delete mode 100644 trunk/conf/rtc_live.conf diff --git a/trunk/conf/rtc2rtmp.conf b/trunk/conf/rtc2rtmp.conf new file mode 100644 index 000000000..3410cdaea --- /dev/null +++ b/trunk/conf/rtc2rtmp.conf @@ -0,0 +1,42 @@ + +listen 1935; +max_connections 1000; +daemon off; +srs_log_tank console; + +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} + +http_api { + enabled on; + listen 1985; +} +stats { + network 0; +} +rtc_server { + enabled on; + # Listen at udp://8000 + listen 8000; + # + # The $CANDIDATE means fetch from env, if not configed, use * as default. 
+ # + # The * means retrieving server IP automatically, from all network interfaces, + # @see https://github.com/ossrs/srs/wiki/v4_CN_RTCWiki#config-candidate + candidate $CANDIDATE; +} + +vhost __defaultVhost__ { + rtc { + enabled on; + rtc_to_rtmp on; + } + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + } +} + diff --git a/trunk/conf/rtc_live.conf b/trunk/conf/rtc_live.conf deleted file mode 100644 index 66682d09d..000000000 --- a/trunk/conf/rtc_live.conf +++ /dev/null @@ -1,47 +0,0 @@ - -listen 1935; -max_connections 1000; -daemon off; -srs_log_tank console; - -http_server { - enabled on; - listen 8080; -} - -http_api { - enabled on; - listen 1985; -} - -rtc_server { - enabled on; - listen 8000; - candidate $CANDIDATE; -} - -vhost __defaultVhost__ { - rtc { - enabled on; - bframe discard; - } - - http_remux { - enabled on; - mount [vhost]/[app]/[stream].flv; - } - - tcp_nodelay on - min_latency on; - - play { - gop_cache off; - queue_length 10; - mw_latency 100; - } - - publish { - mr off; - } -} -