diff --git a/trunk/src/core/srs_core_log.hpp b/trunk/src/core/srs_core_log.hpp index 8c91ca94b..d2627f6cf 100755 --- a/trunk/src/core/srs_core_log.hpp +++ b/trunk/src/core/srs_core_log.hpp @@ -78,7 +78,7 @@ extern ILogContext* log_context; #undef srs_verbose #define srs_verbose(msg, ...) (void)0 #endif -#if 0 +#if 1 #undef srs_info #define srs_info(msg, ...) (void)0 #endif diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 0b007b8ac..6a7c0289e 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -249,6 +249,10 @@ messages. /**************************************************************************** ***************************************************************************** ****************************************************************************/ +SrsProtocol::AckWindowSize::AckWindowSize() +{ + ack_window_size = acked_size = 0; +} SrsProtocol::SrsProtocol(st_netfd_t client_stfd) { @@ -450,6 +454,21 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) int ret = ERROR_SUCCESS; srs_assert(msg != NULL); + + // acknowledgement + if (skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size) { + SrsCommonMessage* ack = new SrsCommonMessage(); + SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket(); + + in_ack_size.acked_size = pkt->sequence_number = skt->get_recv_bytes(); + ack->set_packet(pkt, 0); + + if ((ret = send_message(ack)) != ERROR_SUCCESS) { + srs_error("send acknowledgement failed. ret=%d", ret); + return ret; + } + srs_verbose("send acknowledgement success."); + } switch (msg->header.message_type) { case RTMP_MSG_SetChunkSize: @@ -466,8 +485,13 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) case RTMP_MSG_WindowAcknowledgementSize: { SrsSetWindowAckSizePacket* pkt = dynamic_cast(msg->get_packet()); srs_assert(pkt != NULL); - // TODO: take effect. - srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); + + if (pkt->ackowledgement_window_size > 0) { + in_ack_size.ack_window_size = pkt->ackowledgement_window_size; + srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); + } else { + srs_warn("ignored. set ack window size is %d", pkt->ackowledgement_window_size); + } break; } case RTMP_MSG_SetChunkSize: { @@ -2222,6 +2246,48 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream) return ret; } +SrsAcknowledgementPacket::SrsAcknowledgementPacket() +{ + sequence_number = 0; +} + +SrsAcknowledgementPacket::~SrsAcknowledgementPacket() +{ +} + +int SrsAcknowledgementPacket::get_perfer_cid() +{ + return RTMP_CID_ProtocolControl; +} + +int SrsAcknowledgementPacket::get_message_type() +{ + return RTMP_MSG_Acknowledgement; +} + +int SrsAcknowledgementPacket::get_size() +{ + return 4; +} + +int SrsAcknowledgementPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(4)) { + ret = ERROR_RTMP_MESSAGE_ENCODE; + srs_error("encode acknowledgement packet failed. ret=%d", ret); + return ret; + } + + stream->write_4bytes(sequence_number); + + srs_verbose("encode acknowledgement packet " + "success. sequence_number=%d", sequence_number); + + return ret; +} + SrsSetChunkSizePacket::SrsSetChunkSizePacket() { chunk_size = RTMP_DEFAULT_CHUNK_SIZE; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index b526f0b31..755d9d3aa 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -75,6 +75,14 @@ class ISrsMessage; */ class SrsProtocol { +private: + struct AckWindowSize + { + int ack_window_size; + int64_t acked_size; + + AckWindowSize(); + }; // peer in/out private: st_netfd_t stfd; @@ -85,6 +93,7 @@ private: std::map chunk_streams; SrsBuffer* buffer; int32_t in_chunk_size; + AckWindowSize in_ack_size; // peer out private: char out_header_fmt0[RTMP_MAX_FMT0_HEADER_SIZE]; @@ -849,6 +858,34 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* 5.3. Acknowledgement (3) +* The client or the server sends the acknowledgment to the peer after +* receiving bytes equal to the window size. +*/ +class SrsAcknowledgementPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsAcknowledgementPacket); + } +public: + int32_t sequence_number; +public: + SrsAcknowledgementPacket(); + virtual ~SrsAcknowledgementPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + /** * 7.1. Set Chunk Size * Protocol control message 1, Set Chunk Size, is used to notify the diff --git a/trunk/src/core/srs_core_socket.cpp b/trunk/src/core/srs_core_socket.cpp index 41abcea6a..5f540baf0 100755 --- a/trunk/src/core/srs_core_socket.cpp +++ b/trunk/src/core/srs_core_socket.cpp @@ -28,8 +28,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. SrsSocket::SrsSocket(st_netfd_t client_stfd) { stfd = client_stfd; - recv_timeout = ST_UTIME_NO_TIMEOUT; - send_timeout = ST_UTIME_NO_TIMEOUT; + send_timeout = recv_timeout = ST_UTIME_NO_TIMEOUT; + recv_bytes = send_bytes = 0; } SrsSocket::~SrsSocket() @@ -46,6 +46,16 @@ void SrsSocket::set_send_timeout(int timeout_ms) send_timeout = timeout_ms * 1000; } +int64_t SrsSocket::get_recv_bytes() +{ + return recv_bytes; +} + +int64_t SrsSocket::get_send_bytes() +{ + return send_bytes; +} + int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; @@ -63,8 +73,10 @@ int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) errno = ECONNRESET; } - ret = ERROR_SOCKET_READ; + return ERROR_SOCKET_READ; } + + recv_bytes += *nread; return ret; } @@ -86,9 +98,11 @@ int SrsSocket::read_fully(const void* buf, size_t size, ssize_t* nread) errno = ECONNRESET; } - ret = ERROR_SOCKET_READ_FULLY; + return ERROR_SOCKET_READ_FULLY; } + recv_bytes += *nread; + return ret; } @@ -103,8 +117,10 @@ int SrsSocket::write(const void* buf, size_t size, ssize_t* nwrite) return ERROR_SOCKET_TIMEOUT; } - ret = ERROR_SOCKET_WRITE; + return ERROR_SOCKET_WRITE; } + + send_bytes += *nwrite; return ret; } @@ -120,9 +136,11 @@ int SrsSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) return ERROR_SOCKET_TIMEOUT; } - ret = ERROR_SOCKET_WRITE; + return ERROR_SOCKET_WRITE; } + send_bytes += *nwrite; + return ret; } diff --git a/trunk/src/core/srs_core_socket.hpp b/trunk/src/core/srs_core_socket.hpp index e680aac17..320487a03 100755 --- a/trunk/src/core/srs_core_socket.hpp +++ b/trunk/src/core/srs_core_socket.hpp @@ -41,6 +41,8 @@ class SrsSocket private: int64_t recv_timeout; int64_t send_timeout; + int64_t recv_bytes; + int64_t send_bytes; st_netfd_t stfd; public: SrsSocket(st_netfd_t client_stfd); @@ -48,6 +50,9 @@ public: public: virtual void set_recv_timeout(int timeout_ms); virtual void set_send_timeout(int timeout_ms); + virtual int64_t get_recv_bytes(); + virtual int64_t get_send_bytes(); +public: virtual int read(const void* buf, size_t size, ssize_t* nread); virtual int read_fully(const void* buf, size_t size, ssize_t* nread); virtual int write(const void* buf, size_t size, ssize_t* nwrite);