From 9da31de1c1403737407adf072630db0c24cdf385 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 23 Oct 2013 22:17:57 +0800 Subject: [PATCH] support decode user control message. response ping automatically --- trunk/src/core/srs_core_protocol.cpp | 136 +++++++++++++++++++++++---- trunk/src/core/srs_core_protocol.hpp | 18 +++- trunk/src/core/srs_core_rtmp.cpp | 2 +- 3 files changed, 132 insertions(+), 24 deletions(-) diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 6a7c0289e..350ce3958 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -449,6 +449,47 @@ int SrsProtocol::send_message(ISrsMessage* msg) return ret; } +int SrsProtocol::response_acknowledgement_message() +{ + int ret = ERROR_SUCCESS; + + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket(); + + in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes(); + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send acknowledgement failed. ret=%d", ret); + return ret; + } + srs_verbose("send acknowledgement success."); + + return ret; +} + +int SrsProtocol::response_ping_message(int32_t timestamp) +{ + int ret = ERROR_SUCCESS; + + srs_trace("get a ping request, response it. timestamp=%d", timestamp); + + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsUserControlPacket* pkt = new SrsUserControlPacket(); + + pkt->event_type = SrcPCUCPingResponse; + pkt->event_data = timestamp; + msg->set_packet(pkt, 0); + + if ((ret = send_message(msg)) != ERROR_SUCCESS) { + srs_error("send ping response failed. ret=%d", ret); + return ret; + } + srs_verbose("send ping response success."); + + return ret; +} + int SrsProtocol::on_recv_message(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; @@ -457,21 +498,14 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) // acknowledgement if (skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) { - SrsCommonMessage* ack = new SrsCommonMessage(); - SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket(); - - in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes(); - ack->set_packet(pkt, 0); - - if ((ret = send_message(ack)) != ERROR_SUCCESS) { - srs_error("send acknowledgement failed. ret=%d", ret); + if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) { return ret; } - srs_verbose("send acknowledgement success."); } switch (msg->header.message_type) { case RTMP_MSG_SetChunkSize: + case RTMP_MSG_UserControlMessage: case RTMP_MSG_WindowAcknowledgementSize: if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { srs_error("decode packet from message payload failed. ret=%d", ret); @@ -503,6 +537,20 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) srs_trace("set input chunk size to %d", pkt->chunk_size); break; } + case RTMP_MSG_UserControlMessage: { + SrsUserControlPacket* pkt = dynamic_cast(msg->get_packet()); + srs_assert(pkt != NULL); + + if (pkt->event_type == SrcPCUCSetBufferLength) { + srs_trace("ignored. set buffer length to %d", pkt->extra_data); + } + if (pkt->event_type == SrcPCUCPingRequest) { + if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) { + return ret; + } + } + break; + } } return ret; @@ -963,6 +1011,11 @@ bool SrsMessageHeader::is_set_chunk_size() return message_type == RTMP_MSG_SetChunkSize; } +bool SrsMessageHeader::is_user_control_message() +{ + return message_type == RTMP_MSG_UserControlMessage; +} + SrsChunkStream::SrsChunkStream(int _cid) { fmt = 0; @@ -1098,6 +1151,10 @@ int SrsCommonMessage::decode_packet() srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); packet = new SrsPacket(); return ret; + } else if(header.is_user_control_message()) { + srs_verbose("start to decode user control message."); + packet = new SrsUserControlPacket(); + return packet->decode(stream); } else if(header.is_window_ackledgement_size()) { srs_verbose("start to decode set ack window size message."); packet = new SrsSetWindowAckSizePacket(); @@ -2396,45 +2453,86 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream) return ret; } -SrsPCUC4BytesPacket::SrsPCUC4BytesPacket() +SrsUserControlPacket::SrsUserControlPacket() { event_type = 0; event_data = 0; + extra_data = 0; } -SrsPCUC4BytesPacket::~SrsPCUC4BytesPacket() +SrsUserControlPacket::~SrsUserControlPacket() { } -int SrsPCUC4BytesPacket::get_perfer_cid() +int SrsUserControlPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(6)) { + ret = ERROR_RTMP_MESSAGE_DECODE; + srs_error("decode user control failed. ret=%d", ret); + return ret; + } + + event_type = stream->read_2bytes(); + event_data = stream->read_4bytes(); + + if (event_type == SrcPCUCSetBufferLength) { + if (!stream->require(2)) { + ret = ERROR_RTMP_MESSAGE_ENCODE; + srs_error("decode user control packet failed. ret=%d", ret); + return ret; + } + extra_data = stream->read_4bytes(); + } + + srs_info("decode user control success. " + "event_type=%d, event_data=%d, extra_data=%d", + event_type, event_data, extra_data); + + return ret; +} + +int SrsUserControlPacket::get_perfer_cid() { return RTMP_CID_ProtocolControl; } -int SrsPCUC4BytesPacket::get_message_type() +int SrsUserControlPacket::get_message_type() { return RTMP_MSG_UserControlMessage; } -int SrsPCUC4BytesPacket::get_size() +int SrsUserControlPacket::get_size() { - return 2 + 4; + if (event_type == SrcPCUCSetBufferLength) { + return 2 + 4 + 4; + } else { + return 2 + 4; + } } -int SrsPCUC4BytesPacket::encode_packet(SrsStream* stream) +int SrsUserControlPacket::encode_packet(SrsStream* stream) { int ret = ERROR_SUCCESS; - if (!stream->require(6)) { + if (!stream->require(get_size())) { ret = ERROR_RTMP_MESSAGE_ENCODE; - srs_error("encode set bandwidth packet failed. ret=%d", ret); + srs_error("encode user control packet failed. ret=%d", ret); return ret; } stream->write_2bytes(event_type); stream->write_4bytes(event_data); + + // when event type is set buffer length, + // read the extra buffer length. + if (event_type == SrcPCUCSetBufferLength) { + stream->write_2bytes(extra_data); + srs_verbose("user control message, buffer_length=%d", extra_data); + } - srs_verbose("encode PCUC packet success. " + srs_verbose("encode user control packet success. " "event_type=%d, event_data=%d", event_type, event_data); return ret; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index a7220851c..331ed5ea7 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -129,6 +129,8 @@ private: * when recv message, update the context. */ virtual int on_recv_message(SrsCommonMessage* msg); + virtual int response_acknowledgement_message(); + virtual int response_ping_message(int32_t timestamp); /** * when message sentout, update the context. */ @@ -205,6 +207,7 @@ struct SrsMessageHeader bool is_amf3_data(); bool is_window_ackledgement_size(); bool is_set_chunk_size(); + bool is_user_control_message(); }; /** @@ -965,6 +968,7 @@ enum SrcPCUCEventType * Stream Begin(=0) 4-bytes stream ID * Stream EOF(=1) 4-bytes stream ID * StreamDry(=2) 4-bytes stream ID +* SetBufferLength(=3) 8-bytes 4bytes stream ID, 4bytes buffer length. * StreamIsRecorded(=4) 4-bytes stream ID * PingRequest(=6) 4-bytes timestamp local server time * PingResponse(=7) 4-bytes timestamp received ping request. @@ -975,21 +979,27 @@ enum SrcPCUCEventType * +------------------------------+------------------------- * Figure 5 Pay load for the ‘User Control Message’. */ -class SrsPCUC4BytesPacket : public SrsPacket +class SrsUserControlPacket : public SrsPacket { private: typedef SrsPacket super; protected: virtual const char* get_class_name() { - return CLASS_NAME_STRING(SrsPCUC4BytesPacket); + return CLASS_NAME_STRING(SrsUserControlPacket); } public: int16_t event_type; int32_t event_data; + /** + * 4bytes if event_type is SetBufferLength; otherwise 0. + */ + int32_t extra_data; public: - SrsPCUC4BytesPacket(); - virtual ~SrsPCUC4BytesPacket(); + SrsUserControlPacket(); + virtual ~SrsUserControlPacket(); +public: + virtual int decode(SrsStream* stream); public: virtual int get_perfer_cid(); public: diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index dc2105fe0..7dbfefc24 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -412,7 +412,7 @@ int SrsRtmp::start_play(int stream_id) // StreamBegin if (true) { SrsCommonMessage* msg = new SrsCommonMessage(); - SrsPCUC4BytesPacket* pkt = new SrsPCUC4BytesPacket(); + SrsUserControlPacket* pkt = new SrsUserControlPacket(); pkt->event_type = SrcPCUCStreamBegin; pkt->event_data = stream_id;