Refactor RTC source create

pull/1753/head
winlin 5 years ago
parent eace693ae9
commit db586903ba

@ -69,7 +69,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
return 1;
}
srs_error_t SrsDtls::init(const SrsRequest& req)
srs_error_t SrsDtls::init(SrsRequest* r)
{
srs_error_t err = srs_success;

@ -43,7 +43,7 @@ private:
SrsDtls();
virtual ~SrsDtls();
public:
srs_error_t init(const SrsRequest& req);
srs_error_t init(SrsRequest* r);
public:
static SrsDtls* instance();
SSL_CTX* get_dtls_ctx() { return dtls_ctx; }

@ -919,7 +919,10 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
// TODO: FIXME: Maybe need a better name?
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, eip);
SrsRtcSession* rtc_session = NULL;
if ((err = rtc_server->create_rtc_session(&request, remote_sdp, local_sdp, eip, false, &rtc_session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
if (encrypt.empty()) {
rtc_session->set_encrypt(_srs_config->get_rtc_server_encrypt());
} else {
@ -1191,8 +1194,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
}
srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length());
// For client to specifies the EIP of server.
string eip = r->query_get("eip");
srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s",
streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(), eip.c_str());
// TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information.
SrsSdp remote_sdp;
@ -1222,7 +1228,10 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
// TODO: FIXME: Maybe need a better name?
// TODO: FIXME: When server enabled, but vhost disabled, should report error.
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, "");
SrsRtcSession* rtc_session = NULL;
if ((err = rtc_server->create_rtc_session(&request, remote_sdp, local_sdp, eip, true, &rtc_session)) != srs_success) {
return srs_error_wrap(err, "create session");
}
ostringstream os;
if ((err = local_sdp.encode(os)) != srs_success) {

@ -229,11 +229,11 @@ SrsDtlsSession::~SrsDtlsSession()
}
}
srs_error_t SrsDtlsSession::initialize(const SrsRequest& req)
srs_error_t SrsDtlsSession::initialize(SrsRequest* r)
{
srs_error_t err = srs_success;
if ((err = SrsDtls::instance()->init(req)) != srs_success) {
if ((err = SrsDtls::instance()->init(r)) != srs_success) {
return srs_error_wrap(err, "DTLS init");
}
@ -682,7 +682,7 @@ srs_error_t SrsRtcSenderThread::on_reload_rtc_server()
srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost)
{
SrsRequest* req = &rtc_session->request;
SrsRequest* req = rtc_session->req;
if (req->vhost != vhost) {
return srs_success;
@ -746,7 +746,7 @@ srs_error_t SrsRtcSenderThread::cycle()
srs_error_t err = srs_success;
SrsSource* source = NULL;
SrsRequest* req = &rtc_session->request;
SrsRequest* req = rtc_session->req;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
@ -1491,7 +1491,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
{
sendonly_ukt = NULL;
report_timer = new SrsHourGlass(this, 20 * SRS_UTIME_MILLISECONDS);
report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS);
rtc_session = session;
rtp_h264_demuxer = new SrsRtpH264Demuxer();
@ -1504,6 +1504,11 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session)
SrsRtcPublisher::~SrsRtcPublisher()
{
// TODO: FIXME: Do unpublish when session timeout.
if (source) {
source->on_unpublish();
}
srs_freep(report_timer);
srs_freep(rtp_h264_demuxer);
srs_freep(rtp_opus_demuxer);
@ -1511,7 +1516,7 @@ SrsRtcPublisher::~SrsRtcPublisher()
srs_freep(rtp_audio_queue);
}
srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest request)
srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* r)
{
srs_error_t err = srs_success;
@ -1519,7 +1524,7 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui
video_ssrc = vssrc;
audio_ssrc = assrc;
this->request = request;
req = r;
srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc);
@ -1531,6 +1536,16 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui
return srs_error_wrap(err, "start report_timer");
}
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
if ((err = source->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
}
return err;
}
@ -2036,15 +2051,6 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
if (video_header_change) {
srs_verbose("sps/pps change or init");
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->on_publish();
}
uint8_t* video_header = new uint8_t[1500];
SrsBuffer *stream = new SrsBuffer((char*)video_header, 1500);
@ -2088,15 +2094,6 @@ srs_error_t SrsRtcPublisher::collect_video_frame()
}
if (!sps.empty() && !pps.empty()) {
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
source->on_publish();
}
if (idr) {
frame_buffer[0] = 0x17;
} else {
@ -2146,27 +2143,29 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t
return srs_success;
}
SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id)
SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id)
{
rtc_server = rtc_svr;
rtc_server = s;
session_state = INIT;
dtls_session = new SrsDtlsSession(this);
// TODO: FIXME: Check error.
dtls_session->initialize(req);
sender = NULL;
username = un;
last_stun_time = srs_get_system_time();
request = req;
req = r->copy();
source = NULL;
cid = context_id;
encrypt = true;
// TODO: FIXME: Support reload.
sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req.vhost);
sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost);
publisher = NULL;
}
@ -2176,6 +2175,7 @@ SrsRtcSession::~SrsRtcSession()
srs_freep(sender);
srs_freep(publisher);
srs_freep(dtls_session);
srs_freep(req);
}
void SrsRtcSession::set_local_sdp(const SrsSdp& sdp)
@ -2219,21 +2219,6 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req
return err;
}
srs_error_t SrsRtcSession::check_source()
{
srs_error_t err = srs_success;
if (source == NULL) {
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
}
return err;
}
#ifdef SRS_AUTO_OSX
// These functions are similar to the older byteorder(3) family of functions.
// For example, be32toh() is identical to ntohl().
@ -2245,7 +2230,7 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke
{
srs_error_t err = srs_success;
bool strict_check = _srs_config->get_rtc_stun_strict_check(request.vhost);
bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost);
if (strict_check && stun_req->get_ice_controlled()) {
// @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1
// TODO: Send 487 (Role Conflict) error response.
@ -2334,15 +2319,9 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock
srs_verbose("pid=%u, blp=%d", pid, blp);
if ((err = check_source()) != srs_success) {
return srs_error_wrap(err, "check");
}
if (! source) {
return srs_error_new(ERROR_RTC_SOURCE_CHECK, "can not found source");
}
vector<SrsRtpSharedPacket*> resend_pkts;
SrsRtpSharedPacket* pkt = source->find_rtp_packet(pid);
// TODO: FIXME: Support ARQ.
SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(pid);
if (pkt) {
resend_pkts.push_back(pkt);
}
@ -2355,7 +2334,9 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock
uint32_t loss_seq = pid + i;
SrsRtpSharedPacket* pkt = source->find_rtp_packet(loss_seq);
// TODO: FIXME: Support ARQ.
(void)loss_seq;
SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(loss_seq);
if (! pkt) {
continue;
}
@ -2592,7 +2573,7 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt)
}
// FIXME: err process.
if ((err = publisher->initialize(skt, video_ssrc, audio_ssrc, request)) != srs_success) {
if ((err = publisher->initialize(skt, video_ssrc, audio_ssrc, req)) != srs_success) {
return srs_error_wrap(err, "rtc publisher init");
}
@ -3097,8 +3078,25 @@ srs_error_t SrsRtcServer::listen_api()
return err;
}
SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const string& mock_eip)
{
srs_error_t SrsRtcServer::create_rtc_session(
SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish,
SrsRtcSession** psession
) {
srs_error_t err = srs_success;
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
// TODO: FIXME: Refine the API for stream status manage.
if (!source->can_publish(false)) {
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str());
}
std::string local_pwd = gen_random_str(32);
std::string local_ufrag = "";
std::string username = "";
@ -3134,7 +3132,9 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const Srs
session->set_session_state(WAITING_STUN);
return session;
*psession = session;
return err;
}
SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id)

