Merge branch 'feature/rtc' into develop

pull/1753/head
winlin 5 years ago
commit 38d6d5b077

@ -63,6 +63,7 @@ using namespace std;
#include <srs_app_statistic.hpp> #include <srs_app_statistic.hpp>
#include <srs_app_pithy_print.hpp> #include <srs_app_pithy_print.hpp>
#include <srs_service_st.hpp> #include <srs_service_st.hpp>
#include <srs_app_janus.hpp>
// The RTP payload max size, reserved some paddings for SRTP as such: // The RTP payload max size, reserved some paddings for SRTP as such:
// kRtpPacketSize = kRtpMaxPayloadSize + paddings // kRtpPacketSize = kRtpMaxPayloadSize + paddings
@ -90,7 +91,7 @@ static bool is_rtcp(const uint8_t* data, size_t len)
return (len >= 12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); return (len >= 12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209);
} }
static string gen_random_str(int len) string gen_random_str(int len)
{ {
static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
@ -2288,7 +2289,7 @@ SrsRtcSession::SrsRtcSession(SrsRtcServer* s)
server_ = s; server_ = s;
dtls_ = new SrsRtcDtls(this); dtls_ = new SrsRtcDtls(this);
session_state = INIT; state_ = INIT;
last_stun_time = 0; last_stun_time = 0;
sessionStunTimeout = 0; sessionStunTimeout = 0;
@ -2328,14 +2329,14 @@ void SrsRtcSession::set_remote_sdp(const SrsSdp& sdp)
remote_sdp = sdp; remote_sdp = sdp;
} }
SrsRtcSessionStateType SrsRtcSession::get_session_state() SrsRtcSessionStateType SrsRtcSession::state()
{ {
return session_state; return state_;
} }
void SrsRtcSession::set_session_state(SrsRtcSessionStateType state) void SrsRtcSession::set_state(SrsRtcSessionStateType state)
{ {
session_state = state; state_ = state;
} }
string SrsRtcSession::id() string SrsRtcSession::id()
@ -2714,13 +2715,13 @@ srs_error_t SrsRtcSession::on_binding_request(SrsStunPacket* r)
return srs_error_wrap(err, "stun binding response send failed"); return srs_error_wrap(err, "stun binding response send failed");
} }
if (get_session_state() == WAITING_STUN) { if (state_ == WAITING_STUN) {
set_session_state(DOING_DTLS_HANDSHAKE); state_ = DOING_DTLS_HANDSHAKE;
peer_id_ = sendonly_skt->peer_id(); peer_id_ = sendonly_skt->peer_id();
server_->insert_into_id_sessions(peer_id_, this); server_->insert_into_id_sessions(peer_id_, this);
set_session_state(DOING_DTLS_HANDSHAKE); state_ = DOING_DTLS_HANDSHAKE;
srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str()); srs_trace("rtc session=%s, STUN done, waitting DTLS handshake.", id().c_str());
} }
@ -3242,10 +3243,12 @@ srs_error_t SrsUdpMuxSender::on_reload_rtc_server()
SrsRtcServer::SrsRtcServer() SrsRtcServer::SrsRtcServer()
{ {
timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS); timer = new SrsHourGlass(this, 1 * SRS_UTIME_SECONDS);
janus = new SrsJanusServer(this);
} }
SrsRtcServer::~SrsRtcServer() SrsRtcServer::~SrsRtcServer()
{ {
srs_freep(janus);
srs_freep(timer); srs_freep(timer);
if (true) { if (true) {
@ -3342,6 +3345,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt)
srs_verbose("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d", srs_verbose("recv stun packet from %s, use-candidate=%d, ice-controlled=%d, ice-controlling=%d",
skt->peer_id().c_str(), ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling()); skt->peer_id().c_str(), ping.get_use_candidate(), ping.get_ice_controlled(), ping.get_ice_controlling());
// TODO: FIXME: For ICE trickle, we may get STUN packets before SDP answer, so maybe should response it.
if (!session) { if (!session) {
session = find_session_by_username(ping.get_username()); session = find_session_by_username(ping.get_username());
if (session) { if (session) {
@ -3377,8 +3381,9 @@ srs_error_t SrsRtcServer::listen_api()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// TODO: FIXME: Fetch api from hybrid manager. // TODO: FIXME: Fetch api from hybrid manager, not from SRS.
SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server(); SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) { if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play"); return srs_error_wrap(err, "handle play");
} }
@ -3386,11 +3391,15 @@ srs_error_t SrsRtcServer::listen_api()
if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) { if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
return srs_error_wrap(err, "handle publish"); return srs_error_wrap(err, "handle publish");
} }
if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) { if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {
return srs_error_wrap(err, "handle nack"); return srs_error_wrap(err, "handle nack");
} }
// For Janus style API.
if ((err = janus->listen_api()) != srs_success) {
return srs_error_wrap(err, "janus listen api");
}
return err; return err;
} }
@ -3451,11 +3460,69 @@ srs_error_t SrsRtcServer::create_session(
} }
} }
// TODO: FIXME: In answer, we should use the same SSRC as in offer.
session->set_remote_sdp(remote_sdp); session->set_remote_sdp(remote_sdp);
session->set_local_sdp(local_sdp); session->set_local_sdp(local_sdp);
session->set_state(WAITING_STUN);
return err;
}
session->set_session_state(WAITING_STUN); srs_error_t SrsRtcServer::create_session2(SrsSdp& local_sdp, SrsRtcSession** psession)
{
srs_error_t err = srs_success;
std::string local_pwd = gen_random_str(32);
// TODO: FIXME: Collision detect.
std::string local_ufrag = gen_random_str(8);
SrsRtcSession* session = new SrsRtcSession(this);
*psession = session;
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(SrsDtls::instance()->get_fingerprint());
// We allows to mock the eip of server.
std::vector<string> candidate_ips = get_candidate_ips();
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(), "host");
}
session->set_local_sdp(local_sdp);
session->set_state(WAITING_ANSWER);
return err;
}
srs_error_t SrsRtcServer::setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp)
{
srs_error_t err = srs_success;
if (session->state() != WAITING_ANSWER) {
return err;
}
SrsSource* source = NULL;
// TODO: FIXME: Should refactor it, directly use http server as handler.
ISrsSourceHandler* handler = _srs_hybrid->srs()->instance();
if ((err = _srs_sources->fetch_or_create(req, handler, &source)) != srs_success) {
return srs_error_wrap(err, "create source");
}
// TODO: FIXME: Collision detect.
string username = session->get_local_sdp()->get_ice_ufrag() + ":" + remote_sdp.get_ice_ufrag();
int cid = _srs_context->get_id();
if ((err = session->initialize(source, req, false, username, cid)) != srs_success) {
return srs_error_wrap(err, "init");
}
map_username_session.insert(make_pair(username, session));
session->set_remote_sdp(remote_sdp);
session->set_state(WAITING_STUN);
return err; return err;
} }

