[refactor] 把base.Buffer移到naza库中

pull/114/head
q191201771 3 years ago
parent d5c7047295
commit 875569edb3

@ -1,269 +0,0 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"fmt"
"github.com/q191201771/naza/pkg/nazalog"
"io"
)
// TODO(chef): refactor 移入naza中
// TODO(chef): 增加options: growRoundThreshold; 是否做检查
// TODO(chef): 扩容策略函数可由外部传入
const growRoundThreshold = 1048576 // 1MB
// Buffer 先进先出可扩容流式buffer可直接读写内部切片避免拷贝
//
// 示例
// 读取方式1
// buf := Bytes()
// ... // 读取buf的内容
// Skip(n)
//
// 读取方式2
// buf := Peek(n)
// ...
//
// 读取方式3
// buf := make([]byte, n)
// nn, err := Read(buf)
//
// 写入方式1
// Grow(n)
// buf := WritableBytes()[:n]
// ... // 向buf中写入内容
// Flush(n)
//
// 写入方式2
// buf := ReserveBytes(n)
// ... // 向buf中写入内容
// Flush(n)
//
// 写入方式3
// n, err := Write(buf)
//
type Buffer struct {
core []byte
rpos int
wpos int
}
func NewBuffer(initCap int) *Buffer {
return &Buffer{
core: make([]byte, initCap, initCap),
}
}
// NewBufferRefBytes
//
// 注意,不拷贝参数`b`的内存块,仅持有
//
func NewBufferRefBytes(b []byte) *Buffer {
return &Buffer{
core: b,
}
}
// ---------------------------------------------------------------------------------------------------------------------
// Bytes Buffer中所有未读数据类似于PeekAll不拷贝
//
func (b *Buffer) Bytes() []byte {
if b.rpos == b.wpos {
return nil
}
return b.core[b.rpos:b.wpos]
}
// Peek 查看指定长度的未读数据不拷贝类似于Next但是不会修改读取偏移位置
//
func (b *Buffer) Peek(n int) []byte {
if b.rpos == b.wpos {
return nil
}
if b.Len() < n {
return b.Bytes()
}
return b.core[b.rpos : b.rpos+n]
}
// Skip 将前`n`未读数据标记为已读(也即消费完成)
//
func (b *Buffer) Skip(n int) {
if n > b.wpos-b.rpos {
nazalog.Warnf("[%p] Buffer::Skip too large. n=%d, %s", b, n, b.DebugString())
b.Reset()
return
}
b.rpos += n
b.resetIfEmpty()
}
// ---------------------------------------------------------------------------------------------------------------------
// Grow 确保Buffer中至少有`n`大小的空间可写类似于Reserve
//
func (b *Buffer) Grow(n int) {
//nazalog.Debugf("[%p] > Buffer::Grow. n=%d, %s", b, n, b.DebugString())
tail := len(b.core) - b.wpos
if tail >= n {
// 尾部空闲空间足够
return
}
if b.rpos+tail >= n {
// 头部加上尾部空闲空间足够,将可读数据移动到头部,回收头部空闲空间
nazalog.Debugf("[%p] Buffer::Grow. move, n=%d, copy=%d", b, n, b.Len())
copy(b.core, b.core[b.rpos:b.wpos])
b.wpos -= b.rpos
b.rpos = 0
return
}
// 扩容后总共需要的大小
needed := b.Len() + n
// 扩容大小在阈值范围内时向上取值到2的倍数
if needed < growRoundThreshold {
needed = roundUpPowerOfTwo(needed)
}
nazalog.Debugf("[%p] Buffer::Grow. realloc, n=%d, copy=%d, cap=(%d, %d)", b, n, b.Len(), b.Cap(), needed)
core := make([]byte, needed, needed)
copy(core, b.core[b.rpos:b.wpos])
b.core = core
b.rpos = 0
b.wpos -= b.rpos
}
// WritableBytes 返回当前可写入的字节切片
//
func (b *Buffer) WritableBytes() []byte {
if len(b.core) == b.wpos {
return nil
}
return b.core[b.wpos:]
}
// ReserveBytes 返回可写入`n`大小的字节切片,如果空闲空间不够,内部会进行扩容
//
// 注意,返回值空间大小只会为`n`
//
func (b *Buffer) ReserveBytes(n int) []byte {
b.Grow(n)
return b.WritableBytes()[:n]
}
// Flush 写入完成,更新写入位置
//
func (b *Buffer) Flush(n int) {
if len(b.core)-b.wpos < n {
nazalog.Warnf("[%p] Buffer::Flush too large. n=%d, %s", b, n, b.DebugString())
b.wpos = len(b.core)
return
}
b.wpos += n
}
// ----- implement io.Reader interface ---------------------------------------------------------------------------------
// Read 拷贝,`p`空间由外部申请
//
func (b *Buffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if b.Len() == 0 {
return 0, io.EOF
}
n = copy(p, b.core[b.rpos:b.wpos])
b.Skip(n)
return n, nil
}
// ----- implement io.Writer interface ---------------------------------------------------------------------------------
// Write 拷贝
//
func (b *Buffer) Write(p []byte) (n int, err error) {
b.Grow(len(p))
copy(b.core[b.wpos:], p)
b.wpos += n
return len(p), nil
}
// ---------------------------------------------------------------------------------------------------------------------
// Truncate 丢弃可读数据的末尾`n`大小的数据,或者理解为取消写
//
func (b *Buffer) Truncate(n int) {
if b.Len() < n {
nazalog.Warnf("[%p] Buffer::Truncate too large. n=%d, %s", b, n, b.DebugString())
b.Reset()
return
}
b.wpos -= n
b.resetIfEmpty()
}
// Reset 重置
//
// 注意,并不会释放内存块
//
func (b *Buffer) Reset() {
b.rpos = 0
b.wpos = 0
}
// ---------------------------------------------------------------------------------------------------------------------
// Len Buffer中还没有读的数据的长度
//
func (b *Buffer) Len() int {
return b.wpos - b.rpos
}
// Cap 整个Buffer占用的空间
//
func (b *Buffer) Cap() int {
return cap(b.core)
}
// ---------------------------------------------------------------------------------------------------------------------
func (b *Buffer) DebugString() string {
return fmt.Sprintf("len(core)=%d, rpos=%d, wpos=%d", len(b.core), b.rpos, b.wpos)
}
// ---------------------------------------------------------------------------------------------------------------------
func (b *Buffer) resetIfEmpty() {
if b.rpos == b.wpos {
b.Reset()
}
}
// TODO(chef): refactor 移入naza中
func roundUpPowerOfTwo(n int) int {
if n <= 2 {
return 2
}
n--
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
n |= n >> 32
n++
return n
}

