[opt] rtmp: 兼容publish信令中没有pubType字段的情况 #280

pull/286/head
q191201771 2 years ago
parent efdee6f6e6
commit e6de1b59e0

@ -9,6 +9,7 @@
package rtmp
import (
"encoding/hex"
"fmt"
"net"
"strings"
@ -208,39 +209,48 @@ func (s *ServerSession) handshake() error {
}
func (s *ServerSession) doMsg(stream *Stream) error {
if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
var err error
refForDebugLog := stream.msg.buff.Bytes()
if err = s.writeAcknowledgementIfNeeded(stream); err != nil {
Log.Errorf("[%s] doMsg failed. stream=%s, msg=%s", s.UniqueKey(), stream.toDebugString(), hex.EncodeToString(refForDebugLog))
return err
}
//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
switch stream.header.MsgTypeId {
case base.RtmpTypeIdWinAckSize:
return s.doWinAckSize(stream)
err = s.doWinAckSize(stream)
case base.RtmpTypeIdSetChunkSize:
// noop
// 因为底层的 chunk composer 已经处理过了,这里就不用处理
case base.RtmpTypeIdCommandMessageAmf0:
return s.doCommandMessage(stream)
err = s.doCommandMessage(stream)
case base.RtmpTypeIdCommandMessageAmf3:
return s.doCommandAmf3Message(stream)
err = s.doCommandAmf3Message(stream)
case base.RtmpTypeIdMetadata:
return s.doDataMessageAmf0(stream)
err = s.doDataMessageAmf0(stream)
case base.RtmpTypeIdAck:
return s.doAck(stream)
err = s.doAck(stream)
case base.RtmpTypeIdUserControl:
s.doUserControl(stream)
err = s.doUserControl(stream)
case base.RtmpTypeIdAudio:
fallthrough
case base.RtmpTypeIdVideo:
if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
err = nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
}
s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
default:
Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())
Log.Warnf("[%s] read unknown message. stream=%s, msg=%s", s.UniqueKey(), stream.toDebugString(), hex.EncodeToString(refForDebugLog))
}
return nil
if err != nil {
Log.Errorf("[%s] doMsg failed. stream=%s, msg=%s", s.UniqueKey(), stream.toDebugString(), hex.EncodeToString(refForDebugLog))
}
return err
}
func (s *ServerSession) doWinAckSize(stream *Stream) error {
@ -456,8 +466,10 @@ func (s *ServerSession) doPublish(tid int, stream *Stream) (err error) {
s.url = fmt.Sprintf("%s/%s", s.tcUrl, s.streamNameWithRawQuery)
pubType, err := stream.msg.readStringWithType()
// 兼容 https://github.com/q191201771/lal/issues/280
// 没有 pubType 时,继续走后面的流程
if err != nil {
return err
Log.Warnf("[%s] read pubType failed. err=%s", s.UniqueKey(), err)
}
Log.Debugf("[%s] pubType=%s", s.UniqueKey(), pubType)
Log.Infof("[%s] < R publish('%s')", s.UniqueKey(), s.streamNameWithRawQuery)

@ -0,0 +1,94 @@
// Copyright 2023, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtmp
import (
"encoding/hex"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazabytes"
"net"
"testing"
"time"
)
type testServerSessionObserver struct {
}
func (o *testServerSessionObserver) OnRtmpConnect(session *ServerSession, opa ObjectPairArray) {
}
func (o *testServerSessionObserver) OnNewRtmpPubSession(session *ServerSession) error {
return nil
}
func (o *testServerSessionObserver) OnNewRtmpSubSession(session *ServerSession) error {
return nil
}
// 考虑加入naza中
type mConn struct{}
func (mConn) Read(b []byte) (n int, err error) {
//TODO implement me
panic("implement me")
}
func (mConn) Write(b []byte) (n int, err error) {
return len(b), nil
}
func (mConn) Close() error {
//TODO implement me
panic("implement me")
}
func (mConn) LocalAddr() net.Addr {
//TODO implement me
panic("implement me")
}
func (mConn) RemoteAddr() net.Addr {
addrs, _ := net.InterfaceAddrs()
return addrs[0]
}
func (mConn) SetDeadline(t time.Time) error {
//TODO implement me
panic("implement me")
}
func (mConn) SetReadDeadline(t time.Time) error {
//TODO implement me
panic("implement me")
}
func (mConn) SetWriteDeadline(t time.Time) error {
//TODO implement me
panic("implement me")
}
func TestServerSession_doMsg(t *testing.T) {
var o testServerSessionObserver
var c mConn
s := NewServerSession(&o, &c)
var stream Stream
stream.msg.buff = nazabytes.NewBuffer(1024)
// publish信令中没有pub type
// {Csid:5 MsgLen:39 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0}
stream.header.MsgTypeId = 20
//b, _ := hex.DecodeString("0200077075626c69736800400800000000000005020009696e6e6572746573740200046c697665")
b, _ := hex.DecodeString("0200077075626c69736800400800000000000005020009696e6e657274657374")
stream.msg.buff.Write(b)
err := s.doMsg(&stream)
assert.Equal(t, nil, err)
}
Loading…
Cancel
Save