@ -44,6 +44,9 @@
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <srtp2/srtp.h> #include <srtp2/srtp.h>
// For Alibaba Tenfold.
class SrsJanusServer;
class SrsUdpMuxSocket; class SrsUdpMuxSocket;
class SrsConsumer; class SrsConsumer;
class SrsStunPacket; class SrsStunPacket;
@ -78,6 +81,8 @@ const uint8_t kSLI = 2;
const uint8_t kRPSI = 3; const uint8_t kRPSI = 3;
const uint8_t kAFB = 15; const uint8_t kAFB = 15;
extern std::string gen_random_str(int len);
class SrsNtp class SrsNtp
{ {
public: public:
@ -99,10 +104,11 @@ enum SrsRtcSessionStateType
{ {
// TODO: FIXME: Should prefixed by enum name. // TODO: FIXME: Should prefixed by enum name.
INIT = -1, INIT = -1,
WAITING_STUN = 1, WAITING_ANSWER = 1,
DOING_DTLS_HANDSHAKE = 2, WAITING_STUN = 2,
ESTABLISHED = 3, DOING_DTLS_HANDSHAKE = 3,
CLOSED = 4, ESTABLISHED = 4,
CLOSED = 5,
}; };
class SrsRtcDtls class SrsRtcDtls
@ -327,7 +333,7 @@ class SrsRtcSession
friend class SrsRtcPublisher; friend class SrsRtcPublisher;
private: private:
SrsRtcServer* server_; SrsRtcServer* server_;
SrsRtcSessionStateType session_state; SrsRtcSessionStateType state_;
SrsRtcDtls* dtls_; SrsRtcDtls* dtls_;
SrsRtcPlayer* player_; SrsRtcPlayer* player_;
SrsRtcPublisher* publisher_; SrsRtcPublisher* publisher_;
@ -364,8 +370,8 @@ public:
void set_local_sdp(const SrsSdp& sdp); void set_local_sdp(const SrsSdp& sdp);
SrsSdp* get_remote_sdp(); SrsSdp* get_remote_sdp();
void set_remote_sdp(const SrsSdp& sdp); void set_remote_sdp(const SrsSdp& sdp);
SrsRtcSessionStateType get_session_state(); SrsRtcSessionStateType state();
void set_session_state(SrsRtcSessionStateType state); void set_state(SrsRtcSessionStateType state);
std::string id(); std::string id();
std::string peer_id(); std::string peer_id();
void set_peer_id(std::string v); void set_peer_id(std::string v);
@ -451,6 +457,8 @@ private:
private: private:
std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) std::map<std::string, SrsRtcSession*> map_username_session; // key: username(local_ufrag + ":" + remote_ufrag)
std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port) std::map<std::string, SrsRtcSession*> map_id_session; // key: peerip(ip + ":" + port)
private:
SrsJanusServer* janus;
public: public:
SrsRtcServer(); SrsRtcServer();
virtual ~SrsRtcServer(); virtual ~SrsRtcServer();
@ -459,14 +467,19 @@ public:
public: public:
// TODO: FIXME: Support gracefully quit. // TODO: FIXME: Support gracefully quit.
// TODO: FIXME: Support reload. // TODO: FIXME: Support reload.
virtual srs_error_t listen_udp(); srs_error_t listen_udp();
virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt); virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt);
srs_error_t listen_api();
public: public:
virtual srs_error_t listen_api(); // Peer start offering, we answer it.
srs_error_t create_session( srs_error_t create_session(
SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish,
SrsRtcSession** psession SrsRtcSession** psession
); );
// We start offering, create_session2 to generate offer, setup_session2 to handle answer.
srs_error_t create_session2(SrsSdp& local_sdp, SrsRtcSession** psession);
srs_error_t setup_session2(SrsRtcSession* session, SrsRequest* req, const SrsSdp& remote_sdp);
public:
bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* session); bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* session);
void check_and_clean_timeout_session(); void check_and_clean_timeout_session();
int nn_sessions(); int nn_sessions();

@ -33,6 +33,7 @@ using namespace std;
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
// TODO: FIXME: Maybe we should use json.encode to escape it?
const std::string kCRLF = "\\r\\n"; const std::string kCRLF = "\\r\\n";
#define FETCH(is,word) \ #define FETCH(is,word) \

Loading…
Cancel
Save