From c67036d52e69b005e38e11b0a55f5c7d56ed8cc2 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 6 Jan 2017 14:04:34 +0800 Subject: [PATCH] for #730, reset ack follow flash player rules. 2.0.225 --- README.md | 1 + trunk/src/core/srs_core.hpp | 2 +- trunk/src/protocol/srs_rtmp_stack.cpp | 57 +++++++++++++++++---------- trunk/src/protocol/srs_rtmp_stack.hpp | 18 +++++---- 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 4f22f0d12..e59914962 100755 --- a/README.md +++ b/README.md @@ -345,6 +345,7 @@ Remark: ## History +* v2.0, 2017-01-06, for #730, reset ack follow flash player rules. 2.0.225 * v2.0, 2016-12-15, for #513, remove hls ram from srs2 to srs3+. 2.0.224 * v2.0, 2016-12-13, [2.0 beta3(2.0.223)][r2.0b3] released. 86685 lines. * v2.0, 2016-12-13, fix #713, disable the source cleanup. 2.0.223 diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index d6a50d034..88f280a60 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // current release version #define VERSION_MAJOR 2 #define VERSION_MINOR 0 -#define VERSION_REVISION 224 +#define VERSION_REVISION 225 // generated by configure, only macros. #include diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index f1c58b9fe..fece534aa 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -210,8 +210,8 @@ int SrsPacket::encode_packet(SrsStream* stream) SrsProtocol::AckWindowSize::AckWindowSize() { - ack_window_size = 0; - acked_size = 0; + window = 0; + sequence_number = nb_recv_bytes = 0; } SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io) @@ -1457,13 +1457,9 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) srs_assert(msg != NULL); - // acknowledgement - if (in_ack_size.ack_window_size > 0 - && skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size - ) { - if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) { - return ret; - } + // try to response acknowledgement + if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) { + return ret; } SrsPacket* packet = NULL; @@ -1492,7 +1488,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) srs_assert(pkt != NULL); if (pkt->ackowledgement_window_size > 0) { - in_ack_size.ack_window_size = pkt->ackowledgement_window_size; + in_ack_size.window = (uint32_t)pkt->ackowledgement_window_size; // @remark, we ignore this message, for user noneed to care. // but it's important for dev, for client/server will block if required // ack msg not arrived. @@ -1526,7 +1522,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) } in_chunk_size = pkt->chunk_size; - srs_trace("input chunk size to %d", pkt->chunk_size); + srs_info("in.chunk=%d", pkt->chunk_size); break; } @@ -1535,7 +1531,8 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) srs_assert(pkt != NULL); if (pkt->event_type == SrcPCUCSetBufferLength) { - srs_trace("ignored. set buffer length to %d", pkt->extra_data); + srs_trace("buffer=%d, in.ack=%d, out.ack=%d, in.chunk=%d, out.chunk=%d", pkt->extra_data, + in_ack_size.window, out_ack_size.window, in_chunk_size, out_chunk_size); } if (pkt->event_type == SrcPCUCPingRequest) { if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) { @@ -1563,11 +1560,13 @@ int SrsProtocol::on_send_packet(SrsMessageHeader* mh, SrsPacket* packet) switch (mh->message_type) { case RTMP_MSG_SetChunkSize: { SrsSetChunkSizePacket* pkt = dynamic_cast(packet); - srs_assert(pkt != NULL); - out_chunk_size = pkt->chunk_size; - - srs_trace("out chunk size to %d", pkt->chunk_size); + srs_info("out.chunk=%d", pkt->chunk_size); + break; + } + case RTMP_MSG_WindowAcknowledgementSize: { + SrsSetWindowAckSizePacket* pkt = dynamic_cast(packet); + out_ack_size.window = (uint32_t)pkt->ackowledgement_window_size; break; } case RTMP_MSG_AMF0CommandMessage: @@ -1606,9 +1605,27 @@ int SrsProtocol::response_acknowledgement_message() { int ret = ERROR_SUCCESS; + if (in_ack_size.window <= 0) { + return ret; + } + + // ignore when delta bytes not exceed half of window(ack size). + uint32_t delta = (uint32_t)(skt->get_recv_bytes() - in_ack_size.nb_recv_bytes); + if (delta < in_ack_size.window / 2) { + return ret; + } + in_ack_size.nb_recv_bytes = skt->get_recv_bytes(); + + // when the sequence number overflow, reset it. + uint32_t sequence_number = in_ack_size.sequence_number + delta; + if (sequence_number > 0xf0000000) { + sequence_number = delta; + } + in_ack_size.sequence_number = sequence_number; + SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket(); - in_ack_size.acked_size = skt->get_recv_bytes(); - pkt->sequence_number = (int32_t)in_ack_size.acked_size; + pkt->sequence_number = sequence_number; + srs_warn("ack sequence=%#x", sequence_number); // cache the message and use flush to send. if (!auto_response_when_recv) { @@ -5017,7 +5034,7 @@ SrsAcknowledgementPacket::~SrsAcknowledgementPacket() { } -int SrsAcknowledgementPacket::decode(SrsBuffer* stream) +int SrsAcknowledgementPacket::decode(SrsStream* stream) { int ret = ERROR_SUCCESS; @@ -5027,7 +5044,7 @@ int SrsAcknowledgementPacket::decode(SrsBuffer* stream) return ret; } - sequence_number = stream->read_4bytes(); + sequence_number = (uint32_t)stream->read_4bytes(); srs_info("decode acknowledgement success"); return ret; diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index 02915c77a..d0b8c81c6 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -190,8 +190,11 @@ private: class AckWindowSize { public: - int ack_window_size; - int64_t acked_size; + uint32_t window; + // number of received bytes. + int64_t nb_recv_bytes; + // previous responsed sequence number. + uint32_t sequence_number; AckWindowSize(); }; @@ -227,10 +230,11 @@ private: * input chunk size, default to 128, set by peer packet. */ int32_t in_chunk_size; - /** - * input ack size, when to send the acked packet. - */ + // The input ack window, to response acknowledge to peer, + // for example, to respose the encoder, for server got lots of packets. AckWindowSize in_ack_size; + // The output ack window, to require peer to response the ack. + AckWindowSize out_ack_size; /** * whether auto response when recv messages. * default to true for it's very easy to use the protocol stack. @@ -1843,13 +1847,13 @@ protected: class SrsAcknowledgementPacket : public SrsPacket { public: - int32_t sequence_number; + uint32_t sequence_number; public: SrsAcknowledgementPacket(); virtual ~SrsAcknowledgementPacket(); // decode functions for concrete packet to override. public: - virtual int decode(SrsBuffer* stream); + virtual int decode(SrsStream* stream); // encode functions for concrete packet to override. public: virtual int get_prefer_cid();