diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index d3218f2e0..8a2e07ad2 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -22,3 +22,69 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include + +#include +#include + +// AMF0 marker +#define RTMP_AMF0_Number 0x00 +#define RTMP_AMF0_Boolean 0x01 +#define RTMP_AMF0_String 0x02 +#define RTMP_AMF0_Object 0x03 +#define RTMP_AMF0_MovieClip 0x04 // reserved, not supported +#define RTMP_AMF0_Null 0x05 +#define RTMP_AMF0_Undefined 0x06 +#define RTMP_AMF0_Reference 0x07 +#define RTMP_AMF0_EcmaArray 0x08 +#define RTMP_AMF0_ObjectEnd 0x09 +#define RTMP_AMF0_StrictArray 0x0A +#define RTMP_AMF0_Date 0x0B +#define RTMP_AMF0_LongString 0x0C +#define RTMP_AMF0_UnSupported 0x0D +#define RTMP_AMF0_RecordSet 0x0E // reserved, not supported +#define RTMP_AMF0_XmlDocument 0x0F +#define RTMP_AMF0_TypedObject 0x10 +// AVM+ object is the AMF3 object. +#define RTMP_AMF0_AVMplusObject 0x11 +// origin array whos data takes the same form as LengthValueBytes +#define RTMP_AMF0_OriginStrictArray 0x20 + +std::string srs_amf0_read_string(SrsStream* stream) +{ + std::string str; + + // marker + if (!stream->require(1)) { + return str; + } + + char marker = stream->read_char(); + if (marker != RTMP_AMF0_String) { + return str; + } + + // len + if (!stream->require(2)) { + return str; + } + int16_t len = stream->read_2bytes(); + + // data + if (!stream->require(len)) { + return str; + } + str = stream->read_string(len); + + // support utf8-1 only + // 1.3.1 Strings and UTF-8 + // UTF8-1 = %x00-7F + for (int i = 0; i < len; i++) { + char ch = *(str.data() + i); + if ((ch & 0x80) != 0) { + srs_warn("only support utf8-1, 0x00-0x7F, actual is %#x", (int)ch); + return ""; + } + } + + return str; +} diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 8c3a02f7b..7adb47822 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -30,4 +30,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include + +class SrsStream; + +/** +* read amf0 string from stream. +*/ +extern std::string srs_amf0_read_string(SrsStream* stream); + #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_buffer.cpp b/trunk/src/core/srs_core_buffer.cpp index 300309337..df1376d76 100755 --- a/trunk/src/core/srs_core_buffer.cpp +++ b/trunk/src/core/srs_core_buffer.cpp @@ -62,6 +62,8 @@ int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size) { int ret = ERROR_SUCCESS; + srs_assert(required_size >= 0); + while (size() < required_size) { char buffer[SOCKET_READ_SIZE]; diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index b2c5d36b5..991ef40df 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -52,6 +52,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 #define ERROR_RTMP_MSG_INVLIAD_SIZE 302 +#define ERROR_RTMP_AMF0_DECODE 303 #define ERROR_SYSTEM_STREAM_INIT 400 diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 7f66ef156..f58150597 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -24,9 +24,11 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include #include #include +#include /** 5. Protocol Control Messages @@ -37,12 +39,12 @@ reserved for usage with RTM Chunk Stream protocol. Protocol messages with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID 7 is used between edge server and origin server. */ -#define RTMP_MSG_SetChunkSize 0x01 -#define RTMP_MSG_AbortMessage 0x02 -#define RTMP_MSG_Acknowledgement 0x03 -#define RTMP_MSG_UserControlMessage 0x04 -#define RTMP_MSG_WindowAcknowledgementSize 0x05 -#define RTMP_MSG_SetPeerBandwidth 0x06 +#define RTMP_MSG_SetChunkSize 0x01 +#define RTMP_MSG_AbortMessage 0x02 +#define RTMP_MSG_Acknowledgement 0x03 +#define RTMP_MSG_UserControlMessage 0x04 +#define RTMP_MSG_WindowAcknowledgementSize 0x05 +#define RTMP_MSG_SetPeerBandwidth 0x06 #define RTMP_MSG_EdgeAndOriginServerCommand 0x07 /** * The server sends this event to test whether the client is reachable. @@ -50,14 +52,14 @@ with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID * Event data is a 4-byte timestamp, representing the local server time when the server dispatched the command. * The client responds with PingResponse on receiving PingRequest. */ -#define RTMP_MSG_PCUC_PingRequest 0x06 +#define RTMP_MSG_PCUC_PingRequest 0x06 /** * The client sends this event to the server in response to the ping request. * * The event data is a 4-byte timestamp, which was received with the PingRequest request. */ -#define RTMP_MSG_PCUC_PingResponse 0x07 +#define RTMP_MSG_PCUC_PingResponse 0x07 /** 3. Types of messages The server and the client send messages over the network to @@ -77,8 +79,8 @@ contains related parameters. A client or a server can request Remote Procedure Calls (RPC) over streams that are communicated using the command messages to the peer. */ -#define RTMP_MSG_AMF3CommandMessage 17 // 0x11 -#define RTMP_MSG_AMF0CommandMessage 20 // 0x14 +#define RTMP_MSG_AMF3CommandMessage 17 // 0x11 +#define RTMP_MSG_AMF0CommandMessage 20 // 0x14 /** 3.2. Data message The client or the server sends this message to send Metadata or any @@ -87,8 +89,8 @@ data(audio, video etc.) like creation time, duration, theme and so on. These messages have been assigned message type value of 18 for AMF0 and message type value of 15 for AMF3. */ -#define RTMP_MSG_AMF0DataMessage 18 // 0x12 -#define RTMP_MSG_AMF3DataMessage 15 // 0x0F +#define RTMP_MSG_AMF0DataMessage 18 // 0x12 +#define RTMP_MSG_AMF3DataMessage 15 // 0x0F /** 3.3. Shared object message A shared object is a Flash object (a collection of name value pairs) @@ -97,14 +99,14 @@ so on. The message types kMsgContainer=19 for AMF0 and kMsgContainerEx=16 for AMF3 are reserved for shared object events. Each message can contain multiple events. */ -#define RTMP_MSG_AMF3SharedObject 16 // 0x10 -#define RTMP_MSG_AMF0SharedObject 19 // 0x13 +#define RTMP_MSG_AMF3SharedObject 16 // 0x10 +#define RTMP_MSG_AMF0SharedObject 19 // 0x13 /** 3.4. Audio message The client or the server sends this message to send audio data to the peer. The message type value of 8 is reserved for audio messages. */ -#define RTMP_MSG_AudioMessage 8 // 0x08 +#define RTMP_MSG_AudioMessage 8 // 0x08 /* * 3.5. Video message The client or the server sends this message to send video data to the @@ -113,14 +115,14 @@ These messages are large and can delay the sending of other type of messages. To avoid such a situation, the video message is assigned the lowest priority. */ -#define RTMP_MSG_VideoMessage 9 // 0x09 +#define RTMP_MSG_VideoMessage 9 // 0x09 /** 3.6. Aggregate message An aggregate message is a single message that contains a list of submessages. The message type value of 22 is reserved for aggregate messages. */ -#define RTMP_MSG_AggregateMessage 22 // 0x16 +#define RTMP_MSG_AggregateMessage 22 // 0x16 /** * 6.1.2. Chunk Message Header @@ -131,21 +133,21 @@ messages. // Chunks of Type 0 are 11 bytes long. This type MUST be used at the // start of a chunk stream, and whenever the stream timestamp goes // backward (e.g., because of a backward seek). -#define RTMP_FMT_TYPE0 0 +#define RTMP_FMT_TYPE0 0 // 6.1.2.2. Type 1 // Chunks of Type 1 are 7 bytes long. The message stream ID is not // included; this chunk takes the same stream ID as the preceding chunk. // Streams with variable-sized messages (for example, many video // formats) SHOULD use this format for the first chunk of each new // message after the first. -#define RTMP_FMT_TYPE1 1 +#define RTMP_FMT_TYPE1 1 // 6.1.2.3. Type 2 // Chunks of Type 2 are 3 bytes long. Neither the stream ID nor the // message length is included; this chunk has the same stream ID and // message length as the preceding chunk. Streams with constant-sized // messages (for example, some audio and data formats) SHOULD use this // format for the first chunk of each message after the first. -#define RTMP_FMT_TYPE2 2 +#define RTMP_FMT_TYPE2 2 // 6.1.2.4. Type 3 // Chunks of Type 3 have no header. Stream ID, message length and // timestamp delta are not present; chunks of this type take values from @@ -160,7 +162,7 @@ messages. // need for a chunk of type 2 to register the delta. If Type 3 chunk // follows a Type 0 chunk, then timestamp delta for this Type 3 chunk is // the same as the timestamp of Type 0 chunk. -#define RTMP_FMT_TYPE3 3 +#define RTMP_FMT_TYPE3 3 /** * 6. Chunking @@ -172,7 +174,7 @@ messages. * good for high-bit rate streaming. Chunk size is maintained * independently for each direction. */ -#define RTMP_DEFAULT_CHUNK_SIZE 128 +#define RTMP_DEFAULT_CHUNK_SIZE 128 /** * 6.1. Chunk Format @@ -185,7 +187,12 @@ messages. * the normal timestamp field MUST NOT be used and MUST be set to * 0xffffff and the extended timestamp MUST be sent. */ -#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF +#define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF + +/** +* amf0 command message, command name: "connect" +*/ +#define RTMP_AMF0_COMMAND_CONNECT "connect" SrsMessageHeader::SrsMessageHeader() { @@ -218,6 +225,7 @@ SrsChunkStream::~SrsChunkStream() SrsMessage::SrsMessage() { size = 0; + stream = NULL; payload = NULL; decoded_payload = NULL; } @@ -233,6 +241,11 @@ SrsMessage::~SrsMessage() delete decoded_payload; decoded_payload = NULL; } + + if (stream) { + delete stream; + stream = NULL; + } } SrsPacket* SrsMessage::get_packet() @@ -249,7 +262,44 @@ int SrsMessage::decode_packet() { int ret = ERROR_SUCCESS; - // TODO: decode packet. + srs_assert(payload != NULL); + srs_assert(size > 0); + + if (!stream) { + srs_verbose("create decode stream for message."); + stream = new SrsStream(); + } + + if (header.message_type == RTMP_MSG_AMF0CommandMessage) { + srs_verbose("start to decode AMF0 command message."); + + // amf0 command message. + // need to read the command name. + if ((ret = stream->initialize((char*)payload, size)) != ERROR_SUCCESS) { + srs_error("initialize stream failed. ret=%d", ret); + return ret; + } + srs_verbose("decode stream initialized success"); + + std::string command = srs_amf0_read_string(stream); + srs_verbose("AMF0 command message, command_name=%s", command.c_str()); + + stream->reset(); + if (command == RTMP_AMF0_COMMAND_CONNECT) { + srs_info("decode the AMF0 command(connect vhost/app message)."); + decoded_payload = new SrsConnectAppPacket(); + return decoded_payload->decode(stream); + } + + // default packet to drop message. + srs_trace("drop the AMF0 command message, command_name=%s", command.c_str()); + decoded_payload = new SrsPacket(); + return ret; + } + + // default packet to drop message. + srs_trace("drop the unknown message, type=%d", header.message_type); + decoded_payload = new SrsPacket(); return ret; } @@ -262,6 +312,12 @@ SrsPacket::~SrsPacket() { } +int SrsPacket::decode(SrsStream* /*stream*/) +{ + int ret = ERROR_SUCCESS; + return ret; +} + SrsConnectAppPacket::SrsConnectAppPacket() { } @@ -270,6 +326,24 @@ SrsConnectAppPacket::~SrsConnectAppPacket() { } +int SrsConnectAppPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = super::decode(stream)) != ERROR_SUCCESS) { + return ret; + } + + command_name = srs_amf0_read_string(stream); + if (command_name.empty()) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect command_name failed. ret=%d", ret); + return ret; + } + + return ret; +} + SrsProtocol::SrsProtocol(st_netfd_t client_stfd) { stfd = client_stfd; @@ -317,12 +391,21 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) srs_error("recv interlaced message failed. ret=%d", ret); return ret; } + srs_verbose("entire msg received"); if (!msg) { continue; } - // return the msg with raw/undecoded payload + if (msg->size <= 0 || msg->header.payload_length <= 0) { + srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).", + msg->header.message_type, msg->header.payload_length, + msg->header.timestamp, msg->header.stream_id); + delete msg; + continue; + } + + srs_verbose("get a msg with raw/undecoded payload"); *pmsg = msg; break; } @@ -598,9 +681,12 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh // need erase the header in buffer. buffer->erase(bh_size + mh_size); - srs_warn("get an empty RTMP " + srs_trace("get an empty RTMP " "message(type=%d, size=%d, time=%d, sid=%d)", chunk->header.message_type, chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id); + + *pmsg = chunk->msg; + chunk->msg = NULL; return ret; } diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 123e24d6a..30458fb79 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -31,6 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include #include @@ -40,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsSocket; class SrsBuffer; class SrsPacket; +class SrsStream; class SrsMessage; class SrsChunkStream; @@ -127,6 +129,7 @@ public: int8_t* payload; // decoded message payload. private: + SrsStream* stream; SrsPacket* decoded_payload; public: /** @@ -150,13 +153,21 @@ class SrsPacket public: SrsPacket(); virtual ~SrsPacket(); +public: + virtual int decode(SrsStream* stream); }; class SrsConnectAppPacket : public SrsPacket { +private: + typedef SrsPacket super; +private: + std::string command_name; public: SrsConnectAppPacket(); virtual ~SrsConnectAppPacket(); +public: + virtual int decode(SrsStream* stream); }; /** diff --git a/trunk/src/core/srs_core_stream.cpp b/trunk/src/core/srs_core_stream.cpp index a2636851a..5a90e4b8f 100755 --- a/trunk/src/core/srs_core_stream.cpp +++ b/trunk/src/core/srs_core_stream.cpp @@ -28,7 +28,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. SrsStream::SrsStream() { - bytes = NULL; + p = bytes = NULL; size = 0; } @@ -53,8 +53,54 @@ int SrsStream::initialize(char* _bytes, int _size) } size = _size; - bytes = _bytes; + p = bytes = _bytes; return ret; } +void SrsStream::reset() +{ + p = bytes; +} + +bool SrsStream::empty() +{ + return !p || !bytes || (p >= bytes + size); +} + +bool SrsStream::require(int required_size) +{ + return !empty() && (required_size < bytes + size - p); +} + +char SrsStream::read_char() +{ + srs_assert(require(1)); + + return *p++; +} + +int16_t SrsStream::read_2bytes() +{ + srs_assert(require(2)); + + int16_t value; + pp = (char*)&value; + pp[1] = *p++; + pp[0] = *p++; + + return value; +} + +std::string SrsStream::read_string(int len) +{ + srs_assert(require(len)); + + std::string value; + value.append(p, len); + + p += len; + + return value; +} + diff --git a/trunk/src/core/srs_core_stream.hpp b/trunk/src/core/srs_core_stream.hpp index 55d5c91b9..9b5787dee 100755 --- a/trunk/src/core/srs_core_stream.hpp +++ b/trunk/src/core/srs_core_stream.hpp @@ -30,9 +30,14 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +#include + class SrsStream { protected: + char* p; + char* pp; char* bytes; int size; public: @@ -46,6 +51,24 @@ public: * @remark, stream never free the _bytes, user must free it. */ virtual int initialize(char* _bytes, int _size); + /** + * reset the position to beginning. + */ + virtual void reset(); + /** + * whether stream is empty. + * if empty, never read or write. + */ + virtual bool empty(); + /** + * whether required size is ok. + * @return true if stream can read/write specified required_size bytes. + */ + virtual bool require(int required_size); +public: + virtual char read_char(); + virtual int16_t read_2bytes(); + virtual std::string read_string(int len); }; #endif \ No newline at end of file