diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 631bde829..7f66ef156 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -28,6 +28,100 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +/** +5. Protocol Control Messages +RTMP reserves message type IDs 1-7 for protocol control messages. +These messages contain information needed by the RTM Chunk Stream +protocol or RTMP itself. Protocol messages with IDs 1 & 2 are +reserved for usage with RTM Chunk Stream protocol. Protocol messages +with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID +7 is used between edge server and origin server. +*/ +#define RTMP_MSG_SetChunkSize 0x01 +#define RTMP_MSG_AbortMessage 0x02 +#define RTMP_MSG_Acknowledgement 0x03 +#define RTMP_MSG_UserControlMessage 0x04 +#define RTMP_MSG_WindowAcknowledgementSize 0x05 +#define RTMP_MSG_SetPeerBandwidth 0x06 +#define RTMP_MSG_EdgeAndOriginServerCommand 0x07 +/** +* The server sends this event to test whether the client is reachable. +* +* Event data is a 4-byte timestamp, representing the local server time when the server dispatched the command. +* The client responds with PingResponse on receiving PingRequest. +*/ +#define RTMP_MSG_PCUC_PingRequest 0x06 + +/** +* The client sends this event to the server in response to the ping request. +* +* The event data is a 4-byte timestamp, which was received with the PingRequest request. +*/ +#define RTMP_MSG_PCUC_PingResponse 0x07 +/** +3. Types of messages +The server and the client send messages over the network to +communicate with each other. The messages can be of any type which +includes audio messages, video messages, command messages, shared +object messages, data messages, and user control messages. +3.1. Command message +Command messages carry the AMF-encoded commands between the client +and the server. These messages have been assigned message type value +of 20 for AMF0 encoding and message type value of 17 for AMF3 +encoding. These messages are sent to perform some operations like +connect, createStream, publish, play, pause on the peer. Command +messages like onstatus, result etc. are used to inform the sender +about the status of the requested commands. A command message +consists of command name, transaction ID, and command object that +contains related parameters. A client or a server can request Remote +Procedure Calls (RPC) over streams that are communicated using the +command messages to the peer. +*/ +#define RTMP_MSG_AMF3CommandMessage 17 // 0x11 +#define RTMP_MSG_AMF0CommandMessage 20 // 0x14 +/** +3.2. Data message +The client or the server sends this message to send Metadata or any +user data to the peer. Metadata includes details about the +data(audio, video etc.) like creation time, duration, theme and so +on. These messages have been assigned message type value of 18 for +AMF0 and message type value of 15 for AMF3. +*/ +#define RTMP_MSG_AMF0DataMessage 18 // 0x12 +#define RTMP_MSG_AMF3DataMessage 15 // 0x0F +/** +3.3. Shared object message +A shared object is a Flash object (a collection of name value pairs) +that are in synchronization across multiple clients, instances, and +so on. The message types kMsgContainer=19 for AMF0 and +kMsgContainerEx=16 for AMF3 are reserved for shared object events. +Each message can contain multiple events. +*/ +#define RTMP_MSG_AMF3SharedObject 16 // 0x10 +#define RTMP_MSG_AMF0SharedObject 19 // 0x13 +/** +3.4. Audio message +The client or the server sends this message to send audio data to the +peer. The message type value of 8 is reserved for audio messages. +*/ +#define RTMP_MSG_AudioMessage 8 // 0x08 +/* * +3.5. Video message +The client or the server sends this message to send video data to the +peer. The message type value of 9 is reserved for video messages. +These messages are large and can delay the sending of other type of +messages. To avoid such a situation, the video message is assigned +the lowest priority. +*/ +#define RTMP_MSG_VideoMessage 9 // 0x09 +/** +3.6. Aggregate message +An aggregate message is a single message that contains a list of submessages. +The message type value of 22 is reserved for aggregate +messages. +*/ +#define RTMP_MSG_AggregateMessage 22 // 0x16 + /** * 6.1.2. Chunk Message Header * There are four different formats for the chunk message header, @@ -93,6 +187,89 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #define RTMP_EXTENDED_TIMESTAMP 0xFFFFFF +SrsMessageHeader::SrsMessageHeader() +{ + message_type = 0; + payload_length = 0; + timestamp = 0; + stream_id = 0; +} + +SrsMessageHeader::~SrsMessageHeader() +{ +} + +SrsChunkStream::SrsChunkStream(int _cid) +{ + fmt = 0; + cid = _cid; + extended_timestamp = false; + msg = NULL; +} + +SrsChunkStream::~SrsChunkStream() +{ + if (msg) { + delete msg; + msg = NULL; + } +} + +SrsMessage::SrsMessage() +{ + size = 0; + payload = NULL; + decoded_payload = NULL; +} + +SrsMessage::~SrsMessage() +{ + if (payload) { + delete[] payload; + payload = NULL; + } + + if (decoded_payload) { + delete decoded_payload; + decoded_payload = NULL; + } +} + +SrsPacket* SrsMessage::get_packet() +{ + if (!decoded_payload) { + srs_error("the payload is raw/undecoded, invoke decode_packet to decode it."); + } + srs_assert(decoded_payload != NULL); + + return decoded_payload; +} + +int SrsMessage::decode_packet() +{ + int ret = ERROR_SUCCESS; + + // TODO: decode packet. + + return ret; +} + +SrsPacket::SrsPacket() +{ +} + +SrsPacket::~SrsPacket() +{ +} + +SrsConnectAppPacket::SrsConnectAppPacket() +{ +} + +SrsConnectAppPacket::~SrsConnectAppPacket() +{ +} + SrsProtocol::SrsProtocol(st_netfd_t client_stfd) { stfd = client_stfd; @@ -129,6 +306,8 @@ SrsProtocol::~SrsProtocol() int SrsProtocol::recv_message(SrsMessage** pmsg) { + *pmsg = NULL; + int ret = ERROR_SUCCESS; while (true) { @@ -143,7 +322,9 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) continue; } - // decode the msg + // return the msg with raw/undecoded payload + *pmsg = msg; + break; } return ret; @@ -440,26 +621,15 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh srs_verbose("create empty payload for RTMP message. size=%d", chunk->header.payload_length); } - // copy payload from buffer. - int copy_size = buffer->size() - bh_size - mh_size; - if (copy_size > payload_size) { - copy_size = payload_size; - } - memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, copy_size); - buffer->erase(bh_size + mh_size + copy_size); - chunk->msg->size += copy_size; - - // when empty, read the left bytes from socket. - int left_size = payload_size - copy_size; - if (left_size > 0) { - ssize_t nread; - if ((ret = skt->read_fully(chunk->msg->payload + chunk->msg->size, left_size, &nread)) != ERROR_SUCCESS) { - srs_error("read chunk payload from socket error. " - "payload_size=%d, copy_size=%d, left_size=%d, size=%d, msg_size=%d, ret=%d", - payload_size, copy_size, left_size, chunk->msg->size, chunk->header.payload_length, ret); - return ret; - } + // read payload to buffer + int required_size = bh_size + mh_size + payload_size; + if ((ret = buffer->ensure_buffer_bytes(skt, required_size)) != ERROR_SUCCESS) { + srs_error("read payload failed. required_size=%d, ret=%d", required_size, ret); + return ret; } + memcpy(chunk->msg->payload + chunk->msg->size, buffer->bytes() + bh_size + mh_size, payload_size); + buffer->erase(bh_size + mh_size + payload_size); + chunk->msg->size += payload_size; srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); @@ -481,45 +651,3 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh return ret; } -SrsMessageHeader::SrsMessageHeader() -{ - message_type = 0; - payload_length = 0; - timestamp = 0; - stream_id = 0; -} - -SrsMessageHeader::~SrsMessageHeader() -{ -} - -SrsChunkStream::SrsChunkStream(int _cid) -{ - fmt = 0; - cid = _cid; - extended_timestamp = false; - msg = NULL; -} - -SrsChunkStream::~SrsChunkStream() -{ - if (msg) { - delete msg; - msg = NULL; - } -} - -SrsMessage::SrsMessage() -{ - size = 0; - payload = NULL; -} - -SrsMessage::~SrsMessage() -{ - if (payload) { - delete[] payload; - payload = NULL; - } -} - diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index c949bd553..123e24d6a 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -34,37 +34,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include +#include + class SrsSocket; class SrsBuffer; +class SrsPacket; class SrsMessage; class SrsChunkStream; -/** -* the protocol provides the rtmp-message-protocol services, -* to recv RTMP message from RTMP chunk stream, -* and to send out RTMP message over RTMP chunk stream. -*/ -class SrsProtocol -{ -private: - std::map chunk_streams; - st_netfd_t stfd; - SrsBuffer* buffer; - SrsSocket* skt; - int32_t in_chunk_size; - int32_t out_chunk_size; -public: - SrsProtocol(st_netfd_t client_stfd); - virtual ~SrsProtocol(); -public: - virtual int recv_message(SrsMessage** pmsg); -private: - virtual int recv_interlaced_message(SrsMessage** pmsg); - virtual int read_basic_header(char& fmt, int& cid, int& size); - virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); - virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); -}; - /** * 4.1. Message Header */ @@ -127,7 +105,6 @@ public: public: SrsChunkStream(int _cid); virtual ~SrsChunkStream(); -public: }; /** @@ -148,10 +125,114 @@ public: */ int32_t size; int8_t* payload; +// decoded message payload. +private: + SrsPacket* decoded_payload; +public: + /** + * get the decoded packet, + * not all packets need to decode, for video/audio packet, + * passthrough to peer are ok. + * @remark, user must invoke decode_packet first. + */ + virtual SrsPacket* get_packet(); + virtual int decode_packet(); public: SrsMessage(); virtual ~SrsMessage(); +}; + +/** +* the decoded message payload. +*/ +class SrsPacket +{ +public: + SrsPacket(); + virtual ~SrsPacket(); +}; + +class SrsConnectAppPacket : public SrsPacket +{ +public: + SrsConnectAppPacket(); + virtual ~SrsConnectAppPacket(); +}; + +/** +* the protocol provides the rtmp-message-protocol services, +* to recv RTMP message from RTMP chunk stream, +* and to send out RTMP message over RTMP chunk stream. +*/ +class SrsProtocol +{ +private: + std::map chunk_streams; + st_netfd_t stfd; + SrsBuffer* buffer; + SrsSocket* skt; + int32_t in_chunk_size; + int32_t out_chunk_size; +public: + SrsProtocol(st_netfd_t client_stfd); + virtual ~SrsProtocol(); +public: + /** + * recv a message with raw/undecoded payload from peer. + * the payload is not decoded, use expect_message if requires specifies message. + * @pmsg, user must free it. NULL if not success. + * @remark, only when success, user can use and must free the pmsg. + */ + virtual int recv_message(SrsMessage** pmsg); public: + /** + * expect a specified message, drop others util got specified one. + * @pmsg, user must free it. NULL if not success. + * @ppacket, store in the pmsg, user must never free it. NULL if not success. + * @remark, only when success, user can use and must free the pmsg/ppacket. + */ + template + int expect_message(SrsMessage** pmsg, T** ppacket) + { + *pmsg = NULL; + *ppacket = NULL; + + int ret = ERROR_SUCCESS; + + while (true) { + SrsMessage* msg = NULL; + if ((ret = recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv message failed. ret=%d", ret); + return ret; + } + srs_verbose("recv message success."); + + if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + delete msg; + srs_error("decode message failed. ret=%d", ret); + return ret; + } + + T* pkt = dynamic_cast(msg->get_packet()); + if (!pkt) { + delete msg; + srs_trace("drop message(type=%d, size=%d, time=%d, sid=%d).", + msg->header.message_type, msg->header.payload_length, + msg->header.timestamp, msg->header.stream_id); + continue; + } + + *pmsg = msg; + *ppacket = pkt; + } + + return ret; + } +private: + virtual int recv_interlaced_message(SrsMessage** pmsg); + virtual int read_basic_header(char& fmt, int& cid, int& size); + virtual int read_message_header(SrsChunkStream* chunk, char fmt, int bh_size, int& mh_size); + virtual int read_message_payload(SrsChunkStream* chunk, int bh_size, int mh_size, int& payload_size, SrsMessage** pmsg); }; #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index d32f85140..2b593ff25 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -94,7 +94,12 @@ int SrsRtmp::connect_app(SrsApp** papp) int ret = ERROR_SUCCESS; SrsMessage* msg = NULL; - protocol->recv_message(&msg); + SrsConnectAppPacket* pkt = NULL; + if ((ret = protocol->expect_message(&msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect connect app message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsMessage, msg, false); return ret; }