diff --git a/pkg/rtmp/message_packer.go b/pkg/rtmp/message_packer.go index fa44b5e..c2254b2 100644 --- a/pkg/rtmp/message_packer.go +++ b/pkg/rtmp/message_packer.go @@ -8,15 +8,12 @@ package rtmp -// message_packer.go -// @pure -// 打包并发送 rtmp 信令 - import ( - "bytes" "fmt" "io" + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/bele" @@ -28,37 +25,67 @@ const ( peerBandwidthLimitTypeDynamic = uint8(2) ) +// 打包并发送 rtmp 信令 +// type MessagePacker struct { - // 1. 增加一层缓冲,避免 write 一个信令时发生多次系统调用 - // 2. 因为 bytes.Buffer.Write 返回的 error 永远为 nil,所以本文件中所有对 b 的写操作都不判断返回值 - b *bytes.Buffer + b *Buffer } func NewMessagePacker() *MessagePacker { return &MessagePacker{ - b: &bytes.Buffer{}, + b: NewBuffer(256), } } -func (packer *MessagePacker) writeMessageHeader(csid int, bodyLen int, typeid uint8, streamid int) { +// 注意,这个函数只会打包一个chunk头,所以调用方应自己保证在`bodyLen`小于chunk size时使用 +// +func writeSingleChunkHeader(out []byte, csid int, bodyLen int, typeid uint8, streamid int) { // 目前这个函数只供发送信令时调用,信令的 csid 都是小于等于 63 的,如果传入的 csid 大于 63,直接 panic if csid > 63 { panic(csid) } - fmt := 0 + format := 0 + out[0] = uint8(format<<6 | csid) // 0 0 0 是时间戳 - _, _ = packer.b.Write([]byte{uint8(fmt<<6 | csid), 0, 0, 0}) - _ = bele.WriteBeUint24(packer.b, uint32(bodyLen)) - _ = packer.b.WriteByte(typeid) - _ = bele.WriteLe(packer.b, uint32(streamid)) + out[1] = 0 + out[2] = 0 + out[3] = 0 + bele.BePutUint24(out[4:], uint32(bodyLen)) + out[7] = typeid + bele.LePutUint32(out[8:], uint32(streamid)) +} + +func (packer *MessagePacker) ChunkAndWrite(writer io.Writer, csid int, typeid uint8, streamid int) error { + bodyLen := packer.b.Len() - 12 + + if bodyLen <= LocalChunkSize { + // 如果一个chunk就够放(大部分信令都是这种情况),我们直接在buffer前面预留的空间写入chunk header内容,避免造成拷贝 + writeSingleChunkHeader(packer.b.Bytes(), csid, bodyLen, typeid, streamid) + _, err := packer.b.WriteTo(writer) + return err + } + + var h base.RtmpHeader + h.Csid = csid + h.MsgLen = uint32(bodyLen) + h.MsgTypeId = typeid + h.MsgStreamId = streamid + h.TimestampAbs = 0 + chunks := Message2Chunks(packer.b.Bytes()[12:], &h) + nazalog.Debugf("CHEFERASEME %d %d", packer.b.Len(), len(chunks)) + packer.b.Reset() + _, err := writer.Write(chunks) + return err } func (packer *MessagePacker) writeProtocolControlMessage(writer io.Writer, typeid uint8, val int) error { - packer.writeMessageHeader(csidProtocolControl, 4, typeid, 0) + packer.b.ModWritePos(12) + + // 4 _ = bele.WriteBe(packer.b, uint32(val)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidProtocolControl, typeid, 0) } func (packer *MessagePacker) writeChunkSize(writer io.Writer, val int) error { @@ -70,16 +97,19 @@ func (packer *MessagePacker) writeWinAckSize(writer io.Writer, val int) error { } func (packer *MessagePacker) writePeerBandwidth(writer io.Writer, val int, limitType uint8) error { - packer.writeMessageHeader(csidProtocolControl, 5, base.RtmpTypeIdBandwidth, 0) + packer.b.ModWritePos(12) + + // 5 _ = bele.WriteBe(packer.b, uint32(val)) _ = packer.b.WriteByte(limitType) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdBandwidth, 0) } // @param isPush: 推流为true,拉流为false func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcUrl string, isPush bool) error { - packer.writeMessageHeader(csidOverConnection, 0, base.RtmpTypeIdCommandMessageAmf0, 0) + packer.b.ModWritePos(12) + _ = Amf0.WriteString(packer.b, "connect") _ = Amf0.WriteNumber(packer.b, float64(tidClientConnect)) @@ -97,15 +127,14 @@ func (packer *MessagePacker) writeConnect(writer io.Writer, appName, tcUrl strin objs = append(objs, ObjectPair{Key: "fpad", Value: false}) objs = append(objs, ObjectPair{Key: "tcUrl", Value: tcUrl}) _ = Amf0.WriteObject(packer.b, objs) - raw := packer.b.Bytes() - bele.BePutUint24(raw[4:], uint32(len(raw)-12)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverConnection, base.RtmpTypeIdCommandMessageAmf0, 0) } // @param objectEncoding 设置0或者3,表示是Amf0或AMF3,上层可根据connect信令中的objectEncoding值设置该值 func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int, objectEncoding int) error { - packer.writeMessageHeader(csidOverConnection, 0, base.RtmpTypeIdCommandMessageAmf0, 0) + packer.b.ModWritePos(12) + _ = Amf0.WriteString(packer.b, "_result") _ = Amf0.WriteNumber(packer.b, float64(tid)) objs := []ObjectPair{ @@ -121,61 +150,60 @@ func (packer *MessagePacker) writeConnectResult(writer io.Writer, tid int, objec {Key: "version", Value: base.LalRtmpConnectResultVersion}, } _ = Amf0.WriteObject(packer.b, objs) - raw := packer.b.Bytes() - bele.BePutUint24(raw[4:], uint32(len(raw)-12)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverConnection, base.RtmpTypeIdCommandMessageAmf0, 0) } func (packer *MessagePacker) writeCreateStream(writer io.Writer) error { + packer.b.ModWritePos(12) + // 25 = 15 + 9 + 1 - packer.writeMessageHeader(csidOverConnection, 25, base.RtmpTypeIdCommandMessageAmf0, 0) _ = Amf0.WriteString(packer.b, "createStream") _ = Amf0.WriteNumber(packer.b, float64(tidClientCreateStream)) _ = Amf0.WriteNull(packer.b) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverConnection, base.RtmpTypeIdCommandMessageAmf0, 0) } func (packer *MessagePacker) writeCreateStreamResult(writer io.Writer, tid int) error { - packer.writeMessageHeader(csidOverConnection, 29, base.RtmpTypeIdCommandMessageAmf0, 0) + packer.b.ModWritePos(12) + + // 29 _ = Amf0.WriteString(packer.b, "_result") _ = Amf0.WriteNumber(packer.b, float64(tid)) _ = Amf0.WriteNull(packer.b) _ = Amf0.WriteNumber(packer.b, float64(Msid1)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverConnection, base.RtmpTypeIdCommandMessageAmf0, 0) } func (packer *MessagePacker) writePlay(writer io.Writer, streamName string, streamid int) error { - packer.writeMessageHeader(csidOverStream, 0, base.RtmpTypeIdCommandMessageAmf0, streamid) + packer.b.ModWritePos(12) + _ = Amf0.WriteString(packer.b, "play") _ = Amf0.WriteNumber(packer.b, float64(tidClientPlay)) _ = Amf0.WriteNull(packer.b) _ = Amf0.WriteString(packer.b, streamName) - raw := packer.b.Bytes() - bele.BePutUint24(raw[4:], uint32(len(raw)-12)) - _, err := packer.b.WriteTo(writer) - return err + return packer.ChunkAndWrite(writer, csidOverStream, base.RtmpTypeIdCommandMessageAmf0, streamid) } func (packer *MessagePacker) writePublish(writer io.Writer, appName string, streamName string, streamid int) error { - packer.writeMessageHeader(csidOverStream, 0, base.RtmpTypeIdCommandMessageAmf0, streamid) + packer.b.ModWritePos(12) + _ = Amf0.WriteString(packer.b, "publish") _ = Amf0.WriteNumber(packer.b, float64(tidClientPublish)) _ = Amf0.WriteNull(packer.b) _ = Amf0.WriteString(packer.b, streamName) _ = Amf0.WriteString(packer.b, appName) - raw := packer.b.Bytes() - bele.BePutUint24(raw[4:], uint32(len(raw)-12)) - _, err := packer.b.WriteTo(writer) - return err + return packer.ChunkAndWrite(writer, csidOverStream, base.RtmpTypeIdCommandMessageAmf0, streamid) } func (packer *MessagePacker) writeOnStatusPublish(writer io.Writer, streamid int) error { - packer.writeMessageHeader(csidOverStream, 105, base.RtmpTypeIdCommandMessageAmf0, streamid) + packer.b.ModWritePos(12) + + // 105 _ = Amf0.WriteString(packer.b, "onStatus") _ = Amf0.WriteNumber(packer.b, 0) _ = Amf0.WriteNull(packer.b) @@ -185,12 +213,14 @@ func (packer *MessagePacker) writeOnStatusPublish(writer io.Writer, streamid int {Key: "description", Value: "Start publishing"}, } _ = Amf0.WriteObject(packer.b, objs) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverStream, base.RtmpTypeIdCommandMessageAmf0, streamid) } func (packer *MessagePacker) writeOnStatusPlay(writer io.Writer, streamid int) error { - packer.writeMessageHeader(csidOverStream, 96, base.RtmpTypeIdCommandMessageAmf0, streamid) + packer.b.ModWritePos(12) + + // 96 _ = Amf0.WriteString(packer.b, "onStatus") _ = Amf0.WriteNumber(packer.b, 0) _ = Amf0.WriteNull(packer.b) @@ -200,22 +230,116 @@ func (packer *MessagePacker) writeOnStatusPlay(writer io.Writer, streamid int) e {Key: "description", Value: "Start live"}, } _ = Amf0.WriteObject(packer.b, objs) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidOverStream, base.RtmpTypeIdCommandMessageAmf0, streamid) } func (packer *MessagePacker) writeStreamIsRecorded(writer io.Writer, streamid uint32) error { - packer.writeMessageHeader(csidProtocolControl, 6, base.RtmpTypeIdUserControl, 0) + packer.b.ModWritePos(12) + + // 6 _ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlRecorded)) _ = bele.WriteBe(packer.b, uint32(streamid)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) } func (packer *MessagePacker) writeStreamBegin(writer io.Writer, streamid uint32) error { - packer.writeMessageHeader(csidProtocolControl, 6, base.RtmpTypeIdUserControl, 0) + packer.b.ModWritePos(12) + + // 6 _ = bele.WriteBe(packer.b, uint16(base.RtmpUserControlStreamBegin)) _ = bele.WriteBe(packer.b, uint32(streamid)) - _, err := packer.b.WriteTo(writer) - return err + + return packer.ChunkAndWrite(writer, csidProtocolControl, base.RtmpTypeIdUserControl, 0) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// TODO(chef): 整理所有的buffer + +type Buffer struct { + core []byte + readPos int + writePos int +} + +func NewBuffer(n int) *Buffer { + return &Buffer{ + core: make([]byte, n), + readPos: 0, + writePos: 0, + } +} + +func (b *Buffer) Bytes() []byte { + return b.core[b.readPos:b.writePos] +} + +func (b *Buffer) Len() int { + return b.writePos - b.readPos +} + +func (b *Buffer) Reset() { + b.readPos = 0 + b.writePos = 0 +} + +func (b *Buffer) Write(p []byte) (n int, err error) { + b.grow(len(p)) + copy(b.core[b.writePos:], p) + b.writePos += len(p) + + return len(p), nil +} + +func (b *Buffer) WriteByte(c byte) error { + b.grow(1) + b.core[b.writePos] = c + b.writePos++ + return nil +} + +func (b *Buffer) WriteTo(w io.Writer) (n int64, err error) { + if nBytes := b.Len(); nBytes > 0 { + m, e := w.Write(b.Bytes()) + if m > nBytes { + nazalog.Panicf("Buffer.WriteTo: invalid Write count. expected=%d, actual=%d", nBytes, m) + } + b.readPos += m + n = int64(m) + if e != nil { + return n, e + } + if m != nBytes { + return n, io.ErrShortWrite + } + } + b.Reset() + return n, nil +} + +func (b *Buffer) ModWritePos(pos int) { + b.writePos = pos +} + +func (b *Buffer) grow(n int) { + if cap(b.core)-b.writePos >= n { + return + } + + // TODO(chef): 可以先尝试是否能挪出空闲位置 + + var newLen int + if cap(b.core) == 0 { + newLen = 128 + } else { + newLen = cap(b.core) * 2 + } + buf := make([]byte, newLen) + nazalog.Debugf("Buffer::grow. need=%d, old len=%d, cap=%d, new len=%d", n, b.Len(), cap(b.core), newLen) + copy(buf, b.core[b.readPos:b.writePos]) + b.core = buf + b.readPos = 0 + b.writePos = b.writePos - b.readPos } diff --git a/pkg/rtmp/message_packer_test.go b/pkg/rtmp/message_packer_test.go index 463f470..3057e6c 100644 --- a/pkg/rtmp/message_packer_test.go +++ b/pkg/rtmp/message_packer_test.go @@ -16,12 +16,12 @@ import ( "github.com/q191201771/naza/pkg/fake" ) -func TestWriteMessageHandler(t *testing.T) { - //buf := &bytes.Buffer{} - packer := NewMessagePacker() - packer.writeMessageHeader(1, 2, 3, 4) - assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 2, 3, 4, 0, 0, 0}, packer.b.Bytes()) -} +//func TestWriteMessageHandler(t *testing.T) { +// //buf := &bytes.Buffer{} +// packer := NewMessagePacker() +// packer.writeMessageHeader(1, 2, 3, 4) +// assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 2, 3, 4, 0, 0, 0}, packer.b.Bytes()) +//} func TestWrite(t *testing.T) { var ( @@ -106,14 +106,14 @@ func TestWrite(t *testing.T) { } func TestPackCorner(t *testing.T) { - func() { - defer func() { - recover() - }() - packer := NewMessagePacker() - // 测试csid超过63的情况 - packer.writeMessageHeader(128, 0, 0, 0) - }() + //func() { + // defer func() { + // recover() + // }() + // packer := NewMessagePacker() + // // 测试csid超过63的情况 + // packer.writeMessageHeader(128, 0, 0, 0) + //}() var err error mw := fake.NewWriter(fake.WriterTypeReturnError) diff --git a/pkg/rtmp/var.go b/pkg/rtmp/var.go index 93bc751..767c8ff 100644 --- a/pkg/rtmp/var.go +++ b/pkg/rtmp/var.go @@ -17,7 +17,21 @@ var ( serverSessionReadAvTimeoutMs = 10000 // server pub session,读音视频数据超时 serverSessionWriteAvTimeoutMs = 10000 // server sub session,写音视频数据超时 - LocalChunkSize = 4096 // 本端设置的 chunk size + // 本端(包括Server Session和Client Session)设置的chunk size,本端发送数据时切割chunk包时使用 + // (对端发送数据时的chunk size由对端决定,和本变量没有关系) + // + // 注意,这个值不应该设置的太小,原因有两方面: + // 1. 性能与带宽 + // 切割的chunk包过多,会消耗更多的CPU资源(包括本地和远端),另外还可能增加传输时的chunk header带宽消耗 + // 2. 兼容性 + // 理论上,信令也要参考chunk size切割成chunk包,而对端使用chunk包合成message的实现不一定标准。 + // 我就遇到过这样的case,对端认为rtmp握手后的几个信令,每个信令都只使用一个chunk。 + // 假如我们将一条信令切割成多个chunk,对端可能就解析错误了,这属于对端实现的问题。 + // 但为了更好的兼容性,我们不要将chunk size设置的太小。 + // 值得一提,lal作为接收端时,对端可以任意设置chunk size。 + // + LocalChunkSize = 4096 + windowAcknowledgementSize = 5000000 peerBandwidth = 5000000 )