@ -120,7 +120,7 @@ public:
SrsDtlsSession(SrsRtcSession* s);
virtual ~SrsDtlsSession();
srs_error_t initialize(const SrsRequest& req);
srs_error_t initialize(SrsRequest* r);
srs_error_t on_dtls(SrsUdpMuxSocket* skt);
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* skt);
@ -190,6 +190,7 @@ public:
SrsRtpPacket2* at(int index);
};
// TODO: FIXME: Rename to RTC player or subscriber.
class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
{
protected:
@ -269,7 +270,7 @@ private:
SrsRtpQueue* rtp_video_queue;
SrsRtpQueue* rtp_audio_queue;
private:
SrsRequest request;
SrsRequest* req;
SrsSource* source;
std::string sps;
std::string pps;
@ -280,7 +281,7 @@ public:
SrsRtcPublisher(SrsRtcSession* session);
virtual ~SrsRtcPublisher();
public:
srs_error_t initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest request);
srs_error_t initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* req);
srs_error_t on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf);
srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt);
@ -327,12 +328,12 @@ private:
// The timeout of session, keep alive by STUN ping pong.
srs_utime_t sessionStunTimeout;
public:
SrsRequest request;
SrsRequest* req;
SrsSource* source;
private:
SrsRtcPublisher* publisher;
public:
SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id);
SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id);
virtual ~SrsRtcSession();
public:
SrsSdp* get_local_sdp() { return &local_sdp; }
@ -364,8 +365,6 @@ public:
srs_error_t start_publish(SrsUdpMuxSocket* skt);
public:
bool is_stun_timeout();
private:
srs_error_t check_source();
private:
srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req);
private:
@ -441,7 +440,10 @@ public:
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt);
public:
virtual srs_error_t listen_api();
SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip);
srs_error_t create_rtc_session(
SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish,
SrsRtcSession** psession
);
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session);
void check_and_clean_timeout_session();
int nn_sessions() { return (int)map_username_session.size(); }

@ -348,6 +348,7 @@
#define ERROR_RTC_SOURCE_CHECK 5017
#define ERROR_RTC_SDP_EXCHANGE 5018
#define ERROR_RTC_API_BODY 5019
#define ERROR_RTC_SOURCE_BUSY 5020
///////////////////////////////////////////////////////
// GB28181 API error.

Loading…
Cancel
Save