From af5961432dc07fba6dd80a968056b734a8931e90 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 21 Oct 2013 22:42:36 +0800 Subject: [PATCH] support createStream and play, identity the play client --- trunk/src/core/srs_core_amf0.cpp | 70 ++++- trunk/src/core/srs_core_amf0.hpp | 21 ++ trunk/src/core/srs_core_client.cpp | 17 ++ trunk/src/core/srs_core_protocol.cpp | 373 ++++++++++++++++++++++++++- trunk/src/core/srs_core_protocol.hpp | 145 +++++++++++ trunk/src/core/srs_core_rtmp.cpp | 109 +++++++- trunk/src/core/srs_core_rtmp.hpp | 21 ++ 7 files changed, 740 insertions(+), 16 deletions(-) diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index 4dcb242de..7b53dead3 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -84,6 +84,11 @@ bool SrsAmf0Any::is_number() return marker == RTMP_AMF0_Number; } +bool SrsAmf0Any::is_null() +{ + return marker == RTMP_AMF0_Null; +} + bool SrsAmf0Any::is_object() { return marker == RTMP_AMF0_Object; @@ -131,6 +136,15 @@ SrsAmf0Number::~SrsAmf0Number() { } +SrsAmf0Null::SrsAmf0Null() +{ + marker = RTMP_AMF0_Null; +} + +SrsAmf0Null::~SrsAmf0Null() +{ +} + SrsAmf0ObjectEOF::SrsAmf0ObjectEOF() { marker = RTMP_AMF0_ObjectEnd; @@ -470,6 +484,45 @@ int srs_amf0_write_number(SrsStream* stream, double value) return ret; } +int srs_amf0_read_null(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 read null marker failed. ret=%d", ret); + return ret; + } + + char marker = stream->read_1bytes(); + if (marker != RTMP_AMF0_Null) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 check null marker failed. " + "marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_Null, ret); + return ret; + } + srs_verbose("amf0 read null success"); + + return ret; +} +int srs_amf0_write_null(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write null marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_Null); + srs_verbose("amf0 write null marker success"); + + return ret; +} + int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) { int ret = ERROR_SUCCESS; @@ -515,6 +568,10 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) srs_amf0_convert(value)->value = data; return ret; } + case RTMP_AMF0_Null: { + value = new SrsAmf0Null(); + return ret; + } case RTMP_AMF0_ObjectEnd: { SrsAmf0ObjectEOF* p = NULL; if ((ret = srs_amf0_read_object_eof(stream, p)) != ERROR_SUCCESS) { @@ -568,6 +625,9 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) double data = srs_amf0_convert(value)->value; return srs_amf0_write_number(stream, data); } + case RTMP_AMF0_Null: { + return srs_amf0_write_null(stream); + } case RTMP_AMF0_ObjectEnd: { SrsAmf0ObjectEOF* p = srs_amf0_convert(value); return srs_amf0_write_object_eof(stream, p); @@ -590,7 +650,6 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) return ret; } - int srs_amf0_get_any_size(SrsAmf0Any* value) { if (!value) { @@ -613,6 +672,10 @@ int srs_amf0_get_any_size(SrsAmf0Any* value) size += srs_amf0_get_number_size(); break; } + case RTMP_AMF0_Null: { + size += srs_amf0_get_null_size(); + break; + } case RTMP_AMF0_ObjectEnd: { size += srs_amf0_get_object_eof_size(); break; @@ -941,6 +1004,11 @@ int srs_amf0_get_number_size() return 1 + 8; } +int srs_amf0_get_null_size() +{ + return 1; +} + int srs_amf0_get_boolean_size() { return 1 + 1; diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 1d272e6be..61cd38104 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -54,6 +54,7 @@ struct SrsAmf0Any virtual bool is_string(); virtual bool is_boolean(); virtual bool is_number(); + virtual bool is_null(); virtual bool is_object(); virtual bool is_object_eof(); virtual bool is_ecma_array(); @@ -102,6 +103,17 @@ struct SrsAmf0Number : public SrsAmf0Any virtual ~SrsAmf0Number(); }; +/** +* read amf0 null from stream. +* 2.7 null Type +* null-type = null-marker +*/ +struct SrsAmf0Null : public SrsAmf0Any +{ + SrsAmf0Null(); + virtual ~SrsAmf0Null(); +}; + /** * 2.11 Object End Type * object-end-type = UTF-8-empty object-end-marker @@ -187,6 +199,14 @@ extern int srs_amf0_write_boolean(SrsStream* stream, bool value); extern int srs_amf0_read_number(SrsStream* stream, double& value); extern int srs_amf0_write_number(SrsStream* stream, double value); +/** +* read amf0 null from stream. +* 2.7 null Type +* null-type = null-marker +*/ +extern int srs_amf0_read_null(SrsStream* stream); +extern int srs_amf0_write_null(SrsStream* stream); + /** * read amf0 object from stream. * 2.5 Object Type @@ -212,6 +232,7 @@ extern int srs_amf0_write_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray* va extern int srs_amf0_get_utf8_size(std::string value); extern int srs_amf0_get_string_size(std::string value); extern int srs_amf0_get_number_size(); +extern int srs_amf0_get_null_size(); extern int srs_amf0_get_boolean_size(); extern int srs_amf0_get_object_size(SrsAmf0Object* obj); extern int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr); diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index aba988b71..4ebe76dfa 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -29,6 +29,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +// default stream id for response the createStream request. +#define SRS_DEFAULT_SID 1 + SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) { @@ -98,6 +101,20 @@ int SrsClient::do_cycle() return ret; } srs_verbose("response connect app success"); + + if ((ret = rtmp->on_bw_done()) != ERROR_SUCCESS) { + srs_error("on_bw_done failed. ret=%d", ret); + return ret; + } + srs_verbose("on_bw_done success"); + + SrsClientType type; + std::string stream_name; + if ((ret = rtmp->identify_client(SRS_DEFAULT_SID, type, stream_name)) != ERROR_SUCCESS) { + srs_error("identify client failed. ret=%d", ret); + return ret; + } + srs_verbose("identify client success. type=%d", type); return ret; } diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 6f546d93d..9a6b5c272 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -202,9 +202,13 @@ messages. ***************************************************************************** ****************************************************************************/ /** -* amf0 command message, command name: "connect" +* amf0 command message, command name macros */ #define RTMP_AMF0_COMMAND_CONNECT "connect" +#define RTMP_AMF0_COMMAND_CREATE_STREAM "createStream" +#define RTMP_AMF0_COMMAND_PLAY "play" +#define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" +#define RTMP_AMF0_COMMAND_RESULT "_result" /**************************************************************************** ***************************************************************************** @@ -585,11 +589,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz int ret = ERROR_SUCCESS; // when not exists cached msg, means get an new message, - // the fmt must be type0 which means new message. - if (!chunk->msg && fmt != RTMP_FMT_TYPE0) { + // the fmt must be type0/type1 which means new message. + if (!chunk->msg && fmt != RTMP_FMT_TYPE0 && fmt != RTMP_FMT_TYPE1) { ret = ERROR_RTMP_CHUNK_START; srs_error("chunk stream start, " - "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); + "fmt must be %d or %d, actual is %d. ret=%d", + RTMP_FMT_TYPE0, RTMP_FMT_TYPE1, fmt, ret); return ret; } @@ -604,7 +609,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // create msg when new chunk stream start if (!chunk->msg) { - srs_assert(fmt == RTMP_FMT_TYPE0); + srs_assert(fmt == RTMP_FMT_TYPE0 || fmt == RTMP_FMT_TYPE1); chunk->msg = new SrsMessage(); srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); } @@ -802,6 +807,21 @@ SrsMessageHeader::~SrsMessageHeader() { } +bool SrsMessageHeader::is_amf0_command() +{ + return message_type == RTMP_MSG_AMF0CommandMessage; +} + +bool SrsMessageHeader::is_amf3_command() +{ + return message_type == RTMP_MSG_AMF3CommandMessage; +} + +bool SrsMessageHeader::is_window_ackledgement_size() +{ + return message_type == RTMP_MSG_WindowAcknowledgementSize; +} + SrsChunkStream::SrsChunkStream(int _cid) { fmt = 0; @@ -870,30 +890,50 @@ int SrsMessage::decode_packet() srs_verbose("decode stream initialized success"); // decode specified packet type - if (header.message_type == RTMP_MSG_AMF0CommandMessage) { - srs_verbose("start to decode AMF0 command message."); + if (header.is_amf0_command() || header.is_amf3_command()) { + srs_verbose("start to decode AMF0/AMF3 command message."); + + // skip 1bytes to decode the amf3 command. + if (header.is_amf3_command() && stream->require(1)) { + srs_verbose("skip 1bytes to decode AMF3 command"); + stream->skip(1); + } // amf0 command message. // need to read the command name. std::string command; if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) { - srs_error("decode AMF0 command name failed. ret=%d", ret); + srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret); return ret; } - srs_verbose("AMF0 command message, command_name=%s", command.c_str()); + srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str()); + // reset to zero(amf3 to 1) to restart decode. stream->reset(); + if (header.is_amf3_command()) { + stream->skip(1); + } + + // decode command object. if (command == RTMP_AMF0_COMMAND_CONNECT) { - srs_info("decode the AMF0 command(connect vhost/app message)."); + srs_info("decode the AMF0/AMF3 command(connect vhost/app message)."); packet = new SrsConnectAppPacket(); return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_CREATE_STREAM) { + srs_info("decode the AMF0/AMF3 command(createStream message)."); + packet = new SrsCreateStreamPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PLAY) { + srs_info("decode the AMF0/AMF3 command(paly message)."); + packet = new SrsPlayPacket(); + return packet->decode(stream); } // default packet to drop message. - srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); + srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); packet = new SrsPacket(); return ret; - } else if(header.message_type == RTMP_MSG_WindowAcknowledgementSize) { + } else if(header.is_window_ackledgement_size()) { srs_verbose("start to decode set ack window size message."); packet = new SrsSetWindowAckSizePacket(); return packet->decode(stream); @@ -1106,7 +1146,7 @@ int SrsConnectAppPacket::decode(SrsStream* stream) SrsConnectAppResPacket::SrsConnectAppResPacket() { - command_name = RTMP_AMF0_COMMAND_CONNECT; + command_name = RTMP_AMF0_COMMAND_RESULT; transaction_id = 1; props = new SrsAmf0Object(); info = new SrsAmf0Object(); @@ -1175,6 +1215,313 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream) return ret; } +SrsCreateStreamPacket::SrsCreateStreamPacket() +{ + command_name = RTMP_AMF0_COMMAND_CREATE_STREAM; + transaction_id = 2; + command_object = new SrsAmf0Null(); +} + +SrsCreateStreamPacket::~SrsCreateStreamPacket() +{ + if (command_object) { + delete command_object; + command_object = NULL; + } +} + +int SrsCreateStreamPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CREATE_STREAM) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode createStream command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode createStream command_object failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode createStream packet success"); + + return ret; +} + +SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, double _stream_id) +{ + command_name = RTMP_AMF0_COMMAND_RESULT; + transaction_id = _transaction_id; + command_object = new SrsAmf0Null(); + stream_id = _stream_id; +} + +SrsCreateStreamResPacket::~SrsCreateStreamResPacket() +{ + if (command_object) { + delete command_object; + command_object = NULL; + } +} + +int SrsCreateStreamResPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsCreateStreamResPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsCreateStreamResPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_number_size(); +} + +int SrsCreateStreamResPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_number(stream, stream_id)) != ERROR_SUCCESS) { + srs_error("encode stream_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode stream_id success."); + + + srs_info("encode createStream response packet success."); + + return ret; +} + +SrsPlayPacket::SrsPlayPacket() +{ + command_name = RTMP_AMF0_COMMAND_PLAY; + transaction_id = 0; + command_object = new SrsAmf0Null(); + + start = -2; + duration = -1; + reset = true; +} + +SrsPlayPacket::~SrsPlayPacket() +{ + if (command_object) { + delete command_object; + command_object = NULL; + } +} + +int SrsPlayPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PLAY) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode play command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode play transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_object failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode play stream_name failed. ret=%d", ret); + return ret; + } + + if (!stream->empty() && (ret = srs_amf0_read_number(stream, start)) != ERROR_SUCCESS) { + srs_error("amf0 decode play start failed. ret=%d", ret); + return ret; + } + if (!stream->empty() && (ret = srs_amf0_read_number(stream, duration)) != ERROR_SUCCESS) { + srs_error("amf0 decode play duration failed. ret=%d", ret); + return ret; + } + if (!stream->empty() && (ret = srs_amf0_read_boolean(stream, reset)) != ERROR_SUCCESS) { + srs_error("amf0 decode play reset failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode play packet success"); + + return ret; +} + +SrsPlayResPacket::SrsPlayResPacket() +{ + command_name = RTMP_AMF0_COMMAND_RESULT; + transaction_id = 0; + command_object = new SrsAmf0Null(); + desc = new SrsAmf0Object(); +} + +SrsPlayResPacket::~SrsPlayResPacket() +{ + if (command_object) { + delete command_object; + command_object = NULL; + } + + if (desc) { + delete desc; + desc = NULL; + } +} + +int SrsPlayResPacket::get_perfer_cid() +{ + return RTMP_CID_OverStream; +} + +int SrsPlayResPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsPlayResPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_object_size(desc); +} + +int SrsPlayResPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_object(stream, desc)) != ERROR_SUCCESS) { + srs_error("encode desc failed. ret=%d", ret); + return ret; + } + srs_verbose("encode desc success."); + + + srs_info("encode play response packet success."); + + return ret; +} + +SrsOnBWDonePacket::SrsOnBWDonePacket() +{ + command_name = RTMP_AMF0_COMMAND_ON_BW_DONE; + transaction_id = 0; + args = new SrsAmf0Null(); +} + +SrsOnBWDonePacket::~SrsOnBWDonePacket() +{ + if (args) { + delete args; + args = NULL; + } +} + +int SrsOnBWDonePacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsOnBWDonePacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsOnBWDonePacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size(); +} + +int SrsOnBWDonePacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode args failed. ret=%d", ret); + return ret; + } + srs_verbose("encode args success."); + + srs_info("encode onBWDone packet success."); + + return ret; +} + SrsSetWindowAckSizePacket::SrsSetWindowAckSizePacket() { ackowledgement_window_size = 0; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index eca407d90..c09439c07 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -45,6 +45,7 @@ class SrsStream; class SrsMessage; class SrsChunkStream; class SrsAmf0Object; +class SrsAmf0Null; // convert class name to string. #define CLASS_NAME_STRING(className) #className @@ -166,6 +167,10 @@ struct SrsMessageHeader SrsMessageHeader(); virtual ~SrsMessageHeader(); + + bool is_amf0_command(); + bool is_amf3_command(); + bool is_window_ackledgement_size(); }; /** @@ -357,6 +362,146 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* 4.1.3. createStream +* The client sends this command to the server to create a logical +* channel for message communication The publishing of audio, video, and +* metadata is carried out over stream channel created using the +* createStream command. +*/ +class SrsCreateStreamPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsCreateStreamPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; +public: + SrsCreateStreamPacket(); + virtual ~SrsCreateStreamPacket(); +public: + virtual int decode(SrsStream* stream); +}; +/** +* response for SrsCreateStreamPacket. +*/ +class SrsCreateStreamResPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsCreateStreamResPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + double stream_id; +public: + SrsCreateStreamResPacket(double _transaction_id, double _stream_id); + virtual ~SrsCreateStreamResPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + +/** +* 4.2.1. play +* The client sends this command to the server to play a stream. +*/ +class SrsPlayPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsPlayPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + std::string stream_name; + double start; + double duration; + bool reset; +public: + SrsPlayPacket(); + virtual ~SrsPlayPacket(); +public: + virtual int decode(SrsStream* stream); +}; +/** +* response for SrsPlayPacket. +* @remark, user must set the stream_id in header. +*/ +class SrsPlayResPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsPlayResPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + SrsAmf0Object* desc; +public: + SrsPlayResPacket(); + virtual ~SrsPlayResPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + +/** +* when bandwidth test done, notice client. +*/ +class SrsOnBWDonePacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsOnBWDonePacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* args; +public: + SrsOnBWDonePacket(); + virtual ~SrsOnBWDonePacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + /** * 5.5. Window Acknowledgement Size (5) * The client or the server sends this message to inform the peer which diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index a23cda2d2..824f5c0bb 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -221,8 +221,6 @@ int SrsRtmp::response_connect_app() SrsMessage* msg = new SrsMessage(); SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); - pkt->command_name = "_result"; - pkt->props->properties["fmsVer"] = new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER); pkt->props->properties["capabilities"] = new SrsAmf0Number(123); pkt->props->properties["mode"] = new SrsAmf0Number(1); @@ -250,3 +248,110 @@ int SrsRtmp::response_connect_app() return ret; } +int SrsRtmp::on_bw_done() +{ + int ret = ERROR_SUCCESS; + + SrsMessage* msg = new SrsMessage(); + SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket(); + + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onBWDone message failed. ret=%d", ret); + return ret; + } + srs_info("send onBWDone message success."); + + return ret; +} + +int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& stream_name) +{ + type = SrsClientUnknown; + int ret = ERROR_SUCCESS; + + while (true) { + SrsMessage* msg = NULL; + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv identify client message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsMessage, msg, false); + + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { + srs_trace("identify ignore messages except " + "AMF0/AMF3 command message. type=%#x", msg->header.message_type); + continue; + } + + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("identify decode message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + return identify_create_stream_client( + dynamic_cast(pkt), stream_id, type, stream_name); + } + + srs_trace("ignore AMF0/AMF3 command message."); + } + + return ret; +} + +int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) +{ + int ret = ERROR_SUCCESS; + + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id); + + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send createStream response message failed. ret=%d", ret); + return ret; + } + srs_info("send createStream response message success."); + } + + while (true) { + SrsMessage* msg = NULL; + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv identify client message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsMessage, msg, false); + + if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) { + srs_trace("identify ignore messages except " + "AMF0/AMF3 command message. type=%#x", msg->header.message_type); + continue; + } + + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + srs_error("identify decode message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + SrsPlayPacket* play = dynamic_cast(pkt); + type = SrsClientPublish; + stream_name = play->stream_name; + srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); + return ret; + } + + srs_trace("ignore AMF0/AMF3 command message."); + } + + return ret; +} + diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 10223f191..b106cff50 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include class SrsProtocol; +class SrsCreateStreamPacket; /** * the original request from client. @@ -57,6 +58,16 @@ struct SrsRequest virtual int discovery_app(); }; +/** +* the rtmp client type. +*/ +enum SrsClientType +{ + SrsClientUnknown, + SrsClientPlay, + SrsClientPublish, +}; + /** * the rtmp provices rtmp-command-protocol services, * a high level protocol, media stream oriented services, @@ -80,6 +91,16 @@ public: */ virtual int set_peer_bandwidth(int bandwidth, int type); virtual int response_connect_app(); + virtual int on_bw_done(); + /** + * recv some message to identify the client. + * @stream_id, client will createStream to play or publish by flash, + * the stream_id used to response the createStream request. + * @type, output the client type. + */ + virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name); +private: + virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); }; #endif \ No newline at end of file