@ -22,6 +22,10 @@
# include <srs_app_pithy_print.hpp>
# include <srs_app_rtmp_conn.hpp>
# include <srs_protocol_raw_avc.hpp>
# include <srs_app_server.hpp>
# include <srs_protocol_json.hpp>
# include <srs_app_http_api.hpp>
# include <srs_app_statistic.hpp>
# include <sstream>
using namespace std ;
@ -420,12 +424,12 @@ srs_error_t SrsGbListener::initialize(SrsConfDirective* conf)
bool sip_enabled = _srs_config - > get_stream_caster_sip_enable ( conf ) ;
if ( ! sip_enabled ) {
return srs_error_new ( ERROR_GB_CONFIG , " GB SIP is required " ) ;
srs_warn ( " GB SIP is disabled. " ) ;
} else {
int port = _srs_config - > get_stream_caster_sip_listen ( conf ) ;
sip_listener_ - > set_endpoint ( ip , port ) - > set_label ( " SIP-TCP " ) ;
}
int port = _srs_config - > get_stream_caster_sip_listen ( conf ) ;
sip_listener_ - > set_endpoint ( ip , port ) - > set_label ( " SIP-TCP " ) ;
return err ;
}
@ -441,6 +445,24 @@ srs_error_t SrsGbListener::listen()
return srs_error_wrap ( err , " listen " ) ;
}
if ( ( err = listen_api ( ) ) ! = srs_success ) {
return srs_error_wrap ( err , " listen api " ) ;
}
return err ;
}
srs_error_t SrsGbListener : : listen_api ( )
{
srs_error_t err = srs_success ;
// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux * http_api_mux = _srs_hybrid - > srs ( ) - > instance ( ) - > api_server ( ) ;
if ( ( err = http_api_mux - > handle ( " /gb/v1/publish/ " , new SrsGoApiGbPublish ( conf_ ) ) ) ! = srs_success ) {
return srs_error_wrap ( err , " handle publish " ) ;
}
return err ;
}
@ -549,11 +571,16 @@ std::string SrsGbSipTcpConn::device_id()
return register_ - > device_id ( ) ;
}
void SrsGbSipTcpConn : : set_device_id ( const std : : string & id )
{
register_ - > from_address_user_ = id ;
}
void SrsGbSipTcpConn : : set_cid ( const SrsContextId & cid )
{
if ( owner_cid_ ) owner_cid_ - > set_cid ( cid ) ;
receiver_ - > set_cid ( cid ) ;
sender_ - > set_cid ( cid ) ;
if ( receiver_ ) receiver_ - > set_cid ( cid ) ;
if ( sender_ ) sender_ - > set_cid ( cid ) ;
cid_ = cid ;
}
@ -2681,5 +2708,117 @@ void srs_sip_parse_address(const std::string& address, std::string& user, std::s
}
}
SrsGoApiGbPublish : : SrsGoApiGbPublish ( SrsConfDirective * conf )
{
conf_ = conf - > copy ( ) ;
}
SrsGoApiGbPublish : : ~ SrsGoApiGbPublish ( )
{
srs_freep ( conf_ ) ;
}
srs_error_t SrsGoApiGbPublish : : serve_http ( ISrsHttpResponseWriter * w , ISrsHttpMessage * r )
{
srs_error_t err = srs_success ;
SrsUniquePtr < SrsJsonObject > res ( SrsJsonAny : : object ( ) ) ;
if ( ( err = do_serve_http ( w , r , res . get ( ) ) ) ! = srs_success ) {
srs_warn ( " GB error %s " , srs_error_desc ( err ) . c_str ( ) ) ;
res - > set ( " code " , SrsJsonAny : : integer ( srs_error_code ( err ) ) ) ;
res - > set ( " desc " , SrsJsonAny : : str ( srs_error_code_str ( err ) . c_str ( ) ) ) ;
srs_freep ( err ) ;
return srs_api_response ( w , r , res - > dumps ( ) ) ;
}
return srs_api_response ( w , r , res - > dumps ( ) ) ;
}
srs_error_t SrsGoApiGbPublish : : do_serve_http ( ISrsHttpResponseWriter * w , ISrsHttpMessage * r , SrsJsonObject * res )
{
srs_error_t err = srs_success ;
// For each GB session, we use short-term HTTP connection.
w - > header ( ) - > set ( " Connection " , " Close " ) ;
// Parse req, the request json object, from body.
SrsSharedPtr < SrsJsonObject > req ;
if ( true ) {
string req_json ;
if ( ( err = r - > body_read_all ( req_json ) ) ! = srs_success ) {
return srs_error_wrap ( err , " read body " ) ;
}
SrsJsonAny * json = SrsJsonAny : : loads ( req_json ) ;
if ( ! json | | ! json - > is_object ( ) ) {
srs_freep ( json ) ;
return srs_error_new ( ERROR_HTTP_DATA_INVALID , " invalid body %s " , req_json . c_str ( ) ) ;
}
req = SrsSharedPtr < SrsJsonObject > ( json - > to_object ( ) ) ;
}
// Fetch params from req object.
SrsJsonAny * prop = NULL ;
if ( ( prop = req - > ensure_property_string ( " id " ) ) = = NULL ) {
return srs_error_new ( ERROR_HTTP_DATA_INVALID , " id required " ) ;
}
string id = prop - > to_str ( ) ;
if ( ( prop = req - > ensure_property_string ( " ssrc " ) ) = = NULL ) {
return srs_error_new ( ERROR_HTTP_DATA_INVALID , " ssrc required " ) ;
}
uint64_t ssrc = atoi ( prop - > to_str ( ) . c_str ( ) ) ;
if ( ( err = bind_session ( id , ssrc ) ) ! = srs_success ) {
return srs_error_wrap ( err , " bind session " ) ;
}
res - > set ( " code " , SrsJsonAny : : integer ( ERROR_SUCCESS ) ) ;
int port = _srs_config - > get_stream_caster_listen ( conf_ ) ;
res - > set ( " port " , SrsJsonAny : : integer ( port ) ) ;
res - > set ( " is_tcp " , SrsJsonAny : : boolean ( true ) ) ; // only tcp supported
srs_trace ( " GB publish id: %s, ssrc=%lu " , id . c_str ( ) , ssrc ) ;
return err ;
}
srs_error_t SrsGoApiGbPublish : : bind_session ( std : : string id , uint64_t ssrc )
{
srs_error_t err = srs_success ;
SrsSharedResource < SrsGbSession > * session = NULL ;
session = dynamic_cast < SrsSharedResource < SrsGbSession > * > ( _srs_gb_manager - > find_by_id ( id ) ) ;
if ( session ) {
return srs_error_new ( ERROR_SYSTEM_STREAM_BUSY , " stream already exists " ) ;
}
session = dynamic_cast < SrsSharedResource < SrsGbSession > * > ( _srs_gb_manager - > find_by_fast_id ( ssrc ) ) ;
if ( session ) {
return srs_error_new ( ERROR_SYSTEM_STREAM_BUSY , " ssrc already exists " ) ;
}
// Create new GB session.
SrsGbSession * raw_session = new SrsGbSession ( ) ;
raw_session - > setup ( conf_ ) ;
session = new SrsSharedResource < SrsGbSession > ( raw_session ) ;
_srs_gb_manager - > add_with_id ( id , session ) ;
_srs_gb_manager - > add_with_fast_id ( ssrc , session ) ;
SrsExecutorCoroutine * executor = new SrsExecutorCoroutine ( _srs_gb_manager , session , raw_session , raw_session ) ;
raw_session - > setup_owner ( session , executor , executor ) ;
raw_session - > sip_transport ( ) - > set_device_id ( id ) ;
if ( ( err = executor - > start ( ) ) ! = srs_success ) {
srs_freep ( executor ) ;
return srs_error_wrap ( err , " gb session " ) ;
}
return err ;
}
SrsResourceManager * _srs_gb_manager = NULL ;