diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index c9215790d..d5ed5ff82 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -50,6 +50,123 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // origin array whos data takes the same form as LengthValueBytes #define RTMP_AMF0_OriginStrictArray 0x20 +// User defined +#define RTMP_AMF0_Invalid 0x3F + +SrsAmf0Any::SrsAmf0Any() +{ + marker = RTMP_AMF0_Invalid; +} + +SrsAmf0Any::~SrsAmf0Any() +{ +} + +bool SrsAmf0Any::is_string() +{ + return marker == RTMP_AMF0_String; +} + +bool SrsAmf0Any::is_boolean() +{ + return marker == RTMP_AMF0_Boolean; +} + +bool SrsAmf0Any::is_number() +{ + return marker == RTMP_AMF0_Number; +} + +bool SrsAmf0Any::is_object() +{ + return marker == RTMP_AMF0_Object; +} + +bool SrsAmf0Any::is_object_eof() +{ + return marker == RTMP_AMF0_ObjectEnd; +} + +SrsAmf0String::SrsAmf0String() +{ + marker = RTMP_AMF0_String; +} + +SrsAmf0String::~SrsAmf0String() +{ +} + +SrsAmf0Boolean::SrsAmf0Boolean() +{ + marker = RTMP_AMF0_Boolean; + value = false; +} + +SrsAmf0Boolean::~SrsAmf0Boolean() +{ +} + +SrsAmf0Number::SrsAmf0Number() +{ + marker = RTMP_AMF0_Number; + value = 0; +} + +SrsAmf0Number::~SrsAmf0Number() +{ + marker = RTMP_AMF0_ObjectEnd; +} + +SrsAmf0ObjectEOF::SrsAmf0ObjectEOF() +{ + utf8_empty = 0x00; +} + +SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF() +{ +} + +SrsAmf0Object::SrsAmf0Object() +{ + marker = RTMP_AMF0_Object; +} + +SrsAmf0Object::~SrsAmf0Object() +{ + std::map::iterator it; + for (it = properties.begin(); it != properties.end(); ++it) { + SrsAmf0Any* any = it->second; + delete any; + } + properties.clear(); +} + +SrsAmf0Any* SrsAmf0Object::get_property(std::string name) +{ + std::map::iterator it; + + if ((it = properties.find(name)) == properties.end()) { + return NULL; + } + + return it->second; +} + +SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) +{ + SrsAmf0Any* prop = get_property(name); + + if (!prop) { + return NULL; + } + + if (!prop->is_string()) { + return NULL; + } + + return prop; +} + int srs_amf0_read_object_eof(SrsStream* stream, SrsAmf0ObjectEOF*&); int srs_amf0_read_utf8(SrsStream* stream, std::string& value) @@ -254,10 +371,12 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) value = p; return ret; } - default: - value = new SrsAmf0Any(); - value->marker = stream->read_char(); + case RTMP_AMF0_Invalid: + default: { + ret = ERROR_RTMP_AMF0_INVALID; + srs_error("invalid amf0 message type. marker=%#x, ret=%d", marker, ret); return ret; + } } return ret; @@ -343,91 +462,3 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value) return ret; } - -SrsAmf0Any::SrsAmf0Any() -{ - marker = RTMP_AMF0_Null; -} - -SrsAmf0Any::~SrsAmf0Any() -{ -} - -bool SrsAmf0Any::is_string() -{ - return marker == RTMP_AMF0_String; -} - -bool SrsAmf0Any::is_boolean() -{ - return marker == RTMP_AMF0_Boolean; -} - -bool SrsAmf0Any::is_number() -{ - return marker == RTMP_AMF0_Number; -} - -bool SrsAmf0Any::is_object() -{ - return marker == RTMP_AMF0_Object; -} - -bool SrsAmf0Any::is_object_eof() -{ - return marker == RTMP_AMF0_ObjectEnd; -} - -SrsAmf0String::SrsAmf0String() -{ - marker = RTMP_AMF0_String; -} - -SrsAmf0String::~SrsAmf0String() -{ -} - -SrsAmf0Boolean::SrsAmf0Boolean() -{ - marker = RTMP_AMF0_Boolean; - value = false; -} - -SrsAmf0Boolean::~SrsAmf0Boolean() -{ -} - -SrsAmf0Number::SrsAmf0Number() -{ - marker = RTMP_AMF0_Number; - value = 0; -} - -SrsAmf0Number::~SrsAmf0Number() -{ - marker = RTMP_AMF0_ObjectEnd; -} - -SrsAmf0ObjectEOF::SrsAmf0ObjectEOF() -{ - utf8_empty = 0x00; -} - -SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF() -{ -} - -SrsAmf0Object::SrsAmf0Object() -{ - marker = RTMP_AMF0_Object; -} - -SrsAmf0Object::~SrsAmf0Object() -{ - std::map::iterator it; - for (it = properties.begin(); it != properties.end(); ++it) { - SrsAmf0Any* any = it->second; - delete any; - } - properties.clear(); -} diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 47887994c..1fa9db737 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -36,46 +36,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsStream; class SrsAmf0Object; -/** -* read amf0 utf8 string from stream. -* 1.3.1 Strings and UTF-8 -* UTF-8 = U16 *(UTF8-char) -* UTF8-char = UTF8-1 | UTF8-2 | UTF8-3 | UTF8-4 -* UTF8-1 = %x00-7F -* @remark only support UTF8-1 char. -*/ -extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value); - -/** -* read amf0 string from stream. -* 2.4 String Type -* string-type = string-marker UTF-8 -*/ -extern int srs_amf0_read_string(SrsStream* stream, std::string& value); - -/** -* read amf0 boolean from stream. -* 2.4 String Type -* boolean-type = boolean-marker U8 -* 0 is false, <> 0 is true -*/ -extern int srs_amf0_read_boolean(SrsStream* stream, bool& value); - -/** -* read amf0 number from stream. -* 2.2 Number Type -* number-type = number-marker DOUBLE -*/ -extern int srs_amf0_read_number(SrsStream* stream, double& value); - -/** -* read amf0 object from stream. -* 2.5 Object Type -* anonymous-object-type = object-marker *(object-property) -* object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker) -*/ -extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value); - /** * any amf0 value. * 2.1 Types Overview @@ -166,7 +126,50 @@ struct SrsAmf0Object : public SrsAmf0Any SrsAmf0Object(); virtual ~SrsAmf0Object(); + + virtual SrsAmf0Any* get_property(std::string name); + virtual SrsAmf0Any* ensure_property_string(std::string name); }; + +/** +* read amf0 utf8 string from stream. +* 1.3.1 Strings and UTF-8 +* UTF-8 = U16 *(UTF8-char) +* UTF8-char = UTF8-1 | UTF8-2 | UTF8-3 | UTF8-4 +* UTF8-1 = %x00-7F +* @remark only support UTF8-1 char. +*/ +extern int srs_amf0_read_utf8(SrsStream* stream, std::string& value); + +/** +* read amf0 string from stream. +* 2.4 String Type +* string-type = string-marker UTF-8 +*/ +extern int srs_amf0_read_string(SrsStream* stream, std::string& value); + +/** +* read amf0 boolean from stream. +* 2.4 String Type +* boolean-type = boolean-marker U8 +* 0 is false, <> 0 is true +*/ +extern int srs_amf0_read_boolean(SrsStream* stream, bool& value); + +/** +* read amf0 number from stream. +* 2.2 Number Type +* number-type = number-marker DOUBLE +*/ +extern int srs_amf0_read_number(SrsStream* stream, double& value); + +/** +* read amf0 object from stream. +* 2.5 Object Type +* anonymous-object-type = object-marker *(object-property) +* object-property = (UTF-8 value-type) | (UTF-8-empty object-end-marker) +*/ +extern int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value); /** * convert the any to specified object. diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index a8624b2ed..450632574 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -33,6 +33,7 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { ip = NULL; + req = new SrsRequest(); rtmp = new SrsRtmp(client_stfd); } @@ -43,6 +44,11 @@ SrsClient::~SrsClient() ip = NULL; } + if (req) { + delete req; + req = NULL; + } + if (rtmp) { delete rtmp; rtmp = NULL; @@ -65,12 +71,13 @@ int SrsClient::do_cycle() } srs_verbose("rtmp handshake success"); - SrsApp* app = NULL; - if ((ret = rtmp->connect_app(&app)) != ERROR_SUCCESS) { + if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { srs_warn("rtmp connect vhost/app failed. ret=%d", ret); return ret; } - srs_verbose("rtmp connect vhost/app success"); + srs_info("rtmp connect success. tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s", + req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), + req->app.c_str()); return ret; } diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 279692234..5f030fda9 100755 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -33,6 +33,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsRtmp; +class SrsRequest; /** * the client provides the main logic control for RTMP clients. @@ -41,6 +42,7 @@ class SrsClient : public SrsConnection { private: char* ip; + SrsRequest* req; SrsRtmp* rtmp; public: SrsClient(SrsServer* srs_server, st_netfd_t client_stfd); diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 991ef40df..1fb0dd225 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -53,6 +53,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_CHUNK_START 301 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302 #define ERROR_RTMP_AMF0_DECODE 303 +#define ERROR_RTMP_AMF0_INVALID 304 +#define ERROR_RTMP_REQ_CONNECT 305 #define ERROR_SYSTEM_STREAM_INIT 400 diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index c521e60c2..5a79e4049 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -194,190 +194,6 @@ messages. */ #define RTMP_AMF0_COMMAND_CONNECT "connect" -SrsMessageHeader::SrsMessageHeader() -{ - message_type = 0; - payload_length = 0; - timestamp = 0; - stream_id = 0; -} - -SrsMessageHeader::~SrsMessageHeader() -{ -} - -SrsChunkStream::SrsChunkStream(int _cid) -{ - fmt = 0; - cid = _cid; - extended_timestamp = false; - msg = NULL; -} - -SrsChunkStream::~SrsChunkStream() -{ - if (msg) { - delete msg; - msg = NULL; - } -} - -SrsMessage::SrsMessage() -{ - size = 0; - stream = NULL; - payload = NULL; - decoded_payload = NULL; -} - -SrsMessage::~SrsMessage() -{ - if (payload) { - delete[] payload; - payload = NULL; - } - - if (decoded_payload) { - delete decoded_payload; - decoded_payload = NULL; - } - - if (stream) { - delete stream; - stream = NULL; - } -} - -SrsPacket* SrsMessage::get_packet() -{ - if (!decoded_payload) { - srs_error("the payload is raw/undecoded, invoke decode_packet to decode it."); - } - srs_assert(decoded_payload != NULL); - - return decoded_payload; -} - -int SrsMessage::decode_packet() -{ - int ret = ERROR_SUCCESS; - - srs_assert(payload != NULL); - srs_assert(size > 0); - - if (!stream) { - srs_verbose("create decode stream for message."); - stream = new SrsStream(); - } - - if (header.message_type == RTMP_MSG_AMF0CommandMessage) { - srs_verbose("start to decode AMF0 command message."); - - // amf0 command message. - // need to read the command name. - if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { - srs_error("initialize stream failed. ret=%d", ret); - return ret; - } - srs_verbose("decode stream initialized success"); - - std::string command; - if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { - srs_error("decode AMF0 command name failed. ret=%d", ret); - return ret; - } - srs_verbose("AMF0 command message, command_name=%s", command.c_str()); - - stream->reset(); - if (command == RTMP_AMF0_COMMAND_CONNECT) { - srs_info("decode the AMF0 command(connect vhost/app message)."); - decoded_payload = new SrsConnectAppPacket(); - return decoded_payload->decode(stream); - } - - // default packet to drop message. - srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); - decoded_payload = new SrsPacket(); - return ret; - } - - // default packet to drop message. - srs_trace("drop the unknown message, type=%d", header.message_type); - decoded_payload = new SrsPacket(); - - return ret; -} - -SrsPacket::SrsPacket() -{ -} - -SrsPacket::~SrsPacket() -{ -} - -int SrsPacket::decode(SrsStream* /*stream*/) -{ - int ret = ERROR_SUCCESS; - return ret; -} - -SrsConnectAppPacket::SrsConnectAppPacket() -{ - command_name = RTMP_AMF0_COMMAND_CONNECT; - transaction_id = 1; - command_object = NULL; -} - -SrsConnectAppPacket::~SrsConnectAppPacket() -{ -} - -int SrsConnectAppPacket::decode(SrsStream* stream) -{ - int ret = ERROR_SUCCESS; - - if ((ret = super::decode(stream)) != ERROR_SUCCESS) { - return ret; - } - - if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { - srs_error("amf0 decode connect command_name failed. ret=%d", ret); - return ret; - } - if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CONNECT) { - ret = ERROR_RTMP_AMF0_DECODE; - srs_error("amf0 decode connect command_name failed. " - "command_name=%s, ret=%d", command_name.c_str(), ret); - return ret; - } - - if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { - srs_error("amf0 decode connect transaction_id failed. ret=%d", ret); - return ret; - } - if (transaction_id != 1.0) { - ret = ERROR_RTMP_AMF0_DECODE; - srs_error("amf0 decode connect transaction_id failed. " - "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret); - return ret; - } - - if ((ret = srs_amf0_read_object(stream, command_object)) != ERROR_SUCCESS) { - srs_error("amf0 decode connect command_object failed. ret=%d", ret); - return ret; - } - if (command_object == NULL) { - ret = ERROR_RTMP_AMF0_DECODE; - srs_error("amf0 decode connect command_object failed. ret=%d", ret); - return ret; - } - - srs_info("amf0 decode connect packet success"); - - return ret; -} - SrsProtocol::SrsProtocol(st_netfd_t client_stfd) { stfd = client_stfd; @@ -771,3 +587,187 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh return ret; } +SrsMessageHeader::SrsMessageHeader() +{ + message_type = 0; + payload_length = 0; + timestamp = 0; + stream_id = 0; +} + +SrsMessageHeader::~SrsMessageHeader() +{ +} + +SrsChunkStream::SrsChunkStream(int _cid) +{ + fmt = 0; + cid = _cid; + extended_timestamp = false; + msg = NULL; +} + +SrsChunkStream::~SrsChunkStream() +{ + if (msg) { + delete msg; + msg = NULL; + } +} + +SrsMessage::SrsMessage() +{ + size = 0; + stream = NULL; + payload = NULL; + decoded_payload = NULL; +} + +SrsMessage::~SrsMessage() +{ + if (payload) { + delete[] payload; + payload = NULL; + } + + if (decoded_payload) { + delete decoded_payload; + decoded_payload = NULL; + } + + if (stream) { + delete stream; + stream = NULL; + } +} + +SrsPacket* SrsMessage::get_packet() +{ + if (!decoded_payload) { + srs_error("the payload is raw/undecoded, invoke decode_packet to decode it."); + } + srs_assert(decoded_payload != NULL); + + return decoded_payload; +} + +int SrsMessage::decode_packet() +{ + int ret = ERROR_SUCCESS; + + srs_assert(payload != NULL); + srs_assert(size > 0); + + if (!stream) { + srs_verbose("create decode stream for message."); + stream = new SrsStream(); + } + + if (header.message_type == RTMP_MSG_AMF0CommandMessage) { + srs_verbose("start to decode AMF0 command message."); + + // amf0 command message. + // need to read the command name. + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { + srs_error("initialize stream failed. ret=%d", ret); + return ret; + } + srs_verbose("decode stream initialized success"); + + std::string command; + if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { + srs_error("decode AMF0 command name failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0 command message, command_name=%s", command.c_str()); + + stream->reset(); + if (command == RTMP_AMF0_COMMAND_CONNECT) { + srs_info("decode the AMF0 command(connect vhost/app message)."); + decoded_payload = new SrsConnectAppPacket(); + return decoded_payload->decode(stream); + } + + // default packet to drop message. + srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); + decoded_payload = new SrsPacket(); + return ret; + } + + // default packet to drop message. + srs_trace("drop the unknown message, type=%d", header.message_type); + decoded_payload = new SrsPacket(); + + return ret; +} + +SrsPacket::SrsPacket() +{ +} + +SrsPacket::~SrsPacket() +{ +} + +int SrsPacket::decode(SrsStream* /*stream*/) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +SrsConnectAppPacket::SrsConnectAppPacket() +{ + command_name = RTMP_AMF0_COMMAND_CONNECT; + transaction_id = 1; + command_object = NULL; +} + +SrsConnectAppPacket::~SrsConnectAppPacket() +{ +} + +int SrsConnectAppPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = super::decode(stream)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CONNECT) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect transaction_id failed. ret=%d", ret); + return ret; + } + if (transaction_id != 1.0) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect transaction_id failed. " + "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret); + return ret; + } + + if ((ret = srs_amf0_read_object(stream, command_object)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect command_object failed. ret=%d", ret); + return ret; + } + if (command_object == NULL) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect command_object failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode connect packet success"); + + return ret; +} + diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 542e5b893..e01af95f6 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -46,6 +46,39 @@ class SrsMessage; class SrsChunkStream; class SrsAmf0Object; +/** +* the protocol provides the rtmp-message-protocol services, +* to recv RTMP message from RTMP chunk stream, +* and to send out RTMP message over RTMP chunk stream. +*/ +class SrsProtocol +{ +private: + std::map chunk_streams; + st_netfd_t stfd; + SrsBuffer* buffer; + SrsSocket* skt; + int32_t in_chunk_size; + int32_t out_chunk_size; +public: + SrsProtocol(st_netfd_t client_stfd); + virtual ~SrsProtocol(); +public: + /** + * recv a message with raw/undecoded payload from peer. + * the payload is not decoded, use srs_rtmp_expect_message if requires + * specifies message. + * @pmsg, user must free it. NULL if not success. + * @remark, only when success, user can use and must free the pmsg. + */ + virtual int recv_message(SrsMessage** pmsg); +private: + virtual int recv_interlaced_message(SrsMessage** pmsg); + virtual int read_basic_header(char& fmt, int& cid, int& size); + virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); +}; + /** * 4.1. Message Header */ @@ -162,7 +195,7 @@ class SrsConnectAppPacket : public SrsPacket { private: typedef SrsPacket super; -private: +public: std::string command_name; double transaction_id; SrsAmf0Object* command_object; @@ -174,80 +207,48 @@ public: }; /** -* the protocol provides the rtmp-message-protocol services, -* to recv RTMP message from RTMP chunk stream, -* and to send out RTMP message over RTMP chunk stream. +* expect a specified message, drop others util got specified one. +* @pmsg, user must free it. NULL if not success. +* @ppacket, store in the pmsg, user must never free it. NULL if not success. +* @remark, only when success, user can use and must free the pmsg/ppacket. */ -class SrsProtocol +template +int srs_rtmp_expect_message(SrsProtocol* protocol, SrsMessage** pmsg, T** ppacket) { -private: - std::map chunk_streams; - st_netfd_t stfd; - SrsBuffer* buffer; - SrsSocket* skt; - int32_t in_chunk_size; - int32_t out_chunk_size; -public: - SrsProtocol(st_netfd_t client_stfd); - virtual ~SrsProtocol(); -public: - /** - * recv a message with raw/undecoded payload from peer. - * the payload is not decoded, use expect_message if requires specifies message. - * @pmsg, user must free it. NULL if not success. - * @remark, only when success, user can use and must free the pmsg. - */ - virtual int recv_message(SrsMessage** pmsg); -public: - /** - * expect a specified message, drop others util got specified one. - * @pmsg, user must free it. NULL if not success. - * @ppacket, store in the pmsg, user must never free it. NULL if not success. - * @remark, only when success, user can use and must free the pmsg/ppacket. - */ - template - int expect_message(SrsMessage** pmsg, T** ppacket) - { - *pmsg = NULL; - *ppacket = NULL; + *pmsg = NULL; + *ppacket = NULL; + + int ret = ERROR_SUCCESS; + + while (true) { + SrsMessage* msg = NULL; + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv message failed. ret=%d", ret); + return ret; + } + srs_verbose("recv message success."); - int ret = ERROR_SUCCESS; + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + delete msg; + srs_error("decode message failed. ret=%d", ret); + return ret; + } - while (true) { - SrsMessage* msg = NULL; - if ((ret = recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv message failed. ret=%d", ret); - return ret; - } - srs_verbose("recv message success."); - - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { - delete msg; - srs_error("decode message failed. ret=%d", ret); - return ret; - } - - T* pkt = dynamic_cast(msg->get_packet()); - if (!pkt) { - delete msg; - srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).", - msg->header.message_type, msg->header.payload_length, - msg->header.timestamp, msg->header.stream_id); - continue; - } - - *pmsg = msg; - *ppacket = pkt; - break; + T* pkt = dynamic_cast(msg->get_packet()); + if (!pkt) { + delete msg; + srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).", + msg->header.message_type, msg->header.payload_length, + msg->header.timestamp, msg->header.stream_id); + continue; } - return ret; + *pmsg = msg; + *ppacket = pkt; + break; } -private: - virtual int recv_interlaced_message(SrsMessage** pmsg); - virtual int read_basic_header(char& fmt, int& cid, int& size); - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); -}; + + return ret; +} #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 14872a7ca..b06efb896 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -28,6 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include SrsRtmp::SrsRtmp(st_netfd_t client_stfd) { @@ -89,19 +90,28 @@ int SrsRtmp::handshake() return ret; } -int SrsRtmp::connect_app(SrsApp** papp) +int SrsRtmp::connect_app(SrsRequest* req) { int ret = ERROR_SUCCESS; SrsMessage* msg = NULL; SrsConnectAppPacket* pkt = NULL; - if ((ret = protocol->expect_message(&msg, &pkt)) != ERROR_SUCCESS) { + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { srs_error("expect connect app message failed. ret=%d", ret); return ret; } SrsAutoFree(SrsMessage, msg, false); srs_info("get connect app message"); + SrsAmf0Any* prop = NULL; + + if ((prop = pkt->command_object->ensure_property_string("tcUrl")) == NULL) { + ret = ERROR_RTMP_REQ_CONNECT; + srs_error("invalid request, must specifies the tcUrl. ret=%d", ret); + return ret; + } + req->tcUrl = srs_amf0_convert(prop)->value; + return ret; } diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index b0602e499..62ee71c55 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -36,10 +36,18 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsProtocol; -struct SrsApp +/** +* the original request from client. +*/ +struct SrsRequest { + std::string tcUrl; + + std::string schema; std::string vhost; + std::string port; std::string app; + std::string stream; }; /** @@ -57,7 +65,7 @@ public: virtual ~SrsRtmp(); public: virtual int handshake(); - virtual int connect_app(SrsApp** papp); + virtual int connect_app(SrsRequest* req); }; #endif \ No newline at end of file