From db2950ac6961f12a8aa76d42d839951b16e2abd5 Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Fri, 31 Dec 2021 10:49:15 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[fix]=20rtmp=20=E8=A7=A3=E5=86=B3=E6=9C=AA?= =?UTF-8?q?=E5=A4=84=E7=90=86PingRequest=20=E5=9B=9E=E5=BA=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rtmp/client_session.go | 10 ++++++++++ pkg/rtmp/message_packer.go | 20 ++++++++++++++++++++ pkg/rtmp/server_session.go | 12 +++++++++++- 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 35dac46..8e8d2eb 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -367,6 +367,7 @@ func (s *ClientSession) doMsg(stream *Stream) error { nazalog.Warnf("[%s] read user control message, ignore. buf=%s", s.uniqueKey, hex.Dump(stream.msg.buff.Peek(32))) } + s.doUserControl(stream) case base.RtmpTypeIdAudio: fallthrough case base.RtmpTypeIdVideo: @@ -383,6 +384,15 @@ func (s *ClientSession) doAck(stream *Stream) error { nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum) return nil } +func (s *ClientSession) doUserControl(stream *Stream) error { + userControlType := bele.BeUint16(stream.msg.buff.Bytes()) + if userControlType == uint16(base.RtmpUserControlPingRequest) { + stream.msg.buff.Skip(2) + timeStamp := bele.BeUint32(stream.msg.buff.Bytes()) + s.packer.writePingResponse(s.conn, timeStamp) + } + return nil +} func (s *ClientSession) doDataMessageAmf0(stream *Stream) error { val, err := stream.msg.peekStringWithType() diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index 0fb6b55..a54adee 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -253,6 +253,26 @@ func (packer *MessagePacker) writeStreamBegin(writer io.Writer, streamid uint32) return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) } +func (packer *MessagePacker) writePingRequest(writer io.Writer, timeStamp uint32) error { + packer.b.ModWritePos(12) + + // 6 + _ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlPingRequest)) + _ = bele.WriteBe(packer.b, uint32(timeStamp)) + + return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) +} + +func (packer *MessagePacker) writePingResponse(writer io.Writer, timeStamp uint32) error { + packer.b.ModWritePos(12) + + // 6 + _ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlPingResponse)) + _ = bele.WriteBe(packer.b, uint32(timeStamp)) + + return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) +} + // --------------------------------------------------------------------------------------------------------------------- // TODO(chef): 整理所有的buffer diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index 146eec2..a4ad7b7 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -219,6 +219,8 @@ func (s *ServerSession) doMsg(stream *Stream) error { return s.doDataMessageAmf0(stream) case base.RtmpTypeIdAck: return s.doAck(stream) + case base.RtmpTypeIdUserControl: + s.doUserControl(stream) case base.RtmpTypeIdAudio: fallthrough case base.RtmpTypeIdVideo: @@ -238,7 +240,15 @@ func (s *ServerSession) doAck(stream *Stream) error { nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum) return nil } - +func (s *ServerSession) doUserControl(stream *Stream) error { + userControlType := bele.BeUint16(stream.msg.buff.Bytes()) + if userControlType == uint16(base.RtmpUserControlPingRequest) { + stream.msg.buff.Skip(2) + timeStamp := bele.BeUint32(stream.msg.buff.Bytes()) + s.packer.writePingResponse(s.conn, timeStamp) + } + return nil +} func (s *ServerSession) doDataMessageAmf0(stream *Stream) error { if s.t != ServerSessionTypePub { return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg) From ca4faccfcc61dc3368d5c1a51032d36695039210 Mon Sep 17 00:00:00 2001 From: joestarzxh Date: Fri, 31 Dec 2021 19:58:48 +0800 Subject: [PATCH 2/2] =?UTF-8?q?[fix]rtmp=5Ft=E4=BF=AE=E6=94=B9=E6=9C=AA?= =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/rtmp_t.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/base/rtmp_t.go b/pkg/base/rtmp_t.go index cfc389e..f756080 100644 --- a/pkg/base/rtmp_t.go +++ b/pkg/base/rtmp_t.go @@ -24,8 +24,10 @@ const ( RtmpTypeIdAggregateMessage uint8 = 22 // user control message type - RtmpUserControlStreamBegin uint8 = 0 - RtmpUserControlRecorded uint8 = 4 + RtmpUserControlStreamBegin uint8 = 0 + RtmpUserControlRecorded uint8 = 4 + RtmpUserControlPingRequest uint8 = 6 + RtmpUserControlPingResponse uint8 = 7 // spec-video_file_format_spec_v10.pdf // Video tags