diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index 7b53dead3..909f2d7c6 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -89,6 +89,11 @@ bool SrsAmf0Any::is_null() return marker == RTMP_AMF0_Null; } +bool SrsAmf0Any::is_undefined() +{ + return marker == RTMP_AMF0_Undefined; +} + bool SrsAmf0Any::is_object() { return marker == RTMP_AMF0_Object; @@ -145,6 +150,15 @@ SrsAmf0Null::~SrsAmf0Null() { } +SrsAmf0Undefined::SrsAmf0Undefined() +{ + marker = RTMP_AMF0_Undefined; +} + +SrsAmf0Undefined::~SrsAmf0Undefined() +{ +} + SrsAmf0ObjectEOF::SrsAmf0ObjectEOF() { marker = RTMP_AMF0_ObjectEnd; @@ -523,6 +537,45 @@ int srs_amf0_write_null(SrsStream* stream) return ret; } +int srs_amf0_read_undefined(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 read undefined marker failed. ret=%d", ret); + return ret; + } + + char marker = stream->read_1bytes(); + if (marker != RTMP_AMF0_Undefined) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 check undefined marker failed. " + "marker=%#x, required=%#x, ret=%d", marker, RTMP_AMF0_Undefined, ret); + return ret; + } + srs_verbose("amf0 read undefined success"); + + return ret; +} +int srs_amf0_write_undefined(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + // marker + if (!stream->require(1)) { + ret = ERROR_RTMP_AMF0_ENCODE; + srs_error("amf0 write undefined marker failed. ret=%d", ret); + return ret; + } + + stream->write_1bytes(RTMP_AMF0_Undefined); + srs_verbose("amf0 write undefined marker success"); + + return ret; +} + int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) { int ret = ERROR_SUCCESS; @@ -572,6 +625,10 @@ int srs_amf0_read_any(SrsStream* stream, SrsAmf0Any*& value) value = new SrsAmf0Null(); return ret; } + case RTMP_AMF0_Undefined: { + value = new SrsAmf0Undefined(); + return ret; + } case RTMP_AMF0_ObjectEnd: { SrsAmf0ObjectEOF* p = NULL; if ((ret = srs_amf0_read_object_eof(stream, p)) != ERROR_SUCCESS) { @@ -628,6 +685,9 @@ int srs_amf0_write_any(SrsStream* stream, SrsAmf0Any* value) case RTMP_AMF0_Null: { return srs_amf0_write_null(stream); } + case RTMP_AMF0_Undefined: { + return srs_amf0_write_undefined(stream); + } case RTMP_AMF0_ObjectEnd: { SrsAmf0ObjectEOF* p = srs_amf0_convert(value); return srs_amf0_write_object_eof(stream, p); @@ -676,6 +736,10 @@ int srs_amf0_get_any_size(SrsAmf0Any* value) size += srs_amf0_get_null_size(); break; } + case RTMP_AMF0_Undefined: { + size += srs_amf0_get_undefined_size(); + break; + } case RTMP_AMF0_ObjectEnd: { size += srs_amf0_get_object_eof_size(); break; @@ -1009,6 +1073,11 @@ int srs_amf0_get_null_size() return 1; } +int srs_amf0_get_undefined_size() +{ + return 1; +} + int srs_amf0_get_boolean_size() { return 1 + 1; diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 61cd38104..3624eacd0 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -55,6 +55,7 @@ struct SrsAmf0Any virtual bool is_boolean(); virtual bool is_number(); virtual bool is_null(); + virtual bool is_undefined(); virtual bool is_object(); virtual bool is_object_eof(); virtual bool is_ecma_array(); @@ -114,6 +115,17 @@ struct SrsAmf0Null : public SrsAmf0Any virtual ~SrsAmf0Null(); }; +/** +* read amf0 undefined from stream. +* 2.8 undefined Type +* undefined-type = undefined-marker +*/ +struct SrsAmf0Undefined : public SrsAmf0Any +{ + SrsAmf0Undefined(); + virtual ~SrsAmf0Undefined(); +}; + /** * 2.11 Object End Type * object-end-type = UTF-8-empty object-end-marker @@ -207,6 +219,14 @@ extern int srs_amf0_write_number(SrsStream* stream, double value); extern int srs_amf0_read_null(SrsStream* stream); extern int srs_amf0_write_null(SrsStream* stream); +/** +* read amf0 undefined from stream. +* 2.8 undefined Type +* undefined-type = undefined-marker +*/ +extern int srs_amf0_read_undefined(SrsStream* stream); +extern int srs_amf0_write_undefined(SrsStream* stream); + /** * read amf0 object from stream. * 2.5 Object Type @@ -233,6 +253,7 @@ extern int srs_amf0_get_utf8_size(std::string value); extern int srs_amf0_get_string_size(std::string value); extern int srs_amf0_get_number_size(); extern int srs_amf0_get_null_size(); +extern int srs_amf0_get_undefined_size(); extern int srs_amf0_get_boolean_size(); extern int srs_amf0_get_object_size(SrsAmf0Object* obj); extern int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr); diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index fa1e54f84..b5c7eb610 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -197,6 +197,8 @@ messages. #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" #define RTMP_AMF0_COMMAND_ON_STATUS "onStatus" #define RTMP_AMF0_COMMAND_RESULT "_result" +#define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" +#define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" #define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess" /**************************************************************************** @@ -952,6 +954,14 @@ int SrsMessage::decode_packet() srs_info("decode the AMF0/AMF3 command(paly message)."); packet = new SrsPlayPacket(); return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) { + srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message)."); + packet = new SrsFMLEStartPacket(); + return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) { + srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message)."); + packet = new SrsFMLEStartPacket(); + return packet->decode(stream); } // default packet to drop message. @@ -1351,6 +1361,119 @@ int SrsCreateStreamResPacket::encode_packet(SrsStream* stream) return ret; } +SrsFMLEStartPacket::SrsFMLEStartPacket() +{ + command_name = RTMP_AMF0_COMMAND_CREATE_STREAM; + transaction_id = 0; +} + +SrsFMLEStartPacket::~SrsFMLEStartPacket() +{ +} + +int SrsFMLEStartPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() + || command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM + || command_name != RTMP_AMF0_COMMAND_FC_PUBLISH + ) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode FMLE start command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode FMLE start packet success"); + + return ret; +} + +SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id) +{ + command_name = RTMP_AMF0_COMMAND_RESULT; + transaction_id = _transaction_id; + command_object = new SrsAmf0Null(); + args = new SrsAmf0Undefined(); +} + +SrsFMLEStartResPacket::~SrsFMLEStartResPacket() +{ + if (command_object) { + delete command_object; + command_object = NULL; + } + if (args) { + delete args; + args = NULL; + } +} + +int SrsFMLEStartResPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsFMLEStartResPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsFMLEStartResPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_undefined_size(); +} + +int SrsFMLEStartResPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) { + srs_error("encode args failed. ret=%d", ret); + return ret; + } + srs_verbose("encode args success."); + + + srs_info("encode FMLE start response packet success."); + + return ret; +} + SrsPlayPacket::SrsPlayPacket() { command_name = RTMP_AMF0_COMMAND_PLAY; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 9b7262204..e76ac20ea 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -46,6 +46,7 @@ class SrsMessage; class SrsChunkStream; class SrsAmf0Object; class SrsAmf0Null; +class SrsAmf0Undefined; // convert class name to string. #define CLASS_NAME_STRING(className) #className @@ -427,6 +428,57 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* FMLE start publish: ReleaseStream/PublishStream +*/ +class SrsFMLEStartPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsFMLEStartPacket); + } +public: + std::string command_name; + double transaction_id; + std::string stream_name; +public: + SrsFMLEStartPacket(); + virtual ~SrsFMLEStartPacket(); +public: + virtual int decode(SrsStream* stream); +}; +/** +* response for SrsFMLEStartPacket. +*/ +class SrsFMLEStartResPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsFMLEStartResPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + SrsAmf0Undefined* args; +public: + SrsFMLEStartResPacket(double _transaction_id); + virtual ~SrsFMLEStartResPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + /** * 4.2.1. play * The client sends this command to the server to play a stream. diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index c6ab4b7b9..ab8796378 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -307,9 +307,15 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { + srs_info("identify client by create stream, play or flash publish."); return identify_create_stream_client( dynamic_cast(pkt), stream_id, type, stream_name); } + if (dynamic_cast(pkt)) { + srs_info("identify client by releaseStream, fmle publish."); + return identify_fmle_publish_client( + dynamic_cast(pkt), stream_id, type, stream_name); + } srs_trace("ignore AMF0/AMF3 command message."); } @@ -487,3 +493,83 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea return ret; } +int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) +{ + int ret = ERROR_SUCCESS; + + type = SrsClientPublish; + stream_name = req->stream_name; + + // createStream response + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id); + + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send releaseStream response message failed. ret=%d", ret); + return ret; + } + srs_info("send releaseStream response message success."); + } + + // FCPublish + double fc_publish_tid = 0; + if (true) { + SrsMessage* msg = NULL; + SrsFMLEStartPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("recv FCPublish message failed. ret=%d", ret); + return ret; + } + srs_info("recv FCPublish request message success."); + + SrsAutoFree(SrsMessage, msg, false); + fc_publish_tid = pkt->transaction_id; + } + // FCPublish response + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid); + + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send FCPublish response message failed. ret=%d", ret); + return ret; + } + srs_info("send FCPublish response message success."); + } + + // createStream + double create_stream_tid = 0; + if (true) { + SrsMessage* msg = NULL; + SrsCreateStreamPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("recv createStream message failed. ret=%d", ret); + return ret; + } + srs_info("recv createStream request message success."); + + SrsAutoFree(SrsMessage, msg, false); + create_stream_tid = pkt->transaction_id; + } + // createStream response + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id); + + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send createStream response message failed. ret=%d", ret); + return ret; + } + srs_info("send createStream response message success."); + } + + return ret; +} + diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 9d672b26b..9c51ed5ba 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -36,6 +36,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsProtocol; class SrsCreateStreamPacket; +class SrsFMLEStartPacket; /** * the original request from client. @@ -113,6 +114,7 @@ public: virtual int start_play(int stream_id); private: virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); + virtual int identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); }; #endif \ No newline at end of file