diff --git a/trunk/3rdparty/libsrtp-2.0.0.zip b/trunk/3rdparty/libsrtp-2.0.0.zip new file mode 100644 index 000000000..e0f7c414d Binary files /dev/null and b/trunk/3rdparty/libsrtp-2.0.0.zip differ diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh index a4c8730df..b2c69bbdb 100755 --- a/trunk/auto/depends.sh +++ b/trunk/auto/depends.sh @@ -242,6 +242,27 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then if [ ! -f ${SRS_OBJS}/st/libst.a ]; then echo "Build state-threads static lib failed."; exit -1; fi fi +##################################################################################### +# srtp +##################################################################################### +if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then + # Patched ST from https://github.com/ossrs/state-threads/tree/srs + if [[ -f ${SRS_OBJS}/srtp2/lib/libsrtp2.a ]]; then + echo "The srtp2 is ok."; + else + echo "Building srtp2."; + ( + rm -rf ${SRS_OBJS}/srtp2 && cd ${SRS_OBJS} && + unzip -q ../3rdparty/libsrtp-2.0.0.zip && cd libsrtp-2.0.0 && + ./configure --prefix=`pwd`/_release && make ${SRS_JOBS} && make install && + cd .. && rm -f srtp2 && ln -sf libsrtp-2.0.0/_release srtp2 + ) + fi + # check status + ret=$?; if [[ $ret -ne 0 ]]; then echo "Build srtp2 failed, ret=$ret"; exit $ret; fi + if [ ! -f ${SRS_OBJS}/srtp2/lib/libsrtp2.a ]; then echo "Build srtp2 static lib failed."; exit -1; fi +fi + ##################################################################################### # nginx for HLS, nginx-1.5.0 ##################################################################################### diff --git a/trunk/conf/rtc.conf b/trunk/conf/rtc.conf new file mode 100644 index 000000000..fe8d545ec --- /dev/null +++ b/trunk/conf/rtc.conf @@ -0,0 +1,40 @@ +# main config for srs. +# @see full.conf for detail config. + +listen 1935; +max_connections 1000; +srs_log_tank file; +srs_log_file ./objs/srs.log; +http_api { + enabled on; + listen 1985; + raw_api { + enabled on; + allow_reload on; + allow_query on; + allow_update on; + } +} +http_server { + enabled on; + listen 8080; + dir ./objs/nginx/html; +} +rtc { + enabled on; + listen 9527; + # candidate device ip: *(all interface), 192.168.1.1 ... + candidate *; +} +stats { + network 0; + disk sda sdb xvda xvdb; +} +vhost __defaultVhost__ { + http_remux { + enabled on; + mount [vhost]/[app]/[stream].flv; + } +} + + diff --git a/trunk/configure b/trunk/configure index 8ba25a8ba..9c54c092b 100755 --- a/trunk/configure +++ b/trunk/configure @@ -148,6 +148,9 @@ END # st(state-threads) the basic network library for SRS. LibSTRoot="${SRS_OBJS_DIR}/st"; LibSTfile="${LibSTRoot}/libst.a" if [[ $SRS_SHARED_ST == YES ]]; then LibSTfile="-lst"; fi +# srtp +LibSrtpRoot="${SRS_OBJS_DIR}/srtp2/include"; LibSrtpFile="${SRS_OBJS_DIR}/srtp2/lib/libsrtp2.a" +if [[ $SRS_SHARED_SRTP == YES ]]; then LibSrtpFile="-lsrtp2"; fi # openssl-1.1.0e, for the RTMP complex handshake. LibSSLRoot="";LibSSLfile="" if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == NO ]]; then @@ -201,7 +204,7 @@ MODULE_ID="KERNEL" MODULE_DEPENDS=("CORE") ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_kernel_error" "srs_kernel_log" "srs_kernel_buffer" - "srs_kernel_utility" "srs_kernel_flv" "srs_kernel_codec" "srs_kernel_io" + "srs_kernel_utility" "srs_kernel_flv" "srs_kernel_rtp" "srs_kernel_codec" "srs_kernel_io" "srs_kernel_consts" "srs_kernel_aac" "srs_kernel_mp3" "srs_kernel_ts" "srs_kernel_stream" "srs_kernel_balance" "srs_kernel_mp4" "srs_kernel_file") KERNEL_INCS="src/kernel"; MODULE_DIR=${KERNEL_INCS} . auto/modules.sh @@ -214,7 +217,7 @@ ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack" "srs_rtmp_handshake" "srs_protocol_utility" "srs_rtmp_msg_array" "srs_protocol_stream" "srs_raw_avc" "srs_rtsp_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json" - "srs_protocol_format") + "srs_stun_stack" "srs_protocol_format") PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh PROTOCOL_OBJS="${MODULE_OBJS[@]}" # @@ -233,7 +236,7 @@ fi if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="SERVICE" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL") - ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibSSLRoot}) + ModuleLibIncs=(${LibSTRoot} ${LibSrtpRoot} ${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_service_log" "srs_service_st" "srs_service_http_client" "srs_service_http_conn" "srs_service_rtmp_conn" "srs_service_utility" "srs_service_conn") @@ -246,7 +249,7 @@ fi if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="APP" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE") - ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibSSLRoot}) + ModuleLibIncs=(${LibSTRoot} ${LibSrtpRoot} ${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_source" "srs_app_refer" "srs_app_hls" "srs_app_forward" "srs_app_encoder" "srs_app_http_stream" "srs_app_thread" "srs_app_bandwidth" "srs_app_st" "srs_app_log" "srs_app_config" @@ -254,7 +257,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" - "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" + "srs_app_mpegts_udp" "srs_app_rtp" "srs_app_rtc_conn" "srs_app_dtls" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid") @@ -284,7 +287,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then if [[ $SRS_SRT == YES ]]; then MODULE_DEPENDS+=("SRT") fi - ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) + ModuleLibIncs=(${LibSTRoot} ${LibSrtpRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) if [[ $SRS_SRT == YES ]]; then ModuleLibIncs+=("${LibSRTRoot[*]}") fi @@ -297,7 +300,7 @@ fi if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then MODULE_ID="MAIN" MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE") - ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) + ModuleLibIncs=(${LibSTRoot} ${LibSrtpRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) MODULE_FILES=() DEFINES="" # add each modules for main @@ -324,13 +327,13 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then done # # all depends libraries - ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile}) + ModuleLibFiles=(${LibSTfile} ${LibSrtpFile} ${LibSSLfile} ${LibGperfFile}) if [[ $SRS_SRT == YES ]]; then ModuleLibFiles+=("${LibSRTfile[*]}") fi # all depends objects MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]} ${SERVER_OBJS[@]}" - ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) + ModuleLibIncs=(${LibSTRoot} ${LibSrtpRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot}) if [[ $SRS_SRT == YES ]]; then MODULE_OBJS="${MODULE_OBJS} ${SRT_OBJS[@]}" fi @@ -341,7 +344,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then # # For modules, without the app module. MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${MAIN_OBJS[@]}" - ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile}) + ModuleLibFiles=(${LibSTfile} ${LibSrtpFile} ${LibSSLfile} ${LibGperfFile}) # for SRS_MODULE in ${SRS_MODULES[*]}; do . $SRS_MODULE/config @@ -361,11 +364,11 @@ if [ $SRS_UTEST = YES ]; then MODULE_FILES=("srs_utest" "srs_utest_amf0" "srs_utest_protocol" "srs_utest_kernel" "srs_utest_core" "srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload" "srs_utest_mp4" "srs_utest_service" "srs_utest_app") - ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSTRoot} ${LibSSLRoot}) + ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSTRoot} ${LibSrtpRoot} ${LibSSLRoot}) if [[ $SRS_SRT == YES ]]; then ModuleLibIncs+=("${LibSRTRoot[*]}") fi - ModuleLibFiles=(${LibSTfile} ${LibSSLfile}) + ModuleLibFiles=(${LibSTfile} ${LibSrtpFile} ${LibSSLfile}) if [[ $SRS_SRT == YES ]]; then ModuleLibFiles+=("${LibSRTfile[*]}") fi diff --git a/trunk/research/players/rtc.html b/trunk/research/players/rtc.html new file mode 100644 index 000000000..4dd8e44f2 --- /dev/null +++ b/trunk/research/players/rtc.html @@ -0,0 +1,90 @@ + + + + + + + + +rtc_media_player:
+ + + + + + + \ No newline at end of file diff --git a/trunk/research/players/rtc_upload.html b/trunk/research/players/rtc_upload.html new file mode 100644 index 000000000..68c515d8e --- /dev/null +++ b/trunk/research/players/rtc_upload.html @@ -0,0 +1,97 @@ + + + + + + + + +
local_media_player:
+ +
rtc_media_player:
+ + + + + + + diff --git a/trunk/research/players/srs_rtc_player.html b/trunk/research/players/srs_rtc_player.html new file mode 100644 index 000000000..bbb605ab5 --- /dev/null +++ b/trunk/research/players/srs_rtc_player.html @@ -0,0 +1,90 @@ + + + + + + + + +rtc_media_player:
+ + + + + + + diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index f21d464ed..a206dc6da 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -3486,7 +3486,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file" && n != "max_connections" && n != "daemon" && n != "heartbeat" && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms" - && n != "http_server" && n != "stream_caster" && n != "srt_server" + && n != "http_server" && n != "stream_caster" && n != "rtc" && n != "srt_server" && n != "utc_time" && n != "work_dir" && n != "asprocess" && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" @@ -4266,6 +4266,62 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf) return ::atoi(conf->arg0().c_str()); } +int SrsConfig::get_rtc_enabled() +{ + SrsConfDirective* conf = root->get("rtc"); + return get_rtc_enabled(conf); +} + +bool SrsConfig::get_rtc_enabled(SrsConfDirective* conf) +{ + static bool DEFAULT = false; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +int SrsConfig::get_rtc_listen() +{ + static int DEFAULT = 9527; + + SrsConfDirective* conf = root->get("rtc"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("listen"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +std::string SrsConfig::get_rtc_candidates() +{ + static string DEFAULT = "*"; + + SrsConfDirective* conf = root->get("rtc"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("candidate"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return (conf->arg0().c_str()); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 103e6f25c..768e8368f 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -498,6 +498,14 @@ public: virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf); // Get the max udp port for rtp of stream caster rtsp. virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf); + +// rtc section +public: + virtual int get_rtc_enabled(); + virtual bool get_rtc_enabled(SrsConfDirective* conf); + virtual int get_rtc_listen(); + virtual std::string get_rtc_candidates(); + // vhost specified section public: // Get the vhost directive by vhost name. diff --git a/trunk/src/app/srs_app_dtls.cpp b/trunk/src/app/srs_app_dtls.cpp new file mode 100644 index 000000000..4a863fd92 --- /dev/null +++ b/trunk/src/app/srs_app_dtls.cpp @@ -0,0 +1,137 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +using namespace std; + +#include + +#include + +#include + +SrsDtls* SrsDtls::_instance = NULL; + +SrsDtls::SrsDtls() +{ +} + +SrsDtls::~SrsDtls() +{ +} + +SrsDtls* SrsDtls::instance() +{ + if (!_instance) { + _instance = new SrsDtls(); + _instance->init(); + } + return _instance; +} + +void SrsDtls::init() +{ + // srtp init first + srs_assert(srtp_init() == 0); + + // init dtls context + EVP_PKEY* dtls_private_key = EVP_PKEY_new(); + srs_assert(dtls_private_key); + + RSA* rsa = RSA_new(); + srs_assert(rsa); + + BIGNUM* exponent = BN_new(); + srs_assert(exponent); + + BN_set_word(exponent, RSA_F4); + + const std::string& aor = "www.hw.com"; + int expire_day = 365; + int private_key_len = 1024; + + RSA_generate_key_ex(rsa, private_key_len, exponent, NULL); + + srs_assert(EVP_PKEY_set1_RSA(dtls_private_key, rsa) == 1); + + X509* dtls_cert = X509_new(); + srs_assert(dtls_cert); + + X509_NAME* subject = X509_NAME_new(); + srs_assert(subject); + + int serial = rand(); + ASN1_INTEGER_set(X509_get_serialNumber(dtls_cert), serial); + + X509_NAME_add_entry_by_txt(subject, "CN", MBSTRING_ASC, (unsigned char *) aor.data(), aor.size(), -1, 0); + + X509_set_issuer_name(dtls_cert, subject); + X509_set_subject_name(dtls_cert, subject); + + const long cert_duration = 60*60*24*expire_day; + + X509_gmtime_adj(X509_get_notBefore(dtls_cert), 0); + X509_gmtime_adj(X509_get_notAfter(dtls_cert), cert_duration); + + srs_assert(X509_set_pubkey(dtls_cert, dtls_private_key) == 1); + + srs_assert(X509_sign(dtls_cert, dtls_private_key, EVP_sha1()) != 0); + + // cleanup + RSA_free(rsa); + BN_free(exponent); + X509_NAME_free(subject); + + dtls_ctx = SSL_CTX_new(DTLSv1_2_method()); + srs_assert(SSL_CTX_use_certificate(dtls_ctx, dtls_cert) == 1); + + srs_assert(SSL_CTX_use_PrivateKey(dtls_ctx, dtls_private_key) == 1); + srs_assert(SSL_CTX_set_cipher_list(dtls_ctx, "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH") == 1); + srs_assert(SSL_CTX_set_tlsext_use_srtp(dtls_ctx, "SRTP_AES128_CM_SHA1_80") == 0); + + SSL_CTX_set_verify_depth (dtls_ctx, 4); + SSL_CTX_set_read_ahead(dtls_ctx, 1); + + // dtls fingerprint + char fp[100] = {0}; + char *p = fp; + unsigned char md[EVP_MAX_MD_SIZE]; + unsigned int n = 0; + + int r = X509_digest(dtls_cert, EVP_sha256(), md, &n); + + for (unsigned int i = 0; i < n; i++, ++p) { + sprintf(p, "%02X", md[i]); + p += 2; + + if(i < (n-1)) { + *p = ':'; + } else { + *p = '\0'; + } + } + + fingerprint.assign(fp, strlen(fp)); + srs_trace("fingerprint=%s", fingerprint.c_str()); +} diff --git a/trunk/src/app/srs_app_dtls.hpp b/trunk/src/app/srs_app_dtls.hpp new file mode 100644 index 000000000..5853f91b3 --- /dev/null +++ b/trunk/src/app/srs_app_dtls.hpp @@ -0,0 +1,52 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_DTLS_HPP +#define SRS_APP_DTLS_HPP + +#include + +#include + +#include + +class SrsDtls +{ +private: + static SrsDtls* _instance; +private: + std::string fingerprint; + SSL_CTX* dtls_ctx; +private: + SrsDtls(); + virtual ~SrsDtls(); + + void init(); +public: + static SrsDtls* instance(); + SSL_CTX* get_dtls_ctx() { return dtls_ctx; } +public: + std::string get_fingerprint() const { return fingerprint; } +}; + +#endif diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 5355068eb..fd418bf86 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -46,6 +46,7 @@ using namespace std; #include #include #include +#include srs_error_t srs_api_response_jsonp(ISrsHttpResponseWriter* w, string callback, string data) { @@ -780,6 +781,96 @@ srs_error_t SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa return srs_api_response(w, r, obj->dumps()); } +SrsGoApiSdp::SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr) +{ + server = svr; + rtc_server = rtc_svr; +} + +SrsGoApiSdp::~SrsGoApiSdp() +{ +} + +srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + srs_error_t err = srs_success; + + SrsStatistic* stat = SrsStatistic::instance(); + + // path: {pattern}{stream_id} + // e.g. /api/v1/streams/100 pattern= /api/v1/streams/, stream_id=100 + int sid = r->parse_rest_id(entry->pattern); + + SrsStatisticStream* stream = NULL; + if (sid >= 0 && (stream = stat->find_stream(sid)) == NULL) { + return srs_api_response_code(w, r, ERROR_RTMP_STREAM_NOT_FOUND); + } + + string req_json; + r->body_read_all(req_json); + srs_trace("req_json=%s", req_json.c_str()); + + SrsJsonAny* json = SrsJsonAny::loads(req_json); + SrsJsonObject* req_obj = json->to_object(); + + SrsJsonAny* remote_sdp_obj = req_obj->get_property("sdp"); + SrsJsonAny* app_obj = req_obj->get_property("app"); + SrsJsonAny* stream_name_obj = req_obj->get_property("stream"); + + if (remote_sdp_obj == NULL || app_obj == NULL || stream_name_obj == NULL) { + return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); + } + + string remote_sdp_str = remote_sdp_obj->to_str(); + string app = app_obj->to_str(); + string stream_name = stream_name_obj->to_str(); + + srs_trace("remote_sdp_str=%s", remote_sdp_str.c_str()); + srs_trace("app=%s, stream=%s", app.c_str(), stream_name.c_str()); + + SrsSdp remote_sdp; + err = remote_sdp.decode(remote_sdp_str); + if (err != srs_success) { + return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); + } + + SrsSdp local_sdp; + SrsRtcSession* rtc_session = rtc_server->create_rtc_session(remote_sdp, local_sdp); + rtc_session->set_app_stream(app, stream_name); + + string local_sdp_str = ""; + err = local_sdp.encode(local_sdp_str); + if (err != srs_success) { + return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest); + } + + SrsJsonObject* obj = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, obj); + + obj->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); + obj->set("server", SrsJsonAny::integer(stat->server_id())); + + // XXX: ice candidate + //string candidate_str = "candidate:1 1 udp 2115783679 192.168.170.129:9527 typ host generation 0 ufrag " + // + local_sdp.get_ice_ufrag() + "netwrok-cost 50"; + + //SrsJsonObject* candidate_obj = SrsJsonAny::object(); + //SrsAutoFree(SrsJsonObject, candidate_obj); + + //candidate_obj->set("candidate", SrsJsonAny::str(candidate_str.c_str())); + //candidate_obj->set("sdpMid", SrsJsonAny::str("0")); + //candidate_obj->set("sdpMLineIndex", SrsJsonAny::str("0")); + + if (r->is_http_post()) { + obj->set("sdp", SrsJsonAny::str(local_sdp_str.c_str())); + // obj->set("candidate", candidate_obj); + } else { + return srs_go_http_error(w, SRS_CONSTS_HTTP_MethodNotAllowed); + } + + return srs_api_response(w, r, obj->dumps()); +} + SrsGoApiClients::SrsGoApiClients() { } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 5957ff2f3..c4cf02a6c 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -31,6 +31,7 @@ class ISrsHttpMessage; class SrsHttpParser; class SrsHttpHandler; class SrsServer; +class SrsRtcServer; #include #include @@ -164,6 +165,18 @@ public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; +class SrsGoApiSdp : public ISrsHttpHandler +{ +private: + SrsServer* server; + SrsRtcServer* rtc_server; +public: + SrsGoApiSdp(SrsServer* svr, SrsRtcServer* rtc_svr); + virtual ~SrsGoApiSdp(); +public: + virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +}; + class SrsGoApiClients : public ISrsHttpHandler { public: diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index b33b4f6f8..0ba7684cd 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -59,6 +60,19 @@ srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) return srs_success; } +ISrsUdpMuxHandler::ISrsUdpMuxHandler() +{ +} + +ISrsUdpMuxHandler::~ISrsUdpMuxHandler() +{ +} + +srs_error_t ISrsUdpMuxHandler::on_stfd_change(srs_netfd_t /*fd*/) +{ + return srs_success; +} + ISrsTcpHandler::ISrsTcpHandler() { } @@ -207,3 +221,194 @@ srs_error_t SrsTcpListener::cycle() return err; } +SrsUdpMuxSocket::SrsUdpMuxSocket(srs_netfd_t fd) +{ + nb_buf = SRS_UDP_MAX_PACKET_SIZE; + buf = new char[nb_buf]; + nread = 0; + + lfd = fd; + + fromlen = 0; +} + +SrsUdpMuxSocket::~SrsUdpMuxSocket() +{ + srs_freepa(buf); +} + +SrsUdpMuxSocket::SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs) +{ + operator=(rhs); +} + +SrsUdpMuxSocket& SrsUdpMuxSocket::operator=(const SrsUdpMuxSocket& rhs) +{ + buf = NULL; + nb_buf = 0; + nread = 0; + lfd = rhs.lfd; + from = rhs.from; + fromlen = rhs.fromlen; + peer_ip = rhs.peer_ip; + peer_port = rhs.peer_port; + + return *this; +} + +int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) +{ + fromlen = sizeof(from); + nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &fromlen, timeout); + + if (nread > 0) { + char address_string[64]; + char port_string[16]; + if (getnameinfo((sockaddr*)&from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)) { + return -1; + } + + peer_ip = std::string(address_string); + peer_port = atoi(port_string); + } + + return nread; +} + +int SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout) +{ + return srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout); +} + +int SrsUdpMuxSocket::sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout) +{ + struct msghdr udphdr = {0}; + udphdr.msg_name = &from; + udphdr.msg_namelen = fromlen; + udphdr.msg_iov = iov; + udphdr.msg_iovlen = iovlen; + + return srs_sendmsg(lfd, &udphdr, 0, timeout); +} + +std::string SrsUdpMuxSocket::get_peer_id() +{ + char id_buf[1024]; + int len = snprintf(id_buf, sizeof(id_buf), "%s:%d", peer_ip.c_str(), peer_port); + + return string(id_buf, len); +} + +SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p) +{ + handler = h; + ip = i; + port = p; + lfd = NULL; + + nb_buf = SRS_UDP_MAX_PACKET_SIZE; + buf = new char[nb_buf]; + + trd = new SrsDummyCoroutine(); +} + +SrsUdpMuxListener::~SrsUdpMuxListener() +{ + srs_freep(trd); + srs_close_stfd(lfd); + srs_freepa(buf); +} + +int SrsUdpMuxListener::fd() +{ + return srs_netfd_fileno(lfd); +} + +srs_netfd_t SrsUdpMuxListener::stfd() +{ + return lfd; +} + +srs_error_t SrsUdpMuxListener::listen() +{ + srs_error_t err = srs_success; + + if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + } + + set_socket_buffer(); + + srs_freep(trd); + trd = new SrsSTCoroutine("udp", this); + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "start thread"); + } + + return err; +} + +void SrsUdpMuxListener::set_socket_buffer() +{ + int sndbuf_size = 0; + socklen_t opt_len = sizeof(sndbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len); + srs_trace("default udp remux socket sndbuf=%d", sndbuf_size); + + sndbuf_size = 1024*1024*10; // 10M + if (setsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, sizeof(sndbuf_size)) < 0) { + srs_warn("set sock opt SO_SNDBUFFORCE failed"); + } + + opt_len = sizeof(sndbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_SNDBUF, (void*)&sndbuf_size, &opt_len); + srs_trace("udp remux socket sndbuf=%d", sndbuf_size); + + int rcvbuf_size = 0; + opt_len = sizeof(rcvbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len); + srs_trace("default udp remux socket rcvbuf=%d", rcvbuf_size); + + rcvbuf_size = 1024*1024*10; // 10M + if (setsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, sizeof(rcvbuf_size)) < 0) { + srs_warn("set sock opt SO_RCVBUFFORCE failed"); + } + + opt_len = sizeof(rcvbuf_size); + getsockopt(fd(), SOL_SOCKET, SO_RCVBUF, (void*)&rcvbuf_size, &opt_len); + srs_trace("udp remux socket rcvbuf=%d", rcvbuf_size); +} + +srs_error_t SrsUdpMuxListener::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "udp listener"); + } + + SrsUdpMuxSocket udp_mux_skt(lfd); + + if (udp_mux_skt.recvfrom(SRS_UTIME_NO_TIMEOUT) <= 0) { + srs_error("udp recv error"); + // remux udp never return + continue; + } + + if ((err = handler->on_udp_packet(&udp_mux_skt)) != srs_success) { + // remux udp never return + srs_error("udp packet handler error:%s", srs_error_desc(err).c_str()); + continue; + } + + if (SrsUdpPacketRecvCycleInterval > 0) { + srs_usleep(SrsUdpPacketRecvCycleInterval); + } + } + + return err; +} diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index d7d930e91..605929902 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -26,6 +26,8 @@ #include +#include + #include #include @@ -33,6 +35,8 @@ struct sockaddr; +class SrsUdpMuxSocket; + // The udp packet handler. class ISrsUdpHandler { @@ -54,6 +58,16 @@ public: virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0; }; +class ISrsUdpMuxHandler +{ +public: + ISrsUdpMuxHandler(); + virtual ~ISrsUdpMuxHandler(); +public: + virtual srs_error_t on_stfd_change(srs_netfd_t fd); + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) = 0; +}; + // The tcp connection handler. class ISrsTcpHandler { @@ -68,13 +82,13 @@ public: // Bind udp port, start thread to recv packet and handler it. class SrsUdpListener : public ISrsCoroutineHandler { -private: +protected: srs_netfd_t lfd; SrsCoroutine* trd; -private: +protected: char* buf; int nb_buf; -private: +protected: ISrsUdpHandler* handler; std::string ip; int port; @@ -113,4 +127,60 @@ public: virtual srs_error_t cycle(); }; +class SrsUdpMuxSocket +{ +private: + char* buf; + int nb_buf; + int nread; + srs_netfd_t lfd; + sockaddr_storage from; + int fromlen; + std::string peer_ip; + int peer_port; +public: + SrsUdpMuxSocket(srs_netfd_t fd); + virtual ~SrsUdpMuxSocket(); + + SrsUdpMuxSocket(const SrsUdpMuxSocket& rhs); + SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs); + + int recvfrom(srs_utime_t timeout); + int sendto(void* data, int size, srs_utime_t timeout); + int sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout); + + char* data() { return buf; } + int size() { return nread; } + std::string get_peer_ip() const { return peer_ip; } + int get_peer_port() const { return peer_port; } + std::string get_peer_id(); +}; + +class SrsUdpMuxListener : public ISrsCoroutineHandler +{ +protected: + srs_netfd_t lfd; + SrsCoroutine* trd; +protected: + char* buf; + int nb_buf; +protected: + ISrsUdpMuxHandler* handler; + std::string ip; + int port; +public: + SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p); + virtual ~SrsUdpMuxListener(); +public: + virtual int fd(); + virtual srs_netfd_t stfd(); +public: + virtual srs_error_t listen(); +// Interface ISrsReusableThreadHandler. +public: + virtual srs_error_t cycle(); +private: + void set_socket_buffer(); +}; + #endif diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 4d73ebf8e..f743a056c 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -37,7 +37,7 @@ #include // the max size of a line of log. -#define LOG_MAX_SIZE 4096 +#define LOG_MAX_SIZE 4096000 // the tail append to each log. #define LOG_TAIL '\n' diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp new file mode 100644 index 000000000..802e911c8 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -0,0 +1,1107 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +using namespace std; + +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static bool is_stun(const char* data, const int size) +{ + return data != NULL && size > 0 && (data[0] == 0 || data[0] == 1); +} + +static bool is_dtls(const char* data, size_t len) +{ + return (len >= 13 && (data[0] > 19 && data[0] < 64)); +} + +static bool is_rtp_or_rtcp(const char* data, size_t len) +{ + return (len >= 12 && (data[0] & 0xC0) == 0x80); +} + +static bool is_rtcp(const char* data, size_t len) +{ + return (len >=12) && (data[0] & 0x80) && (data[1] >= 200 && data[1] <= 209); +} + +static string gen_random_str(int len) +{ + static string random_table = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + string ret; + ret.reserve(len); + for (int i = 0; i < len; ++i) { + ret.append(1, random_table[random() % random_table.size()]); + } + + return ret; +} + +const int SRTP_MASTER_KEY_KEY_LEN = 16; +const int SRTP_MASTER_KEY_SALT_LEN = 14; + +SrsCandidate::SrsCandidate() +{ +} + +SrsCandidate::~SrsCandidate() +{ +} + +std::vector SrsCandidate::get_candidate_ips() +{ + std::vector candidate_ips; + + string candidate = _srs_config->get_rtc_candidates(); + if (candidate == "*" || candidate == "0.0.0.0") { + std::vector tmp = srs_get_local_ips(); + for (int i = 0; i < tmp.size(); ++i) { + if (tmp[i] != "127.0.0.1") { + candidate_ips.push_back(tmp[i]); + } + } + } else { + candidate_ips.push_back(candidate); + } + + return candidate_ips; +} + +SrsSdpMediaInfo::SrsSdpMediaInfo() +{ +} + +SrsSdpMediaInfo::~SrsSdpMediaInfo() +{ +} + +SrsSdp::SrsSdp() +{ +} + +SrsSdp::~SrsSdp() +{ +} + +srs_error_t SrsSdp::decode(const string& sdp_str) +{ + srs_error_t err = srs_success; + + if (sdp_str.size() < 2 || sdp_str[0] != 'v' || sdp_str[1] != '=') { + return srs_error_wrap(err, "invalid sdp_str"); + } + + string line; + istringstream is(sdp_str); + while (getline(is, line)) { + srs_trace("line=%s", line.c_str()); + + if (line.size() < 2 || line[1] != '=') { + return srs_error_wrap(err, "invalid sdp line=%s", line.c_str()); + } + + switch (line[0]) { + case 'v' :{ + break; + } + case 'o' :{ + break; + } + case 's' :{ + break; + } + case 't' :{ + break; + } + case 'c' :{ + break; + } + case 'a' :{ + if (parse_attr(line) != srs_success) { + return srs_error_wrap(err, "decode sdp line=%s failed", line.c_str()); + } + break; + } + case 'm' :{ + break; + } + } + } + + return err; +} + +srs_error_t SrsSdp::encode(string& sdp_str) +{ + srs_error_t err = srs_success; + + string candidate_lines = ""; + + std::vector candidate_ips = SrsCandidate::get_candidate_ips(); + for (int i = 0; i < candidate_ips.size(); ++i) { + ostringstream os; + os << "a=candidate:10 1 udp 2115783679 " << candidate_ips[i] << " " << _srs_config->get_rtc_listen() <<" typ host generation 0\\r\\n"; + candidate_lines += os.str(); + } + + // FIXME: + sdp_str = + "v=0\\r\\n" + "o=- 0 0 IN IP4 127.0.0.1\\r\\n" + "s=-\\r\\n" + "t=0 0\\r\\n" + "a=ice-lite\\r\\n" + "a=group:BUNDLE 0 1\\r\\n" + "a=msid-semantic: WMS 6VrfBKXrwK\\r\\n" + "m=audio 9 UDP/TLS/RTP/SAVPF 111\\r\\n" + "c=IN IP4 0.0.0.0\\r\\n" + + candidate_lines + + "a=rtcp:9 IN IP4 0.0.0.0\\r\\n" + "a=ice-ufrag:" + ice_ufrag + "\\r\\n" + "a=ice-pwd:" + ice_pwd + "\\r\\n" + "a=ice-options:trickle\\r\\n" + "a=fingerprint:sha-256 " + SrsDtls::instance()->get_fingerprint() + "\\r\\n" + "a=sendrecv\\r\\n" + "a=mid:0\\r\\n" + "a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level\\r\\n" + "a=extmap:3 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\\r\\n" + "a=rtcp-mux\\r\\n" + "a=rtpmap:111 opus/48000/2\\r\\n" + "a=fmtp:111 minptime=10;useinbandfec=1\\r\\n" + "a=maxptime:60\\r\\n" + "a=ssrc:3233846890 cname:o/i14u9pJrxRKAsu\\r\\n" + "a=ssrc:3233846890 msid:6VrfBKXrwK a0\\r\\n" + "a=ssrc:3233846890 mslabel:6VrfBKXrwK\\r\\n" + "a=ssrc:3233846890 label:6VrfBKXrwKa0\\r\\n" + "m=video 9 UDP/TLS/RTP/SAVPF 102\\r\\n" + "c=IN IP4 0.0.0.0\\r\\n" + + candidate_lines + + "a=rtcp:9 IN IP4 0.0.0.0\\r\\n" + "b=as:2000000\\r\\n" + "a=ice-ufrag:" + ice_ufrag + "\\r\\n" + "a=ice-pwd:" + ice_pwd + "\\r\\n" + "a=ice-options:trickle\\r\\n" + "a=extmap:2 urn:ietf:params:rtp-hdrext:toffset\\r\\n" + "a=extmap:3 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\\r\\n" + "a=extmap:4 urn:3gpp:video-orientation\\r\\n" + "a=fingerprint:sha-256 " + SrsDtls::instance()->get_fingerprint() + "\\r\\n" + "a=sendrecv\\r\\n" + "a=mid:1\\r\\n" + "a=rtcp-mux\\r\\n" + "a=rtpmap:102 H264/90000\\r\\n" + "a=rtcp-fb:102 goog-remb\\r\\n" + "a=rtcp-fb:102 transport-cc\\r\\n" + "a=rtcp-fb:102 ccm fir \\r\\n" + "a=rtcp-fb:102 nack\\r\\n" + "a=rtcp-fb:102 nack pli \\r\\n" + "a=fmtp:102 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f\\r\\n" + "a=ssrc:3233846889 cname:o/i14u9pJrxRKAsu\\r\\n" + "a=ssrc:3233846889 msid:6VrfBKXrwK v0\\r\\n" + "a=ssrc:3233846889 mslabel:6VrfBKXrwK\\r\\n" + "a=ssrc:3233846889 label:6VrfBKXrwKv0\\r\\n"; + + return err; +} + +srs_error_t SrsSdp::parse_attr(const string& line) +{ + srs_error_t err = srs_success; + + string key = ""; + string val = ""; + string* p = &key; + for (int i = 2; i < line.size(); ++i) { + if (line[i] == ':' && p == &key) { + p = &val; + } else { + if (line[i] != '\r' && line[i] != '\n') { + p->append(1, line[i]); + } + } + } + + srs_trace("sdp attribute key=%s, val=%s", key.c_str(), val.c_str()); + + if (key == "ice-ufrag") { + ice_ufrag = val; + } else if (key == "ice-pwd") { + ice_pwd = val; + } else if (key == "fingerprint") { + + } else { + } + + return err; +} + +SrsDtlsSession::SrsDtlsSession(SrsRtcSession* s) +{ + rtc_session = s; + + dtls = NULL; + bio_in = NULL; + bio_out = NULL; + + client_key = ""; + server_key = ""; + + srtp_send = NULL; + srtp_recv = NULL; + + handshake_done = false; +} + +SrsDtlsSession::~SrsDtlsSession() +{ + if (dtls) { + // this function will free bio_in and bio_out + SSL_free(dtls); + dtls = NULL; + } + + if (srtp_send) { + srtp_dealloc(srtp_send); + } + + if (srtp_recv) { + srtp_dealloc(srtp_recv); + } +} + +srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + int ret = SSL_do_handshake(dtls); + + unsigned char *out_bio_data; + int out_bio_len = BIO_get_mem_data(bio_out, &out_bio_data); + + int ssl_err = SSL_get_error(dtls, ret); + switch(ssl_err) { + case SSL_ERROR_NONE: { + err = on_dtls_handshake_done(udp_mux_skt); + } + break; + + case SSL_ERROR_WANT_READ: { + break; + } + + case SSL_ERROR_WANT_WRITE: { + break; + } + + default: { + break; + } + } + + if (out_bio_len) { + srs_trace("send dtls handshake data"); + udp_mux_skt->sendto(out_bio_data, out_bio_len, 0); + } + + return err; +} + +srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + if (! handshake_done) { + BIO_reset(bio_in); + BIO_reset(bio_out); + BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()); + + handshake(udp_mux_skt); + } else { + BIO_reset(bio_in); + BIO_reset(bio_out); + BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()); + + while (BIO_ctrl_pending(bio_in) > 0) { + char dtls_read_buf[8092]; + int nb = SSL_read(dtls, dtls_read_buf, sizeof(dtls_read_buf)); + + if (nb > 0) { + on_dtls_application_data(dtls_read_buf, nb); + } + } + } + + return err; +} + +srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + srs_trace("dtls handshake done"); + + handshake_done = true; + if ((err = srtp_initialize()) != srs_success) { + srs_error("srtp init failed, err=%s", srs_error_desc(err).c_str()); + return srs_error_wrap(err, "srtp init failed"); + } + + rtc_session->on_connection_established(udp_mux_skt); + + return err; +} + +srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf) +{ + srs_error_t err = srs_success; + + return err; +} + +void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) +{ + if (dtls == NULL) { + srs_trace("send client hello"); + + dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx()); + SSL_set_connect_state(dtls); + + bio_in = BIO_new(BIO_s_mem()); + bio_out = BIO_new(BIO_s_mem()); + + SSL_set_bio(dtls, bio_in, bio_out); + + handshake(udp_mux_skt); + } +} + +srs_error_t SrsDtlsSession::srtp_initialize() +{ + srs_error_t err = srs_success; + + unsigned char material[SRTP_MASTER_KEY_LEN * 2] = {0}; // client(SRTP_MASTER_KEY_KEY_LEN + SRTP_MASTER_KEY_SALT_LEN) + server + static const string dtls_srtp_lable = "EXTRACTOR-dtls_srtp"; + if (! SSL_export_keying_material(dtls, material, sizeof(material), dtls_srtp_lable.c_str(), dtls_srtp_lable.size(), NULL, 0, 0)) { + return srs_error_wrap(err, "SSL_export_keying_material failed"); + } + + size_t offset = 0; + + std::string client_master_key(reinterpret_cast(material), SRTP_MASTER_KEY_KEY_LEN); + offset += SRTP_MASTER_KEY_KEY_LEN; + std::string server_master_key(reinterpret_cast(material + offset), SRTP_MASTER_KEY_KEY_LEN); + offset += SRTP_MASTER_KEY_KEY_LEN; + std::string client_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + offset += SRTP_MASTER_KEY_SALT_LEN; + std::string server_master_salt(reinterpret_cast(material + offset), SRTP_MASTER_KEY_SALT_LEN); + + client_key = client_master_key + client_master_salt; + server_key = server_master_key + server_master_salt; + + if (srtp_send_init() != srs_success) { + return srs_error_wrap(err, "srtp send init failed"); + } + + if (srtp_recv_init() != srs_success) { + return srs_error_wrap(err, "srtp recv init failed"); + } + + return err; +} + +srs_error_t SrsDtlsSession::srtp_send_init() +{ + srs_error_t err = srs_success; + + srtp_policy_t policy; + bzero(&policy, sizeof(policy)); + + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp); + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp); + + policy.ssrc.type = ssrc_any_outbound; + + policy.ssrc.value = 0; + // TODO: adjust window_size + policy.window_size = 8192; + policy.allow_repeat_tx = 1; + policy.next = NULL; + + uint8_t *key = new uint8_t[client_key.size()]; + memcpy(key, client_key.data(), client_key.size()); + policy.key = key; + + if (srtp_create(&srtp_send, &policy) != 0) { + srs_freepa(key); + return srs_error_wrap(err, "srtp_create failed"); + } + + srs_freepa(key); + + return err; +} + +srs_error_t SrsDtlsSession::srtp_recv_init() +{ + srs_error_t err = srs_success; + + srtp_policy_t policy; + bzero(&policy, sizeof(policy)); + + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp); + srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp); + + policy.ssrc.type = ssrc_any_inbound; + + policy.ssrc.value = 0; + // TODO: adjust window_size + policy.window_size = 8192; + policy.allow_repeat_tx = 1; + policy.next = NULL; + + uint8_t *key = new uint8_t[server_key.size()]; + memcpy(key, server_key.data(), server_key.size()); + policy.key = key; + + if (srtp_create(&srtp_recv, &policy) != 0) { + srs_freepa(key); + return srs_error_wrap(err, "srtp_create failed"); + } + + srs_freepa(key); + + return err; +} + +srs_error_t SrsDtlsSession::protect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_send) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_protect(srtp_send, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtp protect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtp protect failed"); +} + +srs_error_t SrsDtlsSession::unprotect_rtp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_recv) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_unprotect(srtp_recv, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtp unprotect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtp unprotect failed"); +} + +srs_error_t SrsDtlsSession::protect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_send) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_protect_rtcp(srtp_send, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtcp protect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtcp protect failed"); +} + +srs_error_t SrsDtlsSession::unprotect_rtcp(char* out_buf, const char* in_buf, int& nb_out_buf) +{ + srs_error_t err = srs_success; + + if (srtp_recv) { + memcpy(out_buf, in_buf, nb_out_buf); + if (srtp_unprotect_rtcp(srtp_recv, out_buf, &nb_out_buf) != 0) { + return srs_error_wrap(err, "rtcp unprotect failed"); + } + + return err; + } + + return srs_error_wrap(err, "rtcp unprotect failed"); +} + +SrsRtcSenderThread::SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid) + : ukt(NULL) +{ + _parent_cid = parent_cid; + trd = new SrsDummyCoroutine(); + + rtc_session = s; + ukt = *u; +} + +SrsRtcSenderThread::~SrsRtcSenderThread() +{ + srs_freep(trd); +} + +int SrsRtcSenderThread::cid() +{ + return trd->cid(); +} + +srs_error_t SrsRtcSenderThread::start() +{ + srs_error_t err = srs_success; + + srs_freep(trd); + trd = new SrsSTCoroutine("rtc_sender", this, _parent_cid); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtc_sender"); + } + + return err; +} + +void SrsRtcSenderThread::stop() +{ + trd->stop(); +} + +void SrsRtcSenderThread::stop_loop() +{ + trd->interrupt(); +} + + +srs_error_t SrsRtcSenderThread::cycle() +{ + srs_error_t err = srs_success; + + SrsSource* source = NULL; + SrsRequest req; + req.app = rtc_session->app; + req.stream = rtc_session->stream; + + if (_srs_sources->fetch_or_create(&req, rtc_session->server, &source) != srs_success) { + srs_error("rtc fetch source failed"); + return srs_error_wrap(err, "rtc fetch source failed"); + } + + srs_trace("rtc fetch source success, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + + SrsConsumer* consumer = NULL; + if (source->create_consumer(NULL, consumer) != srs_success) { + srs_trace("rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + return srs_error_wrap(err, "rtc create consumer, app=%s, stream=%s", rtc_session->app.c_str(), rtc_session->stream.c_str()); + } + + SrsAutoFree(SrsConsumer, consumer); + + while (true) { + if ((err = trd->pull()) != srs_success) { + return srs_error_wrap(err, "rtc sender thread"); + } + + SrsMessageArray msgs(SRS_PERF_MW_MSGS); + +#ifdef SRS_PERF_QUEUE_COND_WAIT + consumer->wait(0, SRS_PERF_MW_SLEEP); +#endif + + int msg_count = 0; + if (consumer->dump_packets(&msgs, msg_count) != srs_success) { + continue; + } + + if (msg_count <= 0) { +#ifndef SRS_PERF_QUEUE_COND_WAIT + srs_usleep(mw_sleep); +#endif + // ignore when nothing got. + continue; + } + + send_and_free_messages(msgs.msgs, msg_count, &ukt); + } +} + +void SrsRtcSenderThread::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt) +{ + for (int i = 0; i < nb_msgs; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + + for (int i = 0; i < msg->rtp_packets.size(); ++i) { + if (rtc_session->dtls_session) { + char protected_buf[kRtpPacketSize]; + int nb_protected_buf = msg->rtp_packets[i]->size; + + rtc_session->dtls_session->protect_rtp(protected_buf, msg->rtp_packets[i]->payload, nb_protected_buf); + udp_mux_skt->sendto(protected_buf, nb_protected_buf, 0); + } + } + + srs_freep(msg); + } +} + +SrsRtcSession::SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const string& un) +{ + server = svr; + rtc_server = rtc_svr; + session_state = INIT; + dtls_session = NULL; + strd = NULL; + + username = un; + + last_stun_time = srs_get_system_time(); +} + +SrsRtcSession::~SrsRtcSession() +{ + srs_freep(dtls_session); + + if (strd) { + strd->stop(); + } + srs_freep(strd); +} + +srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) +{ + srs_error_t err = srs_success; + + if (stun_req->is_binding_request()) { + if (on_binding_request(udp_mux_skt, stun_req) != srs_success) { + return srs_error_wrap(err, "stun binding request failed"); + } + } + + last_stun_time = srs_get_system_time(); + + return err; +} + +srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req) +{ + srs_error_t err = srs_success; + + SrsStunPacket stun_binding_response; + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stun_binding_response.set_message_type(BindingResponse); + stun_binding_response.set_local_ufrag(stun_req->get_remote_ufrag()); + stun_binding_response.set_remote_ufrag(stun_req->get_local_ufrag()); + stun_binding_response.set_transcation_id(stun_req->get_transcation_id()); + // FIXME: inet_addr is deprecated, IPV6 support + stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port()); + + if (stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream) != srs_success) { + return srs_error_wrap(err, "stun binding response encode failed"); + } + + if (udp_mux_skt->sendto(stream->data(), stream->pos(), 0) <= 0) { + return srs_error_wrap(err, "stun binding response send failed"); + } + + if (get_session_state() == WAITING_STUN) { + set_session_state(DOING_DTLS_HANDSHAKE); + send_client_hello(udp_mux_skt); + + peer_id = udp_mux_skt->get_peer_id(); + rtc_server->insert_into_id_sessions(peer_id, this); + } + + return err; +} + +srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt) +{ + if (dtls_session == NULL) { + dtls_session = new SrsDtlsSession(this); + } + + dtls_session->send_client_hello(udp_mux_skt); +} + +void SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_trace("rtc session=%s, connection established", id().c_str()); + start_play(udp_mux_skt); +} + +srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + srs_freep(strd); + strd = new SrsRtcSenderThread(this, udp_mux_skt, _srs_context->get_id()); + strd->start(); + + return err; +} + +srs_error_t SrsRtcSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +{ + return dtls_session->on_dtls(udp_mux_skt); +} + +srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + if (dtls_session == NULL) { + return srs_error_wrap(err, "recv unexpect rtp packet before dtls done"); + } + + char unprotected_buf[1460]; + int nb_unprotected_buf = udp_mux_skt->size(); + if (dtls_session->unprotect_rtp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) { + return srs_error_wrap(err, "rtp unprotect failed"); + } + + // FIXME: use SrsRtpPacket + SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); + SrsAutoFree(SrsBuffer, stream); + uint8_t first = stream->read_1bytes(); + uint8_t second = stream->read_1bytes(); + + bool padding = (first & 0x20); + bool ext = (first & 0x10); + uint8_t cc = (first & 0x0F); + + bool marker = (second & 0x80); + + uint16_t sequence = stream->read_2bytes(); + uint32_t timestamp = stream->read_4bytes(); + uint32_t ssrc = stream->read_4bytes(); + + srs_verbose("sequence=%u, timestamp=%u, ssrc=%u, padding=%d, ext=%d, cc=%u, marker=%d, payload_type=%u", + sequence, timestamp, ssrc, padding, ext, cc, marker, payload_type); + + for (uint8_t i = 0; i < cc; ++i) { + uint32_t csrc = 0; + csrc = stream->read_4bytes(); + } + + if (ext) { + uint16_t extern_profile = stream->read_2bytes(); + uint16_t extern_length = stream->read_2bytes(); + + srs_verbose("extern_profile=%u, extern_length=%u", extern_profile, extern_length); + + stream->read_string(extern_length * 4); + } + + return err; +} + +srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + if (dtls_session == NULL) { + return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done"); + } + + char unprotected_buf[1460]; + int nb_unprotected_buf = udp_mux_skt->size(); + if (dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) { + return srs_error_wrap(err, "rtcp unprotect failed"); + } + + // FIXME: use SrsRtpPacket + SrsBuffer* stream = new SrsBuffer(unprotected_buf, nb_unprotected_buf); + SrsAutoFree(SrsBuffer, stream); + uint8_t first = stream->read_1bytes(); + uint8_t payload_type = stream->read_1bytes(); + + if (payload_type == kSR) { + } else if (payload_type == kRR) { + } else if (kSDES) { + } else if (kBye) { + } else if (kApp) { + } else { + return srs_error_wrap(err, "unknown rtcp type=%u", payload_type); + } + + return err; +} + +SrsRtcServer::SrsRtcServer(SrsServer* svr) +{ + server = svr; +} + +SrsRtcServer::~SrsRtcServer() +{ + rttrd->stop(); + srs_freep(rttrd); +} + +srs_error_t SrsRtcServer::initialize() +{ + srs_error_t err = srs_success; + + rttrd = new SrsRtcTimerThread(this, _srs_context->get_id()); + if (rttrd->start() != srs_success) { + return srs_error_wrap(err, "rtc timer thread init failed"); + } + + return err; +} + +srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + if (is_stun(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_stun(udp_mux_skt); + } else if (is_dtls(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_dtls(udp_mux_skt); + } else if (is_rtp_or_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + return on_rtp_or_rtcp(udp_mux_skt); + } + + return srs_error_wrap(err, "unknown udp packet type"); +} + +SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp) +{ + std::string local_pwd = gen_random_str(32); + std::string local_ufrag = ""; + std::string username = ""; + while (true) { + local_ufrag = gen_random_str(8); + + username = local_ufrag + ":" + remote_sdp.get_ice_ufrag(); + if (! map_username_session.count(username)) + break; + } + + SrsRtcSession* session = new SrsRtcSession(server, this, username); + map_username_session.insert(make_pair(username, session)); + + local_sdp.set_ice_ufrag(local_ufrag); + local_sdp.set_ice_pwd(local_pwd); + + session->set_remote_sdp(remote_sdp); + session->set_local_sdp(local_sdp); + + session->set_session_state(WAITING_STUN); + + return session; +} + +SrsRtcSession* SrsRtcServer::find_rtc_session_by_peer_id(const string& peer_id) +{ + map::iterator iter = map_id_session.find(peer_id); + if (iter == map_id_session.end()) { + return NULL; + } + + return iter->second; +} + +srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + srs_trace("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str()); + + SrsStunPacket stun_req; + if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) { + return srs_error_wrap(err, "decode stun packet failed"); + } + + std::string username = stun_req.get_username(); + SrsRtcSession* rtc_session = find_rtc_session_by_username(username); + if (rtc_session == NULL) { + return srs_error_wrap(err, "can not find rtc_session, stun username=%s", username.c_str()); + } + + return rtc_session->on_stun(udp_mux_skt, &stun_req); +} + +srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + srs_trace("on dtls"); + + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); + + if (rtc_session == NULL) { + return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); + } + + rtc_session->on_dtls(udp_mux_skt); + + return err; +} + +srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt) +{ + srs_error_t err = srs_success; + + SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id()); + + if (rtc_session == NULL) { + return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str()); + } + + if (is_rtcp(udp_mux_skt->data(), udp_mux_skt->size())) { + rtc_session->on_rtcp(udp_mux_skt); + } else { + rtc_session->on_rtp(udp_mux_skt); + } + + return err; +} + +SrsRtcSession* SrsRtcServer::find_rtc_session_by_username(const std::string& username) +{ + map::iterator iter = map_username_session.find(username); + if (iter == map_username_session.end()) { + return NULL; + } + + return iter->second; +} + +bool SrsRtcServer::insert_into_id_sessions(const string& peer_id, SrsRtcSession* rtc_session) +{ + return map_id_session.insert(make_pair(peer_id, rtc_session)).second; +} + +void SrsRtcServer::check_and_clean_timeout_session() +{ + map::iterator iter = map_username_session.begin(); + while (iter != map_username_session.end()) { + SrsRtcSession* session = iter->second; + if (session == NULL) { + map_username_session.erase(iter++); + continue; + } + + if (session->is_stun_timeout()) { + srs_trace("rtc session=%s, stun timeout", session->id().c_str()); + map_username_session.erase(iter++); + map_id_session.erase(session->get_peer_id()); + delete session; + continue; + } + + ++iter; + } +} + +SrsRtcTimerThread::SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid) +{ + _parent_cid = parent_cid; + trd = new SrsDummyCoroutine(); + + rtc_server = rtc_svr; +} + +SrsRtcTimerThread::~SrsRtcTimerThread() +{ + srs_freep(trd); +} + +int SrsRtcTimerThread::cid() +{ + return trd->cid(); +} + +srs_error_t SrsRtcTimerThread::start() +{ + srs_error_t err = srs_success; + + srs_freep(trd); + trd = new SrsSTCoroutine("rtc_timer", this, _parent_cid); + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "rtc_timer"); + } + + return err; +} + +void SrsRtcTimerThread::stop() +{ + trd->stop(); +} + +void SrsRtcTimerThread::stop_loop() +{ + trd->interrupt(); +} + +srs_error_t SrsRtcTimerThread::cycle() +{ + srs_error_t err = srs_success; + + while (true) { + if ((err = trd->pull()) != srs_success) { + srs_trace("rtc_timer cycle failed"); + return srs_error_wrap(err, "rtc timer thread"); + } + + srs_usleep(1*1000*1000LL); + rtc_server->check_and_clean_timeout_session(); + } +} diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp new file mode 100644 index 000000000..fc67ba431 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -0,0 +1,277 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_RTC_CONN_HPP +#define SRS_APP_RTC_CONN_HPP + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +class SrsUdpMuxSocket; +class SrsServer; +class SrsConsumer; +class SrsStunPacket; +class SrsRtcServer; +class SrsRtcSession; +class SrsSharedPtrMessage; + +const uint8_t kSR = 200; +const uint8_t kRR = 201; +const uint8_t kSDES = 202; +const uint8_t kBye = 203; +const uint8_t kApp = 204; + +const srs_utime_t kSrsRtcSessionStunTimeoutUs = 10*1000*1000LL; + +class SrsCandidate +{ +private: +public: + SrsCandidate(); + virtual ~SrsCandidate(); + + static std::vector get_candidate_ips(); +}; + +class SrsSdpMediaInfo +{ +private: +public: + SrsSdpMediaInfo(); + virtual ~SrsSdpMediaInfo(); +}; + +class SrsSdp +{ +private: + std::string sdp; + int version; + std::string ice_ufrag; + std::string ice_pwd; + std::string fingerprint; + std::string setup; + std::vector media_infos; +public: + SrsSdp(); + virtual ~SrsSdp(); + + srs_error_t decode(const std::string& sdp_str); + srs_error_t encode(std::string& sdp_str); + + std::string get_ice_ufrag() const { return ice_ufrag; } + std::string get_ice_pwd() const { return ice_pwd; } + + void set_ice_ufrag(const std::string& u) { ice_ufrag = u; } + void set_ice_pwd(const std::string& p) { ice_pwd = p; } +private: + srs_error_t parse_attr(const std::string& line); +}; + +enum SrsRtcSessionStateType +{ + INIT = -1, + WAITING_STUN = 1, + DOING_DTLS_HANDSHAKE = 2, + ESTABLISHED = 3, + CLOSED = 4, +}; + +class SrsDtlsSession +{ +private: + SrsRtcSession* rtc_session; + + SSL* dtls; + BIO* bio_in; + BIO* bio_out; + + std::string client_key; + std::string server_key; + + srtp_t srtp_send; + srtp_t srtp_recv; + + bool handshake_done; + +public: + SrsDtlsSession(SrsRtcSession* s); + virtual ~SrsDtlsSession(); + + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls_application_data(const char* data, const int len); + + void send_client_hello(SrsUdpMuxSocket* udp_mux_skt); +public: + srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); + srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); + srs_error_t protect_rtcp(char* protected_buf, const char* ori_buf, int& nb_protected_buf); + srs_error_t unprotect_rtcp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf); +private: + srs_error_t handshake(SrsUdpMuxSocket* udp_mux_skt); +private: + srs_error_t srtp_initialize(); + srs_error_t srtp_send_init(); + srs_error_t srtp_recv_init(); +}; + +class SrsRtcSenderThread : public ISrsCoroutineHandler +{ +protected: + SrsCoroutine* trd; + int _parent_cid; +private: + SrsRtcSession* rtc_session; + SrsUdpMuxSocket ukt; +public: + // Constructor. + // @param tm The receive timeout in srs_utime_t. + SrsRtcSenderThread(SrsRtcSession* s, SrsUdpMuxSocket* u, int parent_cid); + virtual ~SrsRtcSenderThread(); +public: + virtual int cid(); +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void stop_loop(); +public: + virtual srs_error_t cycle(); +private: + void send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, SrsUdpMuxSocket* udp_mux_skt); +}; + +class SrsRtcSession +{ + friend class SrsRtcSenderThread; +private: + SrsServer* server; + SrsRtcServer* rtc_server; + SrsSdp remote_sdp; + SrsSdp local_sdp; + SrsRtcSessionStateType session_state; + SrsDtlsSession* dtls_session; + SrsRtcSenderThread* strd; + std::string username; + std::string peer_id; + srs_utime_t last_stun_time; +public: + std::string app; + std::string stream; +public: + SrsRtcSession(SrsServer* svr, SrsRtcServer* rtc_svr, const std::string& un); + virtual ~SrsRtcSession(); +public: + SrsSdp* get_local_sdp() { return &local_sdp; } + void set_local_sdp(const SrsSdp& sdp) { local_sdp = sdp; } + + SrsSdp* get_remote_sdp() { return &remote_sdp; } + void set_remote_sdp(const SrsSdp& sdp) { remote_sdp = sdp; } + + SrsRtcSessionStateType get_session_state() { return session_state; } + void set_session_state(SrsRtcSessionStateType state) { session_state = state; } + + std::string id() const { return peer_id + "_" + username; } + + void set_app_stream(const std::string& a, const std::string& s) { app = a; stream = s; } + + std::string get_peer_id() const { return peer_id; } + void set_peer_id(const std::string& id) { peer_id = id; } +public: + srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtp(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt); +public: + srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt); + void on_connection_established(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt); +public: + bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); } +private: + srs_error_t on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket* stun_req); +private: + srs_error_t do_playing(SrsConsumer* consumer, SrsUdpMuxSocket* udp_mux_skt); +}; + +// XXX: is there any other timer thread? +class SrsRtcTimerThread : public ISrsCoroutineHandler +{ +protected: + SrsCoroutine* trd; + int _parent_cid; +private: + SrsRtcServer* rtc_server; +public: + // Constructor. + // @param tm The receive timeout in srs_utime_t. + SrsRtcTimerThread(SrsRtcServer* rtc_svr, int parent_cid); + virtual ~SrsRtcTimerThread(); +public: + virtual int cid(); +public: + virtual srs_error_t start(); + virtual void stop(); + virtual void stop_loop(); +public: + virtual srs_error_t cycle(); +}; + +class SrsRtcServer : public ISrsUdpMuxHandler +{ +private: + SrsServer* server; + SrsRtcTimerThread* rttrd; +private: + std::map map_username_session; // key: username(local_ufrag + ":" + remote_ufrag) + std::map map_id_session; // key: peerip(ip + ":" + port) +public: + SrsRtcServer(SrsServer* svr); + virtual ~SrsRtcServer(); +public: + virtual srs_error_t initialize(); + + virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* udp_mux_skt); + + SrsRtcSession* create_rtc_session(const SrsSdp& remote_sdp, SrsSdp& local_sdp); + bool insert_into_id_sessions(const std::string& peer_id, SrsRtcSession* rtc_session); + void check_and_clean_timeout_session(); +private: + srs_error_t on_stun(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_dtls(SrsUdpMuxSocket* udp_mux_skt); + srs_error_t on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt); +private: + SrsRtcSession* find_rtc_session_by_username(const std::string& ufrag); + SrsRtcSession* find_rtc_session_by_peer_id(const std::string& peer_id); +}; + +#endif + diff --git a/trunk/src/app/srs_app_rtp.cpp b/trunk/src/app/srs_app_rtp.cpp new file mode 100644 index 000000000..ee88f419d --- /dev/null +++ b/trunk/src/app/srs_app_rtp.cpp @@ -0,0 +1,366 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +SrsRtpMuxer::SrsRtpMuxer() +{ + sequence = 0; +} + +SrsRtpMuxer::~SrsRtpMuxer() +{ +} + +srs_error_t SrsRtpMuxer::frame_to_packet(SrsSharedPtrMessage* shared_frame, SrsFormat* format) +{ + srs_error_t err = srs_success; + + if (format->is_avc_sequence_header()) { + sps.assign(format->vcodec->sequenceParameterSetNALUnit.data(), format->vcodec->sequenceParameterSetNALUnit.size()); + pps.assign(format->vcodec->pictureParameterSetNALUnit.data(), format->vcodec->pictureParameterSetNALUnit.size()); + } + + vector rtp_packet_vec; + + for (int i = 0; i < format->video->nb_samples; ++i) { + SrsSample sample = format->video->samples[i]; + + uint8_t header = sample.bytes[0]; + uint8_t nal_type = header & kNalTypeMask; + + if (nal_type == 0x06) { + srs_trace("ignore SEI"); + continue; + } + + if (sample.size <= max_payload_size) { + packet_single_nalu(shared_frame, format, &sample, rtp_packet_vec); + } else { + packet_fu_a(shared_frame, format, &sample, rtp_packet_vec); + } + +#if 0 + srs_trace("nal size=%d, nal=%s", sample.size, dump_string_hex(sample.bytes, sample.size, sample.size).c_str()); + for (int i = 0; i < shared_frame->nb_rtp_fragments; ++i) { + srs_trace("rtp=%s", dump_string_hex(shared_frame->rtp_fragments[i].bytes, shared_frame->rtp_fragments[i].size, kRtpPacketSize).c_str()); + } +#endif + } + + shared_frame->set_rtp_packets(rtp_packet_vec); + + return err; +} + +srs_error_t SrsRtpMuxer::packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector& rtp_packet_vec) +{ + srs_error_t err = srs_success; + + char* p = sample->bytes + 1; + int nb_left = sample->size - 1; + uint8_t header = sample->bytes[0]; + uint8_t nal_type = header & kNalTypeMask; + + if (nal_type == kIdr) { + packet_stap_a(sps, pps, shared_frame, rtp_packet_vec); + } + + int num_of_packet = (sample->size - 1 + max_payload_size) / max_payload_size; + int avg_packet_size = sample->size / num_of_packet; + for (int i = 0; i < num_of_packet; ++i) { + char* buf = new char[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize); + SrsAutoFree(SrsBuffer, stream); + + int packet_size = min(nb_left, max_payload_size); + + // v=2,p=0,x=0,cc=0 + stream->write_1bytes(0x80); + // marker payloadtype + if (i == num_of_packet - 1) { + stream->write_1bytes(kMarker | kH264PayloadType); + } else { + stream->write_1bytes(kH264PayloadType); + } + // sequence + stream->write_2bytes(sequence); + // timestamp + stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); + // ssrc + stream->write_4bytes(int32_t(kVideoSSRC)); + + // fu-indicate + uint8_t fu_indicate = kFuA; + fu_indicate |= (header & (~kNalTypeMask)); + stream->write_1bytes(fu_indicate); + + uint8_t fu_header = nal_type; + if (i == 0) + fu_header |= kStart; + if (i == num_of_packet - 1) + fu_header |= kEnd; + stream->write_1bytes(fu_header); + + stream->write_bytes(p, packet_size); + p += packet_size; + nb_left -= packet_size; + + + SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket(); + rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos()); + + rtp_packet_vec.push_back(rtp_shared_pkt); + } +} + +srs_error_t SrsRtpMuxer::packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, vector& rtp_packet_vec) +{ + srs_error_t err = srs_success; + + uint8_t header = sample->bytes[0]; + uint8_t nal_type = header & kNalTypeMask; + + char* buf = new char[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize); + SrsAutoFree(SrsBuffer, stream); + + if (nal_type == kIdr) { + packet_stap_a(sps, pps, shared_frame, rtp_packet_vec); + } + + // v=2,p=0,x=0,cc=0 + stream->write_1bytes(0x80); + // marker payloadtype + stream->write_1bytes(kMarker | kH264PayloadType); + // sequenct + stream->write_2bytes(sequence); + // timestamp + stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); + // ssrc + stream->write_4bytes(int32_t(kVideoSSRC)); + + stream->write_bytes(sample->bytes, sample->size); + + SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket(); + rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos()); + + rtp_packet_vec.push_back(rtp_shared_pkt); + + return err; +} + +srs_error_t SrsRtpMuxer::packet_stap_a(const string &sps, const string& pps, SrsSharedPtrMessage* shared_frame, vector& rtp_packet_vec) +{ + srs_error_t err = srs_success; + + uint8_t header = sps[0]; + uint8_t nal_type = header & kNalTypeMask; + + char* buf = new char[kRtpPacketSize]; + SrsBuffer* stream = new SrsBuffer(buf, kRtpPacketSize); + SrsAutoFree(SrsBuffer, stream); + + // v=2,p=0,x=0,cc=0 + stream->write_1bytes(0x80); + // marker payloadtype + stream->write_1bytes(kMarker | kH264PayloadType); + // sequenct + stream->write_2bytes(sequence); + // timestamp + stream->write_4bytes(int32_t(shared_frame->timestamp * 90)); + // ssrc + stream->write_4bytes(int32_t(kVideoSSRC)); + + // stap-a header + uint8_t stap_a_header = kStapA; + stap_a_header |= (nal_type & (~kNalTypeMask)); + stream->write_1bytes(stap_a_header); + + stream->write_2bytes(sps.size()); + stream->write_bytes((char*)sps.data(), sps.size()); + + stream->write_2bytes(pps.size()); + stream->write_bytes((char*)pps.data(), pps.size()); + + SrsRtpSharedPacket* rtp_shared_pkt = new SrsRtpSharedPacket(); + rtp_shared_pkt->create((shared_frame->timestamp * 90), sequence++, kVideoSSRC, kH264PayloadType, stream->data(), stream->pos()); + + rtp_packet_vec.push_back(rtp_shared_pkt); + + return err; +} + +SrsRtp::SrsRtp() +{ + req = NULL; + hub = NULL; + + enabled = false; + disposable = false; + last_update_time = 0; +} + +SrsRtp::~SrsRtp() +{ +} + +void SrsRtp::dispose() +{ + if (enabled) { + on_unpublish(); + } +} + +srs_error_t SrsRtp::cycle() +{ + srs_error_t err = srs_success; + + return err; +} + +srs_error_t SrsRtp::initialize(SrsOriginHub* h, SrsRequest* r) +{ + srs_error_t err = srs_success; + + hub = h; + req = r; + + rtp_h264_muxer = new SrsRtpMuxer(); + + return err; +} + +srs_error_t SrsRtp::on_publish() +{ + srs_error_t err = srs_success; + + // update the hls time, for hls_dispose. + last_update_time = srs_get_system_time(); + + // support multiple publish. + if (enabled) { + return err; + } + + // if enabled, open the muxer. + enabled = true; + + // ok, the hls can be dispose, or need to be dispose. + disposable = true; + + return err; +} + +void SrsRtp::on_unpublish() +{ + srs_error_t err = srs_success; + + // support multiple unpublish. + if (!enabled) { + return; + } + + enabled = false; +} + +srs_error_t SrsRtp::on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format) +{ + srs_error_t err = srs_success; + + if (!enabled) { + return err; + } + + // Ignore if no format->acodec, it means the codec is not parsed, or unknown codec. + // @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474 + if (!format->acodec) { + return err; + } + + // update the hls time, for hls_dispose. + last_update_time = srs_get_system_time(); + + // ts support audio codec: aac/mp3 + SrsAudioCodecId acodec = format->acodec->id; + if (acodec != SrsAudioCodecIdAAC && acodec != SrsAudioCodecIdMP3) { + return err; + } + + // ignore sequence header + srs_assert(format->audio); + + // TODO: rtc no support aac + return err; +} + +srs_error_t SrsRtp::on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format) +{ + srs_error_t err = srs_success; + + if (!enabled) { + return err; + } + + // Ignore if no format->vcodec, it means the codec is not parsed, or unknown codec. + // @issue https://github.com/ossrs/srs/issues/1506#issuecomment-562079474 + if (!format->vcodec) { + return err; + } + + // update the hls time, for hls_dispose. + last_update_time = srs_get_system_time(); + + // ignore info frame, + // @see https://github.com/ossrs/srs/issues/288#issuecomment-69863909 + srs_assert(format->video); + return rtp_h264_muxer->frame_to_packet(shared_video, format); +} diff --git a/trunk/src/app/srs_app_rtp.hpp b/trunk/src/app/srs_app_rtp.hpp new file mode 100644 index 000000000..2a23440cd --- /dev/null +++ b/trunk/src/app/srs_app_rtp.hpp @@ -0,0 +1,97 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_RTP_HPP +#define SRS_APP_RTP_HPP + +#include + +#include +#include +#include + +class SrsFormat; +class SrsSample; +class SrsSharedPtrMessage; +class SrsRtpSharedPacket; +class SrsRequest; +class SrsOriginHub; + +const int max_payload_size = 1200; +const int kRtpPacketSize = 1500; + +const uint8_t kMarker = 0x80; +const uint8_t kH264PayloadType = 102; + +const uint8_t kNalTypeMask = 0x1F; + +const uint8_t kIdr = 5; +const uint8_t kStapA = 24; +const uint8_t kFuA = 28; + +const uint8_t kStart = 0x80; +const uint8_t kEnd = 0x40; + +const uint32_t kVideoSSRC = 3233846889; + +class SrsRtpMuxer +{ +private: + uint16_t sequence; + std::string sps; + std::string pps; +public: + SrsRtpMuxer(); + virtual ~SrsRtpMuxer(); +public: + srs_error_t frame_to_packet(SrsSharedPtrMessage* shared_video, SrsFormat* format); +private: + srs_error_t packet_fu_a(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector& rtp_packet_vec); + srs_error_t packet_single_nalu(SrsSharedPtrMessage* shared_frame, SrsFormat* format, SrsSample* sample, std::vector& rtp_packet_vec); + srs_error_t packet_stap_a(const std::string &sps, const std::string& pps, SrsSharedPtrMessage* shared_frame, std::vector& rtp_packet_vec); +}; + +class SrsRtp +{ +private: + SrsRequest* req; + bool enabled; + bool disposable; + srs_utime_t last_update_time; + SrsRtpMuxer* rtp_h264_muxer; + SrsOriginHub* hub; +public: + SrsRtp(); + virtual ~SrsRtp(); +public: + virtual void dispose(); + virtual srs_error_t cycle(); +public: + virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r); + virtual srs_error_t on_publish(); + virtual void on_unpublish(); + virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* format); + virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); +}; + +#endif diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index e4fdf4db2..5e3294c0b 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -45,6 +45,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -109,6 +110,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "RTSP"; case SrsListenerFlv: return "HTTP-FLV"; + case SrsListenerRtc: + return "RTC"; default: return "UNKONWN"; } @@ -339,6 +342,45 @@ SrsUdpCasterListener::~SrsUdpCasterListener() srs_freep(caster); } +SrsRtcListener::SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t) : SrsListener(svr, t) +{ + srs_assert(type == SrsListenerRtc); + rtc = rtc_svr; +} + +SrsRtcListener::~SrsRtcListener() +{ +} + +srs_error_t SrsRtcListener::listen(std::string i, int p) +{ + srs_error_t err = srs_success; + + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListenerRtc); + + ip = i; + port = p; + + srs_freep(listener); + listener = new SrsUdpMuxListener(rtc, ip, port); + + if ((err = listener->listen()) != srs_success) { + return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); + } + + // notify the handler the fd changed. + if ((err = rtc->on_stfd_change(listener->stfd())) != srs_success) { + return srs_error_wrap(err, "notify fd change failed"); + } + + string v = srs_listener_type2string(type); + srs_trace("%s listen at udp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd()); + + return err; +} + SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager::SrsSignalManager(SrsServer* s) @@ -630,6 +672,7 @@ SrsServer::SrsServer() // new these objects in initialize instead. http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); + rtc_server = new SrsRtcServer(this); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); } @@ -754,6 +797,10 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) if ((err = http_server->initialize()) != srs_success) { return srs_error_wrap(err, "http server initialize"); } + + if ((err = rtc_server->initialize()) != srs_success) { + return srs_error_wrap(err, "rtc server initialize"); + } return err; } @@ -876,6 +923,10 @@ srs_error_t SrsServer::listen() if ((err = listen_stream_caster()) != srs_success) { return srs_error_wrap(err, "stream caster listen"); } + + if ((err = listen_rtc()) != srs_success) { + return srs_error_wrap(err, "rtc listen"); + } if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); @@ -938,6 +989,9 @@ srs_error_t SrsServer::http_handle() if ((err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) { return srs_error_wrap(err, "handle streams"); } + if ((err = http_api_mux->handle("/api/v1/sdp/", new SrsGoApiSdp(this, rtc_server))) != srs_success) { + return srs_error_wrap(err, "handle sdp"); + } if ((err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) { return srs_error_wrap(err, "handle clients"); } @@ -1346,6 +1400,35 @@ srs_error_t SrsServer::listen_stream_caster() return err; } +srs_error_t SrsServer::listen_rtc() +{ + srs_error_t err = srs_success; + + close_listeners(SrsListenerRtc); + + if (!_srs_config->get_rtc_enabled()) { + return err; + } + + SrsListener* listener = NULL; + + listener = new SrsRtcListener(this, rtc_server, SrsListenerRtc); + srs_assert(listener != NULL); + + listeners.push_back(listener); + + int port = _srs_config->get_rtc_listen(); + if (port <= 0) { + return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port); + } + + if ((err = listener->listen(srs_any_address_for_listener(), port)) != srs_success) { + return srs_error_wrap(err, "listen at %d", port); + } + + return err; +} + void SrsServer::close_listeners(SrsListenerType type) { std::vector::iterator it; @@ -1362,6 +1445,23 @@ void SrsServer::close_listeners(SrsListenerType type) } } +SrsListener* SrsServer::find_listener(SrsListenerType type) +{ + std::vector::iterator it; + for (it = listeners.begin(); it != listeners.end();) { + SrsListener* listener = *it; + + if (listener->listen_type() != type) { + ++it; + continue; + } + + return *it; + } + + return NULL; +} + void SrsServer::resample_kbps() { SrsStatistic* stat = SrsStatistic::instance(); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 56fa945df..282e058a2 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -41,6 +41,7 @@ class SrsServer; class SrsConnection; class SrsHttpServeMux; class SrsHttpServer; +class SrsRtcServer; class SrsIngester; class SrsHttpHeartbeat; class SrsKbps; @@ -69,6 +70,8 @@ enum SrsListenerType SrsListenerRtsp = 4, // TCP stream, FLV stream over HTTP. SrsListenerFlv = 5, + // UDP remux, rtp over udp + SrsListenerRtc = 6, }; // A common tcp listener, for RTMP/HTTP server. @@ -156,6 +159,19 @@ public: virtual ~SrsUdpCasterListener(); }; +// A UDP listener, for udp remux rtc server +class SrsRtcListener : public SrsListener +{ +protected: + SrsUdpMuxListener* listener; + ISrsUdpMuxHandler* rtc; +public: + SrsRtcListener(SrsServer* svr, SrsRtcServer* rtc_svr, SrsListenerType t); + virtual ~SrsRtcListener(); +public: + virtual srs_error_t listen(std::string i, int p); +}; + // Convert signal to io, // @see: st-1.9/docs/notes.html class SrsSignalManager : public ISrsCoroutineHandler @@ -225,6 +241,7 @@ private: // TODO: FIXME: rename to http_api SrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; + SrsRtcServer* rtc_server; SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; SrsCoroutineManager* conn_manager; @@ -303,6 +320,7 @@ private: virtual srs_error_t listen_http_api(); virtual srs_error_t listen_http_stream(); virtual srs_error_t listen_stream_caster(); + virtual srs_error_t listen_rtc(); // Close the listeners for specified type, // Remove the listen object from manager. virtual void close_listeners(SrsListenerType type); @@ -338,6 +356,8 @@ public: public: virtual srs_error_t on_publish(SrsSource* s, SrsRequest* r); virtual void on_unpublish(SrsSource* s, SrsRequest* r); +// listeners commuction + virtual SrsListener* find_listener(SrsListenerType type); }; #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 3473909eb..a7a3297bc 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -32,6 +32,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -824,6 +825,7 @@ SrsOriginHub::SrsOriginHub() dash = new SrsDash(); dvr = new SrsDvr(); encoder = new SrsEncoder(); + rtp = new SrsRtp(); #ifdef SRS_AUTO_HDS hds = new SrsHds(); #endif @@ -868,6 +870,10 @@ srs_error_t SrsOriginHub::initialize(SrsSource* s, SrsRequest* r) return srs_error_wrap(err, "format initialize"); } + if ((err = rtp->initialize(this, req)) != srs_success) { + return srs_error_wrap(err, "rtp initialize"); + } + if ((err = hls->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "hls initialize"); } @@ -965,6 +971,12 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); } + + if ((err = rtp->on_audio(msg, format)) != srs_success) { + srs_warn("rtp: ignore audio error %s", srs_error_desc(err).c_str()); + srs_error_reset(err); + rtp->on_unpublish(); + } if ((err = hls->on_audio(msg, format)) != srs_success) { // apply the error strategy for hls. @@ -1058,6 +1070,12 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se if (format->vcodec && !format->vcodec->is_avc_codec_ok()) { return err; } + + if ((err = rtp->on_video(msg, format)) != srs_success) { + srs_warn("rtp: ignore video error %s", srs_error_desc(err).c_str()); + srs_error_reset(err); + rtp->on_unpublish(); + } if ((err = hls->on_video(msg, format)) != srs_success) { // apply the error strategy for hls. @@ -1126,6 +1144,10 @@ srs_error_t SrsOriginHub::on_publish() return srs_error_wrap(err, "encoder publish"); } + if ((err = rtp->on_publish()) != srs_success) { + return srs_error_wrap(err, "rtp publish"); + } + if ((err = hls->on_publish()) != srs_success) { return srs_error_wrap(err, "hls publish"); } @@ -1163,6 +1185,7 @@ void SrsOriginHub::on_unpublish() destroy_forwarders(); encoder->on_unpublish(); + rtp->on_unpublish(); hls->on_unpublish(); dash->on_unpublish(); dvr->on_unpublish(); @@ -1705,6 +1728,7 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* SrsSource* source = NULL; if ((source = fetch(r)) != NULL) { + srs_trace("found source"); *pps = source; return err; } @@ -1714,6 +1738,8 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); + + srs_trace("new source, stream_url=%s", stream_url.c_str()); source = new SrsSource(); if ((err = source->initialize(r, h)) != srs_success) { @@ -2256,7 +2282,7 @@ srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video) return srs_error_wrap(err, "create message"); } - // directly process the audio message. + // directly process the video message. if (!mix_correct) { return on_video_imp(&msg); } @@ -2306,7 +2332,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg) if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) { return srs_error_wrap(err, "hub consume video"); } - + // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index f93bc314d..d28c37913 100644 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -54,6 +54,7 @@ class SrsNgExec; class SrsConnection; class SrsMessageHeader; class SrsHls; +class SrsRtp; class SrsDvr; class SrsDash; class SrsEncoder; @@ -335,6 +336,8 @@ private: private: // The format, codec information. SrsRtmpFormat* format; + // rtp handler + SrsRtp* rtp; // hls handler. SrsHls* hls; // The DASH encoder. diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index 04ece7184..66ae11269 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -1193,3 +1193,37 @@ void srs_api_dump_summaries(SrsJsonObject* obj) sys->set("conn_srs", SrsJsonAny::integer(nrs->nb_conn_srs)); } +string dump_string_hex(const std::string& str, const int& max_len) +{ + return dump_string_hex(str.c_str(), str.size(), max_len); +} + +string dump_string_hex(const char* buf, const int nb_buf, const int& max_len) +{ + string ret; + ret.reserve(max_len * 4); + + char tmp_buf[1024*16]; + tmp_buf[0] = '\n'; + int len = 1; + + for (int i = 0; i < nb_buf && i < max_len; ++i) { + //int nb = snprintf(tmp_buf + len, sizeof(tmp_buf) - len - 2, "(%03d)%02X ", i, (uint8_t)buf[i]); + int nb = snprintf(tmp_buf + len, sizeof(tmp_buf) - len - 2, "%02X ", (uint8_t)buf[i]); + if (nb <= 0) + break; + + len += nb; + + if (i % 48 == 47) { + tmp_buf[len++] = '\n'; + ret.append(tmp_buf, len); + len = 0; + } + } + tmp_buf[len] = '\0'; + ret.append(tmp_buf, len); + + return ret; + +} diff --git a/trunk/src/app/srs_app_utility.hpp b/trunk/src/app/srs_app_utility.hpp index be7cbced6..77d28f802 100644 --- a/trunk/src/app/srs_app_utility.hpp +++ b/trunk/src/app/srs_app_utility.hpp @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -649,5 +650,8 @@ extern bool srs_is_boolean(std::string str); // Dump summaries for /api/v1/summaries. extern void srs_api_dump_summaries(SrsJsonObject* obj); +extern std::string dump_string_hex(const std::string& str, const int& max_len = INT_MAX); +extern std::string dump_string_hex(const char* buf, const int nb_buf, const int& max_len = INT_MAX); + #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 558bde405..79e30ba69 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -324,6 +324,8 @@ #define ERROR_HTTP_302_INVALID 4038 #define ERROR_BASE64_DECODE 4039 #define ERROR_HTTP_STREAM_EOF 4040 +#define ERROR_RTC_PORT 4041 +#define ERROR_RTP_PACKET_CREATE 4042 /////////////////////////////////////////////////////// // HTTP API error. diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp index 0259def06..5996b60b5 100644 --- a/trunk/src/kernel/srs_kernel_flv.cpp +++ b/trunk/src/kernel/srs_kernel_flv.cpp @@ -36,6 +36,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -228,6 +229,10 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage() ptr->shared_count--; } } + + for (int i = 0; i < rtp_packets.size(); ++i) { + srs_freep(rtp_packets[i]); + } } srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage* msg) @@ -345,10 +350,19 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy() copy->stream_id = stream_id; copy->payload = ptr->payload; copy->size = ptr->size; - + + for (int i = 0; i < rtp_packets.size(); ++i) { + copy->rtp_packets.push_back(rtp_packets[i]->copy()); + } + return copy; } +void SrsSharedPtrMessage::set_rtp_packets(const std::vector& pkts) +{ + rtp_packets = pkts; +} + SrsFlvTransmuxer::SrsFlvTransmuxer() { writer = NULL; diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp index ebc587f39..3bf3afc69 100644 --- a/trunk/src/kernel/srs_kernel_flv.hpp +++ b/trunk/src/kernel/srs_kernel_flv.hpp @@ -27,6 +27,7 @@ #include #include +#include // For srs-librtmp, @see https://github.com/ossrs/srs/issues/213 #ifndef _WIN32 @@ -38,6 +39,8 @@ class ISrsWriter; class ISrsReader; class SrsFileReader; class SrsPacket; +class SrsSample; +class SrsRtpSharedPacket; #define SRS_FLV_TAG_HEADER_SIZE 11 #define SRS_FLV_PREVIOUS_TAG_SIZE 4 @@ -285,6 +288,8 @@ public: // @remark, not all message payload can be decoded to packet. for example, // video/audio packet use raw bytes, no video/audio packet. char* payload; + + std::vector rtp_packets; private: class SrsSharedPtrPayload { @@ -339,6 +344,8 @@ public: // copy current shared ptr message, use ref-count. // @remark, assert object is created. virtual SrsSharedPtrMessage* copy(); +public: + virtual void set_rtp_packets(const std::vector& pkts); }; // Transmux RTMP packets to FLV stream. diff --git a/trunk/src/kernel/srs_kernel_rtp.cpp b/trunk/src/kernel/srs_kernel_rtp.cpp new file mode 100644 index 000000000..516ae89ff --- /dev/null +++ b/trunk/src/kernel/srs_kernel_rtp.cpp @@ -0,0 +1,112 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +using namespace std; + +#include +#include +#include +#include + +SrsRtpSharedPacket::SrsRtpSharedPacketPayload::SrsRtpSharedPacketPayload() +{ + payload = NULL; + size = 0; + shared_count = 0; +} + +SrsRtpSharedPacket::SrsRtpSharedPacketPayload::~SrsRtpSharedPacketPayload() +{ + srs_freepa(payload); +} + +SrsRtpSharedPacket::SrsRtpSharedPacket() +{ + payload_ptr = NULL; + + payload = NULL; + size = 0; + + timestamp = -1; + sequence = 0; + ssrc = 0; + payload_type = 0; +} + +SrsRtpSharedPacket::~SrsRtpSharedPacket() +{ + if (payload_ptr) { + if (payload_ptr->shared_count == 0) { + srs_freep(payload_ptr); + } else { + --payload_ptr->shared_count; + } + } +} + +srs_error_t SrsRtpSharedPacket::create(int64_t t, uint16_t seq, uint32_t sc, uint16_t pt, char* p, int s) +{ + srs_error_t err = srs_success; + + if (size < 0) { + return srs_error_new(ERROR_RTP_PACKET_CREATE, "create packet size=%d", size); + } + + srs_assert(!payload_ptr); + + timestamp = t; + sequence = seq; + ssrc = sc; + payload_type = pt; + + payload_ptr = new SrsRtpSharedPacketPayload(); + payload_ptr->payload = p; + payload_ptr->size = s; + + payload = payload_ptr->payload; + size = payload_ptr->size; + + return err; +} + +SrsRtpSharedPacket* SrsRtpSharedPacket::copy() +{ + SrsRtpSharedPacket* copy = new SrsRtpSharedPacket(); + + copy->payload_ptr = payload_ptr; + payload_ptr->shared_count++; + + copy->payload = payload; + copy->size = size; + + copy->timestamp = timestamp; + copy->sequence = sequence; + copy->ssrc = ssrc; + copy->payload_type = payload_type; + + return copy; +} diff --git a/trunk/src/kernel/srs_kernel_rtp.hpp b/trunk/src/kernel/srs_kernel_rtp.hpp new file mode 100644 index 000000000..b348946ec --- /dev/null +++ b/trunk/src/kernel/srs_kernel_rtp.hpp @@ -0,0 +1,62 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_KERNEL_RTP_HPP +#define SRS_KERNEL_RTP_HPP + +#include + +#include + +class SrsRtpSharedPacket +{ +private: + class SrsRtpSharedPacketPayload + { + public: + char* payload; + int size; + int shared_count; + public: + SrsRtpSharedPacketPayload(); + virtual ~SrsRtpSharedPacketPayload(); + }; +private: + SrsRtpSharedPacketPayload* payload_ptr; +public: + char* payload; + int size; +public: + int64_t timestamp; + uint16_t sequence; + uint32_t ssrc; + uint16_t payload_type; +public: + SrsRtpSharedPacket(); + virtual ~SrsRtpSharedPacket(); +public: + srs_error_t create(int64_t t, uint16_t seq, uint32_t sc, uint16_t pt, char* p, int s); + SrsRtpSharedPacket* copy(); +}; + +#endif diff --git a/trunk/src/protocol/srs_stun_stack.cpp b/trunk/src/protocol/srs_stun_stack.cpp new file mode 100644 index 000000000..a057715d6 --- /dev/null +++ b/trunk/src/protocol/srs_stun_stack.cpp @@ -0,0 +1,273 @@ +#include + +using namespace std; + +#include +#include +#include +#include + +#include +#include +#include +#include + +static string dump_string_hex(const std::string& str, const int& max_len = 128) +{ + char buf[1024*16]; + int len = 0; + + for (int i = 0; i < str.size() && i < max_len; ++i) { + int nb = snprintf(buf + len, sizeof(buf) - len - 1, "%02X ", (uint8_t)str[i]); + if (nb <= 0) + break; + + len += nb; + } + buf[len] = '\0'; + + return string(buf, len); +} + +static srs_error_t hmac_encode(const std::string& algo, const char* key, const int& key_length, + const char* input, const int input_length, char* output, unsigned int& output_length) +{ + srs_error_t err = srs_success; + + const EVP_MD* engine = NULL; + if (algo == "sha512") { + engine = EVP_sha512(); + } else if(algo == "sha256") { + engine = EVP_sha256(); + } else if(algo == "sha1") { + engine = EVP_sha1(); + } else if(algo == "md5") { + engine = EVP_md5(); + } else if(algo == "sha224") { + engine = EVP_sha224(); + } else if(algo == "sha384") { + engine = EVP_sha384(); + } else { + return srs_error_wrap(err, "unknown algo=%s", algo.c_str()); + } + + HMAC_CTX* ctx = HMAC_CTX_new(); + if (HMAC_Init_ex(ctx, key, key_length, engine, NULL) < 0) { + HMAC_CTX_free(ctx); + return srs_error_wrap(err, "hmac init faied"); + } + + if (HMAC_Update(ctx, (const unsigned char*)input, input_length) < 0) { + HMAC_CTX_free(ctx); + return srs_error_wrap(err, "hmac update faied"); + } + + if (HMAC_Final(ctx, (unsigned char*)output, &output_length) < 0) { + HMAC_CTX_free(ctx); + return srs_error_wrap(err, "hmac final faied"); + } + + HMAC_CTX_free(ctx); + + return err; +} + + +SrsStunPacket::SrsStunPacket() +{ + message_type = 0; + local_ufrag = ""; + remote_ufrag = ""; +} + +SrsStunPacket::~SrsStunPacket() +{ +} + +srs_error_t SrsStunPacket::decode(const char* buf, const int nb_buf) +{ + srs_error_t err = srs_success; + + SrsBuffer* stream = new SrsBuffer(const_cast(buf), nb_buf); + SrsAutoFree(SrsBuffer, stream); + + if (stream->left() < 20) { + return srs_error_wrap(err, "invalid stun packet, size=%d", stream->size()); + } + + srs_trace("stun packet, nb_buf=%d", nb_buf); + + message_type = stream->read_2bytes(); + uint16_t message_len = stream->read_2bytes(); + string magic_cookie = stream->read_string(4); + transcation_id = stream->read_string(12); + + srs_trace("message_type=%u, message_len=%u, magic_cookie=%s, transcation_id=%s", + message_type, message_len, magic_cookie.c_str(), transcation_id.c_str()); + + if (nb_buf != 20 + message_len) { + return srs_error_wrap(err, "invalid stun packet, message_len=%d, nb_buf=%d", message_len, nb_buf); + } + + while (stream->left() >= 4) { + uint16_t type = stream->read_2bytes(); + uint16_t len = stream->read_2bytes(); + + srs_trace("type=%u, len=%u", type, len); + + if (stream->left() < len) { + return srs_error_wrap(err, "invalid stun packet"); + } + + string val = stream->read_string(len); + // padding + if (len % 4 != 0) { + stream->read_string(4 - (len % 4)); + } + //srs_trace("val=%s", val.c_str()); + + switch (type) { + // FIXME: enum + case 6: { + username = val; + size_t p = val.find(":"); + if (p != string::npos) { + local_ufrag = val.substr(0, p); + remote_ufrag = val.substr(p + 1); + srs_trace("stun packet local_ufrag=%s, remote_ufrag=%s", local_ufrag.c_str(), remote_ufrag.c_str()); + } + break; + } + + default: { + break; + } + } + } + + return err; +} + +srs_error_t SrsStunPacket::encode(const string& pwd, SrsBuffer* stream) +{ + srs_error_t err = srs_success; + if (is_binding_response()) { + return encode_binding_response(pwd, stream); + } + + return srs_error_wrap(err, "unknown stun type=%d", get_message_type()); +} + +// FIXME: make this function easy to read +srs_error_t SrsStunPacket::encode_binding_response(const string& pwd, SrsBuffer* stream) +{ + srs_error_t err = srs_success; + + string property_username = encode_username(); + string mapped_address = encode_mapped_address(); + + stream->write_2bytes(BindingResponse); + stream->write_2bytes(property_username.size() + mapped_address.size()); + stream->write_4bytes(0x2112A442); + stream->write_string(transcation_id); + stream->write_string(property_username); + stream->write_string(mapped_address); + + stream->data()[2] = ((stream->pos() - 20 + 20 + 4) & 0x0000FF00) >> 8; + stream->data()[3] = ((stream->pos() - 20 + 20 + 4) & 0x000000FF); + + char hmac_buf[20] = {0}; + unsigned int hmac_buf_len = 0; + if (hmac_encode("sha1", pwd.c_str(), pwd.size(), stream->data(), stream->pos(), hmac_buf, hmac_buf_len) != srs_success) { + return srs_error_wrap(err, "hmac encode failed"); + } + + string hmac = encode_hmac(hmac_buf, hmac_buf_len); + + stream->write_string(hmac); + stream->data()[2] = ((stream->pos() - 20 + 8) & 0x0000FF00) >> 8; + stream->data()[3] = ((stream->pos() - 20 + 8) & 0x000000FF); + + uint32_t crc32 = srs_crc32_ieee(stream->data(), stream->pos(), 0) ^ 0x5354554E; + + string fingerprint = encode_fingerprint(crc32); + + stream->write_string(fingerprint); + + stream->data()[2] = ((stream->pos() - 20) & 0x0000FF00) >> 8; + stream->data()[3] = ((stream->pos() - 20) & 0x000000FF); + + return err; +} + +string SrsStunPacket::encode_username() +{ + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + string username = remote_ufrag + ":" + local_ufrag; + + stream->write_2bytes(Username); + stream->write_2bytes(username.size()); + stream->write_string(username); + + if (stream->pos() % 4 != 0) { + static char padding[4] = {0}; + stream->write_bytes(padding, 4 - (stream->pos() % 4)); + } + + return string(stream->data(), stream->pos()); +} + +string SrsStunPacket::encode_mapped_address() +{ + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + uint32_t magic_cookie = 0x2112A442; +#if 1 + stream->write_2bytes(XorMappedAddress); + stream->write_2bytes(8); + stream->write_1bytes(0); // ignore this bytes + stream->write_1bytes(1); // ipv4 family + stream->write_2bytes(mapped_port ^ (magic_cookie >> 16)); + stream->write_4bytes(mapped_address ^ magic_cookie); +#else + stream->write_2bytes(MappedAddress); + stream->write_2bytes(8); + stream->write_1bytes(0); // ignore this bytes + stream->write_1bytes(1); // ipv4 family + stream->write_2bytes(mapped_port); + stream->write_4bytes(mapped_address); +#endif + + return string(stream->data(), stream->pos()); +} + +string SrsStunPacket::encode_hmac(char* hmac_buf, const int hmac_buf_len) +{ + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stream->write_2bytes(MessageIntegrity); + stream->write_2bytes(hmac_buf_len); + stream->write_bytes(hmac_buf, hmac_buf_len); + + return string(stream->data(), stream->pos()); +} + +string SrsStunPacket::encode_fingerprint(uint32_t crc32) +{ + char buf[1460]; + SrsBuffer* stream = new SrsBuffer(buf, sizeof(buf)); + SrsAutoFree(SrsBuffer, stream); + + stream->write_2bytes(Fingerprint); + stream->write_2bytes(4); + stream->write_4bytes(crc32); + + return string(stream->data(), stream->pos()); +} diff --git a/trunk/src/protocol/srs_stun_stack.hpp b/trunk/src/protocol/srs_stun_stack.hpp new file mode 100644 index 000000000..aa2bc2192 --- /dev/null +++ b/trunk/src/protocol/srs_stun_stack.hpp @@ -0,0 +1,111 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_PROTOCOL_STUN_HPP +#define SRS_PROTOCOL_STUN_HPP + +#include + +#include +#include + +class SrsBuffer; + +enum SrsStunMessageType +{ + // see @ https://tools.ietf.org/html/rfc3489#section-11.1 + BindingRequest = 0x0001, + BindingResponse = 0x0101, + BindingErrorResponse = 0x0111, + SharedSecretRequest = 0x0002, + SharedSecretResponse = 0x0102, + SharedSecretErrorResponse = 0x0112, +}; + +enum SrsStunMessageAttribute +{ + // see @ https://tools.ietf.org/html/rfc3489#section-11.2 + MappedAddress = 0x0001, + ResponseAddress = 0x0002, + ChangeRequest = 0x0003, + SourceAddress = 0x0004, + ChangedAddress = 0x0005, + Username = 0x0006, + Password = 0x0007, + MessageIntegrity = 0x0008, + ErrorCode = 0x0009, + UnknownAttributes = 0x000A, + ReflectedFrom = 0x000B, + + // see @ https://tools.ietf.org/html/rfc5389#section-18.2 + Realm = 0x0014, + Nonce = 0x0015, + XorMappedAddress = 0x0020, + Software = 0x8022, + AlternateServer = 0x8023, + Fingerprint = 0x8028, +}; + +class SrsStunPacket +{ +private: + uint16_t message_type; + std::string username; + std::string local_ufrag; + std::string remote_ufrag; + std::string transcation_id; + uint32_t mapped_address; + uint16_t mapped_port; +public: + SrsStunPacket(); + virtual ~SrsStunPacket(); + + bool is_binding_request() const { return message_type == BindingRequest; } + bool is_binding_response() const { return message_type == BindingResponse; } + + uint16_t get_message_type() const { return message_type; } + std::string get_username() const { return username; } + std::string get_local_ufrag() const { return local_ufrag; } + std::string get_remote_ufrag() const { return remote_ufrag; } + std::string get_transcation_id() const { return transcation_id; } + uint32_t get_mapped_address() const { return mapped_address; } + uint16_t get_mapped_port() const { return mapped_port; } + + void set_message_type(const uint16_t& m) { message_type = m; } + void set_local_ufrag(const std::string& u) { local_ufrag = u; } + void set_remote_ufrag(const std::string& u) { remote_ufrag = u; } + void set_transcation_id(const std::string& t) { transcation_id = t; } + void set_mapped_address(const uint32_t& addr) { mapped_address = addr; } + void set_mapped_port(const uint32_t& port) { mapped_port = port; } + + srs_error_t decode(const char* buf, const int nb_buf); + srs_error_t encode(const std::string& pwd, SrsBuffer* stream); +private: + srs_error_t encode_binding_response(const std::string& pwd, SrsBuffer* stream); + std::string encode_username(); + std::string encode_mapped_address(); + std::string encode_hmac(char* hamc_buf, const int hmac_buf_len); + std::string encode_fingerprint(uint32_t crc32); +}; + +#endif diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index f63cd4279..947150f61 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -397,6 +397,16 @@ int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, in return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout); } +int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, int tolen, srs_utime_t timeout) +{ + return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout); +} + +int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout) +{ + return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout); +} + srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) { return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 510b9ba8a..947950e85 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -88,6 +88,8 @@ extern srs_netfd_t srs_netfd_open_socket(int osfd); extern srs_netfd_t srs_netfd_open(int osfd); extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); +extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout); +extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout); extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);