|
|
|
@ -210,8 +210,8 @@ int SrsPacket::encode_packet(SrsStream* stream)
|
|
|
|
|
|
|
|
|
|
SrsProtocol::AckWindowSize::AckWindowSize()
|
|
|
|
|
{
|
|
|
|
|
ack_window_size = 0;
|
|
|
|
|
acked_size = 0;
|
|
|
|
|
window = 0;
|
|
|
|
|
sequence_number = nb_recv_bytes = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
|
|
|
|
@ -1457,13 +1457,9 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
|
|
|
|
|
|
|
|
|
|
srs_assert(msg != NULL);
|
|
|
|
|
|
|
|
|
|
// acknowledgement
|
|
|
|
|
if (in_ack_size.ack_window_size > 0
|
|
|
|
|
&& skt->get_recv_bytes() - in_ack_size.acked_size > in_ack_size.ack_window_size
|
|
|
|
|
) {
|
|
|
|
|
if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
// try to response acknowledgement
|
|
|
|
|
if ((ret = response_acknowledgement_message()) != ERROR_SUCCESS) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SrsPacket* packet = NULL;
|
|
|
|
@ -1492,7 +1488,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
|
|
|
|
|
srs_assert(pkt != NULL);
|
|
|
|
|
|
|
|
|
|
if (pkt->ackowledgement_window_size > 0) {
|
|
|
|
|
in_ack_size.ack_window_size = pkt->ackowledgement_window_size;
|
|
|
|
|
in_ack_size.window = (uint32_t)pkt->ackowledgement_window_size;
|
|
|
|
|
// @remark, we ignore this message, for user noneed to care.
|
|
|
|
|
// but it's important for dev, for client/server will block if required
|
|
|
|
|
// ack msg not arrived.
|
|
|
|
@ -1526,7 +1522,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
in_chunk_size = pkt->chunk_size;
|
|
|
|
|
srs_trace("input chunk size to %d", pkt->chunk_size);
|
|
|
|
|
srs_info("in.chunk=%d", pkt->chunk_size);
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -1535,7 +1531,8 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg)
|
|
|
|
|
srs_assert(pkt != NULL);
|
|
|
|
|
|
|
|
|
|
if (pkt->event_type == SrcPCUCSetBufferLength) {
|
|
|
|
|
srs_trace("ignored. set buffer length to %d", pkt->extra_data);
|
|
|
|
|
srs_trace("buffer=%d, in.ack=%d, out.ack=%d, in.chunk=%d, out.chunk=%d", pkt->extra_data,
|
|
|
|
|
in_ack_size.window, out_ack_size.window, in_chunk_size, out_chunk_size);
|
|
|
|
|
}
|
|
|
|
|
if (pkt->event_type == SrcPCUCPingRequest) {
|
|
|
|
|
if ((ret = response_ping_message(pkt->event_data)) != ERROR_SUCCESS) {
|
|
|
|
@ -1563,11 +1560,13 @@ int SrsProtocol::on_send_packet(SrsMessageHeader* mh, SrsPacket* packet)
|
|
|
|
|
switch (mh->message_type) {
|
|
|
|
|
case RTMP_MSG_SetChunkSize: {
|
|
|
|
|
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet);
|
|
|
|
|
srs_assert(pkt != NULL);
|
|
|
|
|
|
|
|
|
|
out_chunk_size = pkt->chunk_size;
|
|
|
|
|
|
|
|
|
|
srs_trace("out chunk size to %d", pkt->chunk_size);
|
|
|
|
|
srs_info("out.chunk=%d", pkt->chunk_size);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case RTMP_MSG_WindowAcknowledgementSize: {
|
|
|
|
|
SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(packet);
|
|
|
|
|
out_ack_size.window = (uint32_t)pkt->ackowledgement_window_size;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case RTMP_MSG_AMF0CommandMessage:
|
|
|
|
@ -1606,9 +1605,27 @@ int SrsProtocol::response_acknowledgement_message()
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
if (in_ack_size.window <= 0) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ignore when delta bytes not exceed half of window(ack size).
|
|
|
|
|
uint32_t delta = (uint32_t)(skt->get_recv_bytes() - in_ack_size.nb_recv_bytes);
|
|
|
|
|
if (delta < in_ack_size.window / 2) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
in_ack_size.nb_recv_bytes = skt->get_recv_bytes();
|
|
|
|
|
|
|
|
|
|
// when the sequence number overflow, reset it.
|
|
|
|
|
uint32_t sequence_number = in_ack_size.sequence_number + delta;
|
|
|
|
|
if (sequence_number > 0xf0000000) {
|
|
|
|
|
sequence_number = delta;
|
|
|
|
|
}
|
|
|
|
|
in_ack_size.sequence_number = sequence_number;
|
|
|
|
|
|
|
|
|
|
SrsAcknowledgementPacket* pkt = new SrsAcknowledgementPacket();
|
|
|
|
|
in_ack_size.acked_size = skt->get_recv_bytes();
|
|
|
|
|
pkt->sequence_number = (int32_t)in_ack_size.acked_size;
|
|
|
|
|
pkt->sequence_number = sequence_number;
|
|
|
|
|
srs_warn("ack sequence=%#x", sequence_number);
|
|
|
|
|
|
|
|
|
|
// cache the message and use flush to send.
|
|
|
|
|
if (!auto_response_when_recv) {
|
|
|
|
@ -5017,7 +5034,7 @@ SrsAcknowledgementPacket::~SrsAcknowledgementPacket()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int SrsAcknowledgementPacket::decode(SrsBuffer* stream)
|
|
|
|
|
int SrsAcknowledgementPacket::decode(SrsStream* stream)
|
|
|
|
|
{
|
|
|
|
|
int ret = ERROR_SUCCESS;
|
|
|
|
|
|
|
|
|
@ -5027,7 +5044,7 @@ int SrsAcknowledgementPacket::decode(SrsBuffer* stream)
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sequence_number = stream->read_4bytes();
|
|
|
|
|
sequence_number = (uint32_t)stream->read_4bytes();
|
|
|
|
|
srs_info("decode acknowledgement success");
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|