@ -1,92 +0,0 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package base
import (
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazalog"
"testing"
)
func TestBuffer(t *testing.T) {
golden := []byte("1234567890")
b := NewBuffer(8)
assert.Equal(t, nil, b.Bytes())
assert.Equal(t, 8, len(b.WritableBytes()))
assert.Equal(t, 0, b.Len())
assert.Equal(t, 8, b.Cap())
// 简单写读
b.Grow(5)
buf := b.WritableBytes()[:5]
assert.Equal(t, nil, b.Bytes())
copy(buf, golden[:5])
b.Flush(5)
buf = b.Bytes()
assert.Equal(t, golden[:5], buf)
assert.Equal(t, 5, b.Len())
b.Skip(5)
assert.Equal(t, nil, b.Bytes())
assert.Equal(t, 8, len(b.WritableBytes()))
assert.Equal(t, 0, b.Len())
assert.Equal(t, 8, b.Cap())
// 发生扩容
buf = b.ReserveBytes(10)
copy(buf, golden)
b.Flush(10)
buf = b.Bytes()
assert.Equal(t, golden, buf)
b.Skip(10)
assert.Equal(t, nil, b.Bytes())
assert.Equal(t, 16, len(b.WritableBytes()))
assert.Equal(t, 0, b.Len())
assert.Equal(t, 16, b.Cap())
// 利用头部空闲空间扩容
buf = b.ReserveBytes(10)
copy(buf, golden)
b.Flush(10)
b.Skip(2)
buf = b.ReserveBytes(7)
copy(buf, golden[:7])
b.Flush(7)
nazalog.Debugf("%s", string(b.Bytes()))
assert.Equal(t, golden[2:], b.Bytes()[:8])
assert.Equal(t, golden[:7], b.Bytes()[8:])
assert.Equal(t, 15, b.Len())
assert.Equal(t, 16, b.Cap())
// Truncate
b.Reset()
buf = b.ReserveBytes(10)
copy(buf, golden)
b.Flush(10)
b.Truncate(4)
assert.Equal(t, golden[:6], b.Bytes())
// 特殊值
b.Reset()
b.Flush(b.Cap())
assert.Equal(t, nil, b.WritableBytes())
// 一些错误
b.Reset()
b.Skip(1)
assert.Equal(t, nil, b.Bytes())
b.Truncate(1)
assert.Equal(t, nil, b.Bytes())
b.Flush(b.Cap() + 1)
assert.Equal(t, b.Cap(), b.Len())
// 特殊值,极小的扩容
b = NewBuffer(1)
buf = b.ReserveBytes(2)
}

@ -13,9 +13,12 @@ package rtmp
// 读取chunk并组织chunk生成message返回给上层
import (
"io"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
"io"
"github.com/q191201771/naza/pkg/bele"
)
@ -217,7 +220,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error {
if stream.msg.Len() < aggregateStream.header.MsgLen {
return ErrRtmp
}
aggregateStream.msg.buff = base.NewBufferRefBytes(stream.msg.buff.Peek(int(aggregateStream.header.MsgLen)))
aggregateStream.msg.buff = nazabytes.NewBufferRefBytes(stream.msg.buff.Peek(int(aggregateStream.header.MsgLen)))
stream.msg.Skip(aggregateStream.header.MsgLen)
// sub message回调给上层

@ -11,6 +11,9 @@ package rtmp
import (
"encoding/hex"
"fmt"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazalog"
)
@ -27,7 +30,7 @@ type Stream struct {
func NewStream() *Stream {
return &Stream{
msg: StreamMsg{
buff: base.NewBuffer(initMsgLen),
buff: nazabytes.NewBuffer(initMsgLen),
},
}
}
@ -52,7 +55,7 @@ func (stream *Stream) toAvMsg() base.RtmpMsg {
// ----- StreamMsg -----------------------------------------------------------------------------------------------------
type StreamMsg struct {
buff *base.Buffer
buff *nazabytes.Buffer
}
// 确保可写空间,如果不够会扩容

Loading…
Cancel
Save