RTC: Support WebRTC re-publish stream. 4.0.87

pull/2324/head
winlin 4 years ago
parent 0cb05a2953
commit d6c16a7e23

@ -190,6 +190,7 @@ Other documents:
## V4 changes
* v4.0, 2021-03-24, RTC: Support WebRTC re-publish stream. 4.0.87
* v4.0, 2021-03-24, RTC: Use fast parse TWCCID, ignore in packet parsing. 4.0.86
* v4.0, 2021-03-09, DTLS: Fix ARQ bug, use openssl timeout. 4.0.84
* v4.0, 2021-03-08, DTLS: Fix dead loop by duplicated Alert message. 4.0.83

@ -466,6 +466,30 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
return err;
}
void SrsRtcPlayStream::on_stream_change(SrsRtcStreamDescription* desc)
{
// Refresh the relation for audio.
// TODO: FIMXE: Match by label?
if (desc->audio_track_desc_ && audio_tracks_.size() == 1) {
uint32_t ssrc = desc->audio_track_desc_->ssrc_;
SrsRtcAudioSendTrack* track = audio_tracks_.begin()->second;
audio_tracks_.clear();
audio_tracks_.insert(make_pair(ssrc, track));
}
// Refresh the relation for video.
// TODO: FIMXE: Match by label?
if (desc->video_track_descs_.size() == 1 && desc->video_track_descs_.size() == 1) {
SrsRtcTrackDescription* vdesc = desc->video_track_descs_.at(0);
uint32_t ssrc = vdesc->ssrc_;
SrsRtcVideoSendTrack* track = video_tracks_.begin()->second;
video_tracks_.clear();
video_tracks_.insert(make_pair(ssrc, track));
}
}
srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost)
{
if (req_->vhost != vhost) {
@ -546,6 +570,9 @@ srs_error_t SrsRtcPlayStream::cycle()
return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
}
srs_assert(consumer);
consumer->set_handler(this);
// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());

@ -211,7 +211,7 @@ public:
// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
{
private:
SrsContextId cid_;
@ -235,13 +235,16 @@ private:
bool nack_enabled_;
bool nack_no_copy_;
private:
// Whether palyer started.
// Whether player started.
bool is_started;
public:
SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid);
virtual ~SrsRtcPlayStream();
public:
srs_error_t initialize(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
// Interface ISrsRtcStreamChangeCallback
public:
void on_stream_change(SrsRtcStreamDescription* desc);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
@ -268,7 +271,7 @@ private:
srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
// inteface ISrsRtcPLIWorkerHandler
// Interface ISrsRtcPLIWorkerHandler
public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
};

@ -152,10 +152,19 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
return srs_ntp;
}
ISrsRtcStreamChangeCallback::ISrsRtcStreamChangeCallback()
{
}
ISrsRtcStreamChangeCallback::~ISrsRtcStreamChangeCallback()
{
}
SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s)
{
source = s;
should_update_source_id = false;
handler_ = NULL;
mw_wait = srs_cond_new();
mw_min_msgs = 0;
@ -231,6 +240,13 @@ void SrsRtcConsumer::wait(int nb_msgs)
srs_cond_wait(mw_wait);
}
void SrsRtcConsumer::on_stream_change(SrsRtcStreamDescription* desc)
{
if (handler_) {
handler_->on_stream_change(desc);
}
}
SrsRtcStreamManager::SrsRtcStreamManager()
{
lock = NULL;
@ -354,24 +370,34 @@ void SrsRtcStream::update_auth(SrsRequest* r)
req->update_auth(r);
}
srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id)
srs_error_t SrsRtcStream::on_source_changed()
{
srs_error_t err = srs_success;
if (!_source_id.compare(id)) {
return err;
}
// Update context id if changed.
bool id_changed = false;
const SrsContextId& id = _srs_context->get_id();
if (_source_id.compare(id)) {
id_changed = true;
if (_pre_source_id.empty()) {
_pre_source_id = id;
if (_pre_source_id.empty()) {
_pre_source_id = id;
}
_source_id = id;
}
_source_id = id;
// notice all consumer
// Notify all consumers.
std::vector<SrsRtcConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsRtcConsumer* consumer = *it;
consumer->update_source_id();
// Notify if context id changed.
if (id_changed) {
consumer->update_source_id();
}
// Notify about stream description.
consumer->on_stream_change(stream_desc_);
}
return err;
@ -456,9 +482,8 @@ srs_error_t SrsRtcStream::on_publish()
is_created_ = true;
is_delivering_packets_ = true;
// whatever, the publish thread is the source or edge source,
// save its id to srouce id.
if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) {
// Notify the consumers about stream change event.
if ((err = on_source_changed()) != srs_success) {
return srs_error_wrap(err, "source id change");
}

@ -75,6 +75,17 @@ public:
static uint64_t kMagicNtpFractionalUnit;
};
// When RTC stream publish and re-publish.
class ISrsRtcStreamChangeCallback
{
public:
ISrsRtcStreamChangeCallback();
virtual ~ISrsRtcStreamChangeCallback();
public:
virtual void on_stream_change(SrsRtcStreamDescription* desc) = 0;
};
// The RTC stream consumer, consume packets from RTC stream source.
class SrsRtcConsumer
{
private:
@ -87,6 +98,9 @@ private:
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
private:
// The callback for stream change event.
ISrsRtcStreamChangeCallback* handler_;
public:
SrsRtcConsumer(SrsRtcStream* s);
virtual ~SrsRtcConsumer();
@ -100,6 +114,9 @@ public:
virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
public:
void set_handler(ISrsRtcStreamChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler()
void on_stream_change(SrsRtcStreamDescription* desc);
};
class SrsRtcStreamManager
@ -154,7 +171,7 @@ private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
// invoke the on_source_changed() to let all clients know.
SrsContextId _source_id;
// previous source id.
SrsContextId _pre_source_id;
@ -180,8 +197,10 @@ public:
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
// The source id changed.
virtual srs_error_t on_source_id_changed(SrsContextId id);
private:
// The stream source changed.
virtual srs_error_t on_source_changed();
public:
// Get current source id.
virtual SrsContextId source_id();
virtual SrsContextId pre_source_id();

@ -26,6 +26,6 @@
#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 86
#define VERSION_REVISION 87
#endif

Loading…
Cancel
Save