Merge pull request from joestarzxh/master

[opt] rtmp: 对ping request回应ping response(包含ClientSession和ServerSession)
pull/130/head
yoko committed by GitHub
commit faa19973d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -24,8 +24,10 @@ const (
RtmpTypeIdAggregateMessage uint8 = 22 RtmpTypeIdAggregateMessage uint8 = 22
// user control message type // user control message type
RtmpUserControlStreamBegin uint8 = 0 RtmpUserControlStreamBegin uint8 = 0
RtmpUserControlRecorded uint8 = 4 RtmpUserControlRecorded uint8 = 4
RtmpUserControlPingRequest uint8 = 6
RtmpUserControlPingResponse uint8 = 7
// spec-video_file_format_spec_v10.pdf // spec-video_file_format_spec_v10.pdf
// Video tags // Video tags

@ -367,6 +367,7 @@ func (s *ClientSession) doMsg(stream *Stream) error {
nazalog.Warnf("[%s] read user control message, ignore. buf=%s", nazalog.Warnf("[%s] read user control message, ignore. buf=%s",
s.uniqueKey, hex.Dump(stream.msg.buff.Peek(32))) s.uniqueKey, hex.Dump(stream.msg.buff.Peek(32)))
} }
s.doUserControl(stream)
case base.RtmpTypeIdAudio: case base.RtmpTypeIdAudio:
fallthrough fallthrough
case base.RtmpTypeIdVideo: case base.RtmpTypeIdVideo:
@ -383,6 +384,15 @@ func (s *ClientSession) doAck(stream *Stream) error {
nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum) nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum)
return nil return nil
} }
func (s *ClientSession) doUserControl(stream *Stream) error {
userControlType := bele.BeUint16(stream.msg.buff.Bytes())
if userControlType == uint16(base.RtmpUserControlPingRequest) {
stream.msg.buff.Skip(2)
timeStamp := bele.BeUint32(stream.msg.buff.Bytes())
s.packer.writePingResponse(s.conn, timeStamp)
}
return nil
}
func (s *ClientSession) doDataMessageAmf0(stream *Stream) error { func (s *ClientSession) doDataMessageAmf0(stream *Stream) error {
val, err := stream.msg.peekStringWithType() val, err := stream.msg.peekStringWithType()

@ -253,6 +253,26 @@ func (packer *MessagePacker) writeStreamBegin(writer io.Writer, streamid uint32)
return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0)
} }
func (packer *MessagePacker) writePingRequest(writer io.Writer, timeStamp uint32) error {
packer.b.ModWritePos(12)
// 6
_ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlPingRequest))
_ = bele.WriteBe(packer.b, uint32(timeStamp))
return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0)
}
func (packer *MessagePacker) writePingResponse(writer io.Writer, timeStamp uint32) error {
packer.b.ModWritePos(12)
// 6
_ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlPingResponse))
_ = bele.WriteBe(packer.b, uint32(timeStamp))
return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0)
}
// --------------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------------
// TODO(chef): 整理所有的buffer // TODO(chef): 整理所有的buffer

@ -219,6 +219,8 @@ func (s *ServerSession) doMsg(stream *Stream) error {
return s.doDataMessageAmf0(stream) return s.doDataMessageAmf0(stream)
case base.RtmpTypeIdAck: case base.RtmpTypeIdAck:
return s.doAck(stream) return s.doAck(stream)
case base.RtmpTypeIdUserControl:
s.doUserControl(stream)
case base.RtmpTypeIdAudio: case base.RtmpTypeIdAudio:
fallthrough fallthrough
case base.RtmpTypeIdVideo: case base.RtmpTypeIdVideo:
@ -238,7 +240,15 @@ func (s *ServerSession) doAck(stream *Stream) error {
nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum) nazalog.Infof("[%s] < R Acknowledgement. ignore. sequence number=%d.", s.uniqueKey, seqNum)
return nil return nil
} }
func (s *ServerSession) doUserControl(stream *Stream) error {
userControlType := bele.BeUint16(stream.msg.buff.Bytes())
if userControlType == uint16(base.RtmpUserControlPingRequest) {
stream.msg.buff.Skip(2)
timeStamp := bele.BeUint32(stream.msg.buff.Bytes())
s.packer.writePingResponse(s.conn, timeStamp)
}
return nil
}
func (s *ServerSession) doDataMessageAmf0(stream *Stream) error { func (s *ServerSession) doDataMessageAmf0(stream *Stream) error {
if s.t != ServerSessionTypePub { if s.t != ServerSessionTypePub {
return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg) return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)

Loading…
Cancel
Save