diff --git a/trunk/src/app/srs_app_dtls.cpp b/trunk/src/app/srs_app_dtls.cpp index abc8abf54..5ecac847f 100644 --- a/trunk/src/app/srs_app_dtls.cpp +++ b/trunk/src/app/srs_app_dtls.cpp @@ -69,7 +69,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) return 1; } -srs_error_t SrsDtls::init(const SrsRequest& req) +srs_error_t SrsDtls::init(SrsRequest* r) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_dtls.hpp b/trunk/src/app/srs_app_dtls.hpp index 65d38c167..560386f77 100644 --- a/trunk/src/app/srs_app_dtls.hpp +++ b/trunk/src/app/srs_app_dtls.hpp @@ -43,7 +43,7 @@ private: SrsDtls(); virtual ~SrsDtls(); public: - srs_error_t init(const SrsRequest& req); + srs_error_t init(SrsRequest* r); public: static SrsDtls* instance(); SSL_CTX* get_dtls_ctx() { return dtls_ctx; } diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 318888c8c..797ba63c7 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -919,7 +919,10 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe // TODO: FIXME: Maybe need a better name? // TODO: FIXME: When server enabled, but vhost disabled, should report error. - SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, eip); + SrsRtcSession* rtc_session = NULL; + if ((err = rtc_server->create_rtc_session(&request, remote_sdp, local_sdp, eip, false, &rtc_session)) != srs_success) { + return srs_error_wrap(err, "create session"); + } if (encrypt.empty()) { rtc_session->set_encrypt(_srs_config->get_rtc_server_encrypt()); } else { @@ -1191,8 +1194,11 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param); } - srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB", - streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length()); + // For client to specifies the EIP of server. + string eip = r->query_get("eip"); + + srs_trace("RTC publish %s, api=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s", + streamurl.c_str(), api.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(), eip.c_str()); // TODO: FIXME: It seems remote_sdp doesn't represents the full SDP information. SrsSdp remote_sdp; @@ -1222,7 +1228,10 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt // TODO: FIXME: Maybe need a better name? // TODO: FIXME: When server enabled, but vhost disabled, should report error. - SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp, ""); + SrsRtcSession* rtc_session = NULL; + if ((err = rtc_server->create_rtc_session(&request, remote_sdp, local_sdp, eip, true, &rtc_session)) != srs_success) { + return srs_error_wrap(err, "create session"); + } ostringstream os; if ((err = local_sdp.encode(os)) != srs_success) { diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 427c44a3d..58d6e01c2 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -229,11 +229,11 @@ SrsDtlsSession::~SrsDtlsSession() } } -srs_error_t SrsDtlsSession::initialize(const SrsRequest& req) +srs_error_t SrsDtlsSession::initialize(SrsRequest* r) { srs_error_t err = srs_success; - if ((err = SrsDtls::instance()->init(req)) != srs_success) { + if ((err = SrsDtls::instance()->init(r)) != srs_success) { return srs_error_wrap(err, "DTLS init"); } @@ -682,7 +682,7 @@ srs_error_t SrsRtcSenderThread::on_reload_rtc_server() srs_error_t SrsRtcSenderThread::on_reload_vhost_play(string vhost) { - SrsRequest* req = &rtc_session->request; + SrsRequest* req = rtc_session->req; if (req->vhost != vhost) { return srs_success; @@ -746,7 +746,7 @@ srs_error_t SrsRtcSenderThread::cycle() srs_error_t err = srs_success; SrsSource* source = NULL; - SrsRequest* req = &rtc_session->request; + SrsRequest* req = rtc_session->req; // TODO: FIXME: Should refactor it, directly use http server as handler. ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); @@ -1491,7 +1491,7 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) { sendonly_ukt = NULL; - report_timer = new SrsHourGlass(this, 20 * SRS_UTIME_MILLISECONDS); + report_timer = new SrsHourGlass(this, 200 * SRS_UTIME_MILLISECONDS); rtc_session = session; rtp_h264_demuxer = new SrsRtpH264Demuxer(); @@ -1504,6 +1504,11 @@ SrsRtcPublisher::SrsRtcPublisher(SrsRtcSession* session) SrsRtcPublisher::~SrsRtcPublisher() { + // TODO: FIXME: Do unpublish when session timeout. + if (source) { + source->on_unpublish(); + } + srs_freep(report_timer); srs_freep(rtp_h264_demuxer); srs_freep(rtp_opus_demuxer); @@ -1511,7 +1516,7 @@ SrsRtcPublisher::~SrsRtcPublisher() srs_freep(rtp_audio_queue); } -srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest request) +srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* r) { srs_error_t err = srs_success; @@ -1519,7 +1524,7 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui video_ssrc = vssrc; audio_ssrc = assrc; - this->request = request; + req = r; srs_verbose("video_ssrc=%u, audio_ssrc=%u", video_ssrc, audio_ssrc); @@ -1531,6 +1536,16 @@ srs_error_t SrsRtcPublisher::initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, ui return srs_error_wrap(err, "start report_timer"); } + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + if ((err = source->on_publish()) != srs_success) { + return srs_error_wrap(err, "on publish"); + } + return err; } @@ -2036,15 +2051,6 @@ srs_error_t SrsRtcPublisher::collect_video_frame() if (video_header_change) { srs_verbose("sps/pps change or init"); - if (source == NULL) { - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - - source->on_publish(); - } uint8_t* video_header = new uint8_t[1500]; SrsBuffer *stream = new SrsBuffer((char*)video_header, 1500); @@ -2088,15 +2094,6 @@ srs_error_t SrsRtcPublisher::collect_video_frame() } if (!sps.empty() && !pps.empty()) { - if (source == NULL) { - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - source->on_publish(); - } - if (idr) { frame_buffer[0] = 0x17; } else { @@ -2146,27 +2143,29 @@ srs_error_t SrsRtcPublisher::notify(int type, srs_utime_t interval, srs_utime_t return srs_success; } -SrsRtcSession::SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id) +SrsRtcSession::SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id) { - rtc_server = rtc_svr; + rtc_server = s; session_state = INIT; + dtls_session = new SrsDtlsSession(this); // TODO: FIXME: Check error. dtls_session->initialize(req); + sender = NULL; username = un; last_stun_time = srs_get_system_time(); - request = req; + req = r->copy(); source = NULL; cid = context_id; encrypt = true; // TODO: FIXME: Support reload. - sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req.vhost); + sessionStunTimeout = _srs_config->get_rtc_stun_timeout(req->vhost); publisher = NULL; } @@ -2176,6 +2175,7 @@ SrsRtcSession::~SrsRtcSession() srs_freep(sender); srs_freep(publisher); srs_freep(dtls_session); + srs_freep(req); } void SrsRtcSession::set_local_sdp(const SrsSdp& sdp) @@ -2219,21 +2219,6 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req return err; } -srs_error_t SrsRtcSession::check_source() -{ - srs_error_t err = srs_success; - - if (source == NULL) { - // TODO: FIXME: Should refactor it, directly use http server as handler. - ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); - if ((err = _srs_sources->fetch_or_create(&request, handler, &source)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - } - - return err; -} - #ifdef SRS_AUTO_OSX // These functions are similar to the older byteorder(3) family of functions. // For example, be32toh() is identical to ntohl(). @@ -2245,7 +2230,7 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacke { srs_error_t err = srs_success; - bool strict_check = _srs_config->get_rtc_stun_strict_check(request.vhost); + bool strict_check = _srs_config->get_rtc_stun_strict_check(req->vhost); if (strict_check && stun_req->get_ice_controlled()) { // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-6.1.3.1 // TODO: Send 487 (Role Conflict) error response. @@ -2334,15 +2319,9 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock srs_verbose("pid=%u, blp=%d", pid, blp); - if ((err = check_source()) != srs_success) { - return srs_error_wrap(err, "check"); - } - if (! source) { - return srs_error_new(ERROR_RTC_SOURCE_CHECK, "can not found source"); - } - vector resend_pkts; - SrsRtpSharedPacket* pkt = source->find_rtp_packet(pid); + // TODO: FIXME: Support ARQ. + SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(pid); if (pkt) { resend_pkts.push_back(pkt); } @@ -2355,7 +2334,9 @@ srs_error_t SrsRtcSession::on_rtcp_feedback(char* buf, int nb_buf, SrsUdpMuxSock uint32_t loss_seq = pid + i; - SrsRtpSharedPacket* pkt = source->find_rtp_packet(loss_seq); + // TODO: FIXME: Support ARQ. + (void)loss_seq; + SrsRtpSharedPacket* pkt = NULL; // source->find_rtp_packet(loss_seq); if (! pkt) { continue; } @@ -2592,7 +2573,7 @@ srs_error_t SrsRtcSession::start_publish(SrsUdpMuxSocket* skt) } // FIXME: err process. - if ((err = publisher->initialize(skt, video_ssrc, audio_ssrc, request)) != srs_success) { + if ((err = publisher->initialize(skt, video_ssrc, audio_ssrc, req)) != srs_success) { return srs_error_wrap(err, "rtc publisher init"); } @@ -3097,8 +3078,25 @@ srs_error_t SrsRtcServer::listen_api() return err; } -SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const string& mock_eip) -{ +srs_error_t SrsRtcServer::create_rtc_session( + SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, + SrsRtcSession** psession +) { + srs_error_t err = srs_success; + + SrsSource* source = NULL; + + // TODO: FIXME: Should refactor it, directly use http server as handler. + ISrsSourceHandler* handler = _srs_hybrid->srs()->instance(); + if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // TODO: FIXME: Refine the API for stream status manage. + if (!source->can_publish(false)) { + return srs_error_new(ERROR_RTC_SOURCE_BUSY, "stream %s busy", req->get_stream_url().c_str()); + } + std::string local_pwd = gen_random_str(32); std::string local_ufrag = ""; std::string username = ""; @@ -3134,7 +3132,9 @@ SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const Srs session->set_session_state(WAITING_STUN); - return session; + *psession = session; + + return err; } SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 60e63f1ee..838cce636 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -120,7 +120,7 @@ public: SrsDtlsSession(SrsRtcSession* s); virtual ~SrsDtlsSession(); - srs_error_t initialize(const SrsRequest& req); + srs_error_t initialize(SrsRequest* r); srs_error_t on_dtls(SrsUdpMuxSocket* skt); srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* skt); @@ -190,6 +190,7 @@ public: SrsRtpPacket2* at(int index); }; +// TODO: FIXME: Rename to RTC player or subscriber. class SrsRtcSenderThread : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler { protected: @@ -269,7 +270,7 @@ private: SrsRtpQueue* rtp_video_queue; SrsRtpQueue* rtp_audio_queue; private: - SrsRequest request; + SrsRequest* req; SrsSource* source; std::string sps; std::string pps; @@ -280,7 +281,7 @@ public: SrsRtcPublisher(SrsRtcSession* session); virtual ~SrsRtcPublisher(); public: - srs_error_t initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest request); + srs_error_t initialize(SrsUdpMuxSocket* skt, uint32_t vssrc, uint32_t assrc, SrsRequest* req); srs_error_t on_rtp(SrsUdpMuxSocket* skt, char* buf, int nb_buf); srs_error_t on_rtcp_sender_report(char* buf, int nb_buf, SrsUdpMuxSocket* skt); srs_error_t on_rtcp_xr(char* buf, int nb_buf, SrsUdpMuxSocket* skt); @@ -327,12 +328,12 @@ private: // The timeout of session, keep alive by STUN ping pong. srs_utime_t sessionStunTimeout; public: - SrsRequest request; + SrsRequest* req; SrsSource* source; private: SrsRtcPublisher* publisher; public: - SrsRtcSession(SrsRtcServer* rtc_svr, const SrsRequest& req, const std::string& un, int context_id); + SrsRtcSession(SrsRtcServer* s, SrsRequest* r, const std::string& un, int context_id); virtual ~SrsRtcSession(); public: SrsSdp* get_local_sdp() { return &local_sdp; } @@ -364,8 +365,6 @@ public: srs_error_t start_publish(SrsUdpMuxSocket* skt); public: bool is_stun_timeout(); -private: - srs_error_t check_source(); private: srs_error_t on_binding_request(SrsUdpMuxSocket* skt, SrsStunPacket* stun_req); private: @@ -441,7 +440,10 @@ public: virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt); public: virtual srs_error_t listen_api(); - SrsRtcSession* create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip); + srs_error_t create_rtc_session( + SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, + SrsRtcSession** psession + ); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); void check_and_clean_timeout_session(); int nn_sessions() { return (int)map_username_session.size(); } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 2e6f57532..9f88c1dda 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -348,6 +348,7 @@ #define ERROR_RTC_SOURCE_CHECK 5017 #define ERROR_RTC_SDP_EXCHANGE 5018 #define ERROR_RTC_API_BODY 5019 +#define ERROR_RTC_SOURCE_BUSY 5020 /////////////////////////////////////////////////////// // GB28181 API error.