lal support opus

pull/349/head
ZSC714725 10 months ago
parent 08487f9a9f
commit 3361edd01e

@ -21,11 +21,12 @@ type AvPacketPt int
const (
AvPacketPtUnknown AvPacketPt = -1
AvPacketPtG711U AvPacketPt = 0 // g711u
AvPacketPtG711A AvPacketPt = 8 // g711a
AvPacketPtAvc AvPacketPt = 96 // h264
AvPacketPtHevc AvPacketPt = 98 // h265
AvPacketPtAac AvPacketPt = 97 // aac
AvPacketPtG711U AvPacketPt = 0 // g711u
AvPacketPtG711A AvPacketPt = 8 // g711a
AvPacketPtAvc AvPacketPt = 96 // h264
AvPacketPtHevc AvPacketPt = 98 // h265
AvPacketPtAac AvPacketPt = 97 // aac
AvPacketPtOpus AvPacketPt = 101 // opus
)
func (a AvPacketPt) ReadableString() string {

@ -18,6 +18,7 @@ const (
AudioCodecAac = "AAC"
AudioCodecG711U = "PCMU"
AudioCodecG711A = "PCMA"
AudioCodecOpus = "OPUS"
// VideoCodecAvc StatGroup.VideoCodec
VideoCodecAvc = "H264"

@ -11,6 +11,7 @@ package base
import (
"encoding/hex"
"fmt"
"github.com/q191201771/naza/pkg/bele"
"github.com/q191201771/naza/pkg/nazabytes"
)
@ -117,6 +118,7 @@ const (
RtmpSoundFormatG711A uint8 = 7
RtmpSoundFormatG711U uint8 = 8
RtmpSoundFormatAac uint8 = 10
RtmpSoundFormatOpus uint8 = 13
RtmpAacPacketTypeSeqHeader = 0
RtmpAacPacketTypeRaw = 1

@ -9,9 +9,10 @@
package logic
import (
"github.com/q191201771/lal/pkg/rtsp"
"net"
"github.com/q191201771/lal/pkg/rtsp"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
@ -409,6 +410,8 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
group.stat.AudioCodec = base.AudioCodecG711U
case base.RtmpSoundFormatG711A:
group.stat.AudioCodec = base.AudioCodecG711A
case base.RtmpSoundFormatOpus:
group.stat.AudioCodec = base.AudioCodecOpus
}
}
}

@ -161,6 +161,7 @@ const (
// 0x24 HEVC (HEVC video stream as defined in Rec. ITU-T H.265 | ISO/IEC 23008-2 MPEG-H Part 2)
// -----------------------------------------------------------------------------
StreamTypeUnknown uint8 = 0x00
StreamTypePrivate uint8 = 0x06
StreamTypeAac uint8 = 0x0F
StreamTypeAvc uint8 = 0x1B
StreamTypeHevc uint8 = 0x24

@ -9,6 +9,7 @@
package mpegts
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/naza/pkg/nazabits"
)
@ -57,9 +58,10 @@ type Pmt struct {
}
type PmtProgramElement struct {
StreamType uint8
Pid uint16
Length uint16
StreamType uint8
Pid uint16
Length uint16
Descriptors []Descriptor
}
func ParsePmt(b []byte) (pmt Pmt) {
@ -110,7 +112,7 @@ func (pmt *Pmt) SearchPid(pid uint16) *PmtProgramElement {
return nil
}
func PackPmt(videoStreamType, audioStreamType uint8) []byte {
func PackPmt(videoCodecId, audioCodecId int) []byte {
ts := make([]byte, 188)
tsheader := []byte{0x47, 0x50, 0x01, 0x10}
copy(ts, tsheader)
@ -122,6 +124,13 @@ func PackPmt(videoStreamType, audioStreamType uint8) []byte {
psi.sectionData.section.currentNextIndicator = 1
psi.sectionData.pmtData.pcrPid = 0x100
videoStreamType := StreamTypeUnknown
if videoCodecId == int(base.RtmpCodecIdAvc) {
videoStreamType = StreamTypeAvc
} else if videoCodecId == int(base.RtmpCodecIdHevc) {
videoStreamType = StreamTypeHevc
}
if videoStreamType != StreamTypeUnknown {
psi.sectionData.pmtData.pes = append(psi.sectionData.pmtData.pes, PmtProgramElement{
StreamType: videoStreamType,
@ -129,11 +138,42 @@ func PackPmt(videoStreamType, audioStreamType uint8) []byte {
})
}
audioStreamType := StreamTypeUnknown
if audioCodecId == int(base.RtmpSoundFormatAac) {
audioStreamType = StreamTypeAac
} else if audioCodecId == int(base.RtmpSoundFormatOpus) {
audioStreamType = StreamTypePrivate
}
if audioStreamType != StreamTypeUnknown {
psi.sectionData.pmtData.pes = append(psi.sectionData.pmtData.pes, PmtProgramElement{
pmtEle := PmtProgramElement{
StreamType: audioStreamType,
Pid: PidAudio,
})
}
if audioCodecId == int(base.RtmpSoundFormatOpus) {
descriptor := []Descriptor{
{
Length: 4,
Tag: DescriptorTagRegistration,
Registration: DescriptorRegistration{
FormatIdentifier: opusIdentifier,
},
},
{
Length: 2,
Tag: DescriptorTagExtension,
Extension: DescriptorExtension{
Tag: 0x80,
Unknown: []uint8{0x02},
},
},
}
pmtEle.Descriptors = append(pmtEle.Descriptors, descriptor...)
}
psi.sectionData.pmtData.pes = append(psi.sectionData.pmtData.pes, pmtEle)
}
psilen, psiData := psi.Pack()

@ -30,6 +30,36 @@ const (
TsPsiIdForbidden = 0xFF // forbidden
)
// Descriptor tag values: the one-byte tag that writeDescriptor emits as the
// first byte of a descriptor.
//
// Of these, only DescriptorTagRegistration and DescriptorTagExtension have a
// payload writer in writeDescriptor; the others are not referenced by the
// packing code visible here.
// NOTE(review): the values presumably follow ISO/IEC 13818-1 and DVB
// (ETSI EN 300 468) descriptor tag assignments — confirm against the specs.
const (
DescriptorTagAC3 = 0x6a
DescriptorTagAVCVideo = 0x28
DescriptorTagComponent = 0x50
DescriptorTagContent = 0x54
DescriptorTagDataStreamAlignment = 0x6
DescriptorTagEnhancedAC3 = 0x7a
DescriptorTagExtendedEvent = 0x4e
DescriptorTagExtension = 0x7f
DescriptorTagISO639LanguageAndAudioType = 0xa
DescriptorTagLocalTimeOffset = 0x58
DescriptorTagMaximumBitrate = 0xe
DescriptorTagNetworkName = 0x40
DescriptorTagParentalRating = 0x55
DescriptorTagPrivateDataIndicator = 0xf
DescriptorTagPrivateDataSpecifier = 0x5f
DescriptorTagRegistration = 0x5
DescriptorTagService = 0x48
DescriptorTagShortEvent = 0x4d
DescriptorTagStreamIdentifier = 0x52
DescriptorTagSubtitling = 0x59
DescriptorTagTeletext = 0x56
DescriptorTagVBIData = 0x45
DescriptorTagVBITeletext = 0x46
)
const (
// opusIdentifier is the four ASCII bytes 'O' 'p' 'u' 's' (0x4f 0x70 0x75 0x73)
// packed into one 32-bit value, most significant byte first. PackPmt uses it
// as the FormatIdentifier of the registration descriptor on Opus audio.
opusIdentifier = 0x4f707573 // Opus
)
type PsiSection struct {
pointerFileld uint8
sectionData PsiSectionData
@ -152,13 +182,58 @@ func (psi *PsiSection) calaPatSectionLength() (length uint16) {
}
// calaPmtSectionLength returns the number of bytes taken by the PMT-specific
// part of the section: the fixed PCR/program-info header plus every program
// element together with its descriptors.
//
// Each element's 5-byte fixed header is counted exactly once, inside the loop.
// Counting it both up front and per element would overstate the section length.
func (psi *PsiSection) calaPmtSectionLength() (length uint16) {
	// Program descriptors are not considered for now.
	// Reserved bits(3 bits)+PCR PID(13 bits)+Reserved bits(4 bits)+Program info length(12 bits)
	length = 4

	for _, pe := range psi.sectionData.pmtData.pes {
		// Stream type(8 bits)+Reserved bits(3 bits)+PID(13 bits)+Reserved bits(4 bits)+ES info length(12 bits)
		length += 5
		// An element without descriptors contributes 0 here.
		length += psi.calcDescriptorsLength(pe.Descriptors)
	}

	return length
}
// calcDescriptorsLength returns the serialized size in bytes of a descriptor
// list: for every descriptor, a 2-byte header (tag + length) plus the payload
// size reported by calcDescriptorLength.
func (psi *PsiSection) calcDescriptorsLength(ds []Descriptor) uint16 {
	var total uint16
	for i := range ds {
		const headerSize = 2 // tag and length
		total += headerSize
		total += uint16(psi.calcDescriptorLength(ds[i]))
	}
	return total
}
// calcDescriptorLength returns the payload size in bytes of one descriptor,
// excluding the tag and length bytes.
//
// A descriptor whose Length field is 0, or whose tag is neither
// DescriptorTagRegistration nor DescriptorTagExtension, is reported as 0.
func (psi *PsiSection) calcDescriptorLength(d Descriptor) uint8 {
	switch {
	case d.Length == 0:
		return 0
	case d.Tag == DescriptorTagRegistration:
		return psi.calcDescriptorRegistrationLength(d.Registration)
	case d.Tag == DescriptorTagExtension:
		return psi.calcDescriptorExtensionLength(d.Extension)
	default:
		return 0
	}
}
// calcDescriptorRegistrationLength returns the payload size of a registration
// descriptor: 4 bytes of format identifier plus the additional identification
// info bytes.
func (psi *PsiSection) calcDescriptorRegistrationLength(d DescriptorRegistration) uint8 {
	const formatIdentifierSize = 4
	size := formatIdentifierSize + len(d.AdditionalIdentificationInfo)
	return uint8(size)
}
// calcDescriptorExtensionLength returns the payload size of an extension
// descriptor: one byte for the extension tag plus the raw Unknown bytes.
func (psi *PsiSection) calcDescriptorExtensionLength(d DescriptorExtension) uint8 {
	const tagSize = 1
	// len of a nil slice is 0, so no separate nil check is needed.
	return uint8(tagSize + len(d.Unknown))
}
func (psi *PsiSection) writePatSection(bw *nazabits.BitWriter) {
for _, pe := range psi.sectionData.patData.pes {
bw.WriteBits16(16, pe.pn)
@ -179,8 +254,79 @@ func (psi *PsiSection) writePmtSection(bw *nazabits.BitWriter) {
bw.WriteBits8(8, pe.StreamType)
bw.WriteBits8(3, 0xff)
bw.WriteBits16(13, pe.Pid)
bw.WriteBits8(4, 0xff)
bw.WriteBits16(12, 0)
psi.writeDescriptorsWithLength(bw, pe.Descriptors)
}
return
}
// writeDescriptorsWithLength writes a 4-bit field taken from 0xff, then the
// 12-bit total size of dps as computed by calcDescriptorsLength, then every
// descriptor in order.
func (psi *PsiSection) writeDescriptorsWithLength(bw *nazabits.BitWriter, dps []Descriptor) {
	bw.WriteBits8(4, 0xff)
	bw.WriteBits16(12, psi.calcDescriptorsLength(dps))

	for i := range dps {
		psi.writeDescriptor(bw, dps[i])
	}
}
// writeDescriptor serializes one descriptor: the tag byte, the payload length
// byte, then the tag-specific payload.
//
// The length byte comes from calcDescriptorLength, not from d.Length. When that
// length is 0 (for example d.Length is 0, or the tag has no payload writer
// here) no payload is written, so the bytes emitted always match both the
// length byte and the total that calcDescriptorsLength reports for the
// enclosing length field.
func (psi *PsiSection) writeDescriptor(bw *nazabits.BitWriter, d Descriptor) {
	length := psi.calcDescriptorLength(d)
	bw.WriteBits8(8, d.Tag)
	bw.WriteBits8(8, length)

	if length == 0 {
		// Writing a payload after a zero length byte would desynchronize the
		// section: readers would parse the payload as the next descriptor.
		return
	}

	switch d.Tag {
	case DescriptorTagRegistration:
		psi.writeDescriptorRegistration(bw, d.Registration)
	case DescriptorTagExtension:
		psi.writeDescriptorExtension(bw, d.Extension)
	}
}
// writeDescriptorRegistration writes the payload of a registration descriptor:
// the 32-bit format identifier as two 16-bit halves (upper half first),
// followed by the additional identification info bytes.
func (psi *PsiSection) writeDescriptorRegistration(bw *nazabits.BitWriter, d DescriptorRegistration) {
	upper := uint16((d.FormatIdentifier >> 16) & 0xFFFF)
	lower := uint16(d.FormatIdentifier & 0xFFFF)
	bw.WriteBits16(16, upper)
	bw.WriteBits16(16, lower)

	// Ranging over an empty or nil slice writes nothing.
	for i := range d.AdditionalIdentificationInfo {
		bw.WriteBits8(8, d.AdditionalIdentificationInfo[i])
	}
}
// writeDescriptorExtension writes the payload of an extension descriptor: the
// one-byte extension tag followed by the raw Unknown bytes.
func (psi *PsiSection) writeDescriptorExtension(bw *nazabits.BitWriter, d DescriptorExtension) {
	bw.WriteBits8(8, d.Tag)

	// Ranging over an empty or nil slice writes nothing.
	for i := range d.Unknown {
		bw.WriteBits8(8, d.Unknown[i])
	}
}
// Descriptor is one descriptor attached to a PMT program element.
//
// Tag selects which payload field is serialized by writeDescriptor:
// DescriptorTagRegistration uses Registration, DescriptorTagExtension uses
// Extension; other tags get no payload.
type Descriptor struct {
// Length is only tested against 0 by calcDescriptorLength (0 makes the
// descriptor count as empty). The length byte actually written is recomputed
// from the payload, not copied from this field.
Length uint8
// Tag is written as the first byte of the serialized descriptor.
Tag uint8
// Registration is the payload used when Tag is DescriptorTagRegistration.
Registration DescriptorRegistration
// Extension is the payload used when Tag is DescriptorTagExtension.
Extension DescriptorExtension
}
// DescriptorRegistration is the payload of a registration descriptor.
type DescriptorRegistration struct {
// AdditionalIdentificationInfo is written byte by byte after the format
// identifier; it may be empty.
AdditionalIdentificationInfo []byte
// FormatIdentifier is written as 32 bits, upper 16 bits first.
FormatIdentifier uint32
}
// DescriptorExtension is the payload of an extension descriptor.
type DescriptorExtension struct {
// SupplementaryAudio is not written by writeDescriptorExtension.
// NOTE(review): presumably populated only when parsing — confirm.
SupplementaryAudio DescriptorExtensionSupplementaryAudio
// Tag is the extension tag, written as the first payload byte.
Tag uint8
// Unknown holds raw payload bytes written verbatim after Tag.
Unknown []byte
}
// DescriptorExtensionSupplementaryAudio holds the fields of a supplementary
// audio extension descriptor.
//
// NOTE(review): no code visible in this section reads or writes these fields;
// their meanings presumably follow the DVB supplementary_audio_descriptor —
// TODO confirm against ETSI EN 300 468.
type DescriptorExtensionSupplementaryAudio struct {
EditorialClassification uint8
HasLanguageCode bool
LanguageCode []byte
MixType bool
PrivateData []byte
}

@ -326,6 +326,14 @@ func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
copy(payload[1:], pkt.Payload)
r.emitRtmpAvMsg(true, payload, pkt.Timestamp)
case base.AvPacketPtOpus:
length := len(pkt.Payload) + 1
payload := make([]byte, length)
// codecid=13, 44kHz、16bits、Stereo
payload[0] = 0xdf
copy(payload[1:], pkt.Payload)
r.emitRtmpAvMsg(true, payload, pkt.Timestamp)
default:
Log.Warnf("unsupported packet. type=%d", pkt.PayloadType)
}

@ -12,4 +12,7 @@ package remux
var _ iRtmp2MpegtsFilterObserver = &Rtmp2MpegtsRemuxer{}
// Fallback audio sample rates in Hz, used by the RTMP-to-RTSP remuxer when the
// stream did not supply a sample rate. Each name is declared exactly once;
// declaring pcmDefaultSampleRate both standalone and inside the group is a
// redeclaration error.
const (
	pcmDefaultSampleRate  = 8000  // G711A / G711U
	opusDefaultSampleRate = 48000 // Opus
)

@ -177,7 +177,7 @@ func (s *Rtmp2MpegtsRemuxer) onPatPmt(b []byte) {
func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) {
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
if msg.AudioCodecId() != base.RtmpSoundFormatAac {
if msg.AudioCodecId() != base.RtmpSoundFormatAac && msg.AudioCodecId() != base.RtmpSoundFormatOpus {
return
}
s.feedAudio(msg)
@ -390,37 +390,41 @@ func (s *Rtmp2MpegtsRemuxer) feedAudio(msg base.RtmpMsg) {
Log.Warnf("[%s] rtmp msg too short, ignore. header=%+v, payload=%s", s.uk, msg.Header, hex.Dump(msg.Payload))
return
}
if msg.Payload[0]>>4 != base.RtmpSoundFormatAac {
return
}
//Log.Debugf("[%s] hls: feedAudio. dts=%d len=%d", s.uk, msg.Header.TimestampAbs, len(msg.Payload))
if msg.Payload[1] == base.RtmpAacPacketTypeSeqHeader {
if err := s.cacheAacSeqHeader(msg); err != nil {
Log.Errorf("[%s] cache aac seq header failed. err=%+v", s.uk, err)
if msg.AudioCodecId() == base.RtmpSoundFormatAac {
if msg.Payload[1] == base.RtmpAacPacketTypeSeqHeader {
if err := s.cacheAacSeqHeader(msg); err != nil {
Log.Errorf("[%s] cache aac seq header failed. err=%+v", s.uk, err)
}
return
}
return
}
if !s.audioSeqHeaderCached() {
Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.uk)
return
if !s.audioSeqHeaderCached() {
Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.uk)
return
}
}
pts := uint64(msg.Header.TimestampAbs) * 90
if msg.AudioCodecId() == base.RtmpSoundFormatAac {
if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts {
s.FlushAudio()
}
if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts {
s.FlushAudio()
}
if s.audioCacheEmpty() {
s.audioCacheFirstFramePts = pts
}
if s.audioCacheEmpty() {
adtsHeader := s.ascCtx.PackAdtsHeader(int(msg.Header.MsgLen - 2))
s.audioCacheFrames = append(s.audioCacheFrames, adtsHeader...)
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...)
} else {
s.audioCacheFirstFramePts = pts
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[1:]...)
s.FlushAudio()
}
adtsHeader := s.ascCtx.PackAdtsHeader(int(msg.Header.MsgLen - 2))
s.audioCacheFrames = append(s.audioCacheFrames, adtsHeader...)
s.audioCacheFrames = append(s.audioCacheFrames, msg.Payload[2:]...)
}
func (s *Rtmp2MpegtsRemuxer) cacheAacSeqHeader(msg base.RtmpMsg) error {

@ -90,11 +90,8 @@ func (q *rtmp2MpegtsFilter) Push(msg base.RtmpMsg) {
// ---------------------------------------------------------------------------------------------------------------------
func (q *rtmp2MpegtsFilter) drain() {
videoType := q.getVideoStreamType()
audioType := q.getAudioStreamType()
patpmt := mpegts.PackPat()
patpmt = append(patpmt, mpegts.PackPmt(videoType, audioType)...)
patpmt = append(patpmt, mpegts.PackPmt(q.videoCodecId, q.audioCodecId)...)
q.observer.onPatPmt(patpmt)
for i := range q.data {
@ -105,23 +102,3 @@ func (q *rtmp2MpegtsFilter) drain() {
q.done = true
}
func (q *rtmp2MpegtsFilter) getVideoStreamType() uint8 {
switch q.videoCodecId {
case int(base.RtmpCodecIdAvc):
return mpegts.StreamTypeAvc
case int(base.RtmpCodecIdHevc):
return mpegts.StreamTypeHevc
}
return mpegts.StreamTypeUnknown
}
func (q *rtmp2MpegtsFilter) getAudioStreamType() uint8 {
switch q.audioCodecId {
case int(base.RtmpSoundFormatAac):
return mpegts.StreamTypeAac
}
return mpegts.StreamTypeUnknown
}

@ -79,6 +79,8 @@ func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
r.audioPt = base.AvPacketPtG711U
case base.RtmpSoundFormatG711A:
r.audioPt = base.AvPacketPtG711A
case base.RtmpSoundFormatOpus:
r.audioPt = base.AvPacketPtOpus
}
}
if samplerate, ok := meta.Find("audiosamplerate").(float64); ok {
@ -103,6 +105,11 @@ func (r *Rtmp2RtspRemuxer) FeedRtmpMsg(msg base.RtmpMsg) {
if r.audioSampleRate < 0 {
r.audioSampleRate = pcmDefaultSampleRate
}
case base.RtmpSoundFormatOpus:
r.audioPt = base.AvPacketPtOpus
if r.audioSampleRate < 0 {
r.audioSampleRate = opusDefaultSampleRate
}
}
}
case base.RtmpTypeIdVideo:
@ -236,11 +243,19 @@ func (r *Rtmp2RtspRemuxer) remux(msg base.RtmpMsg) {
case base.RtmpTypeIdAudio:
packer = r.getAudioPacker()
if packer != nil {
rtppkts = packer.Pack(base.AvPacket{
Timestamp: int64(msg.Header.TimestampAbs),
PayloadType: r.audioPt,
Payload: msg.Payload[2:],
})
if msg.AudioCodecId() == base.RtmpSoundFormatG711A || msg.AudioCodecId() == base.RtmpSoundFormatG711U || msg.AudioCodecId() == base.RtmpSoundFormatOpus {
rtppkts = packer.Pack(base.AvPacket{
Timestamp: int64(msg.Header.TimestampAbs),
PayloadType: r.audioPt,
Payload: msg.Payload[1:],
})
} else {
rtppkts = packer.Pack(base.AvPacket{
Timestamp: int64(msg.Header.TimestampAbs),
PayloadType: r.audioPt,
Payload: msg.Payload[2:],
})
}
}
case base.RtmpTypeIdVideo:
packer = r.getVideoPacker()
@ -286,6 +301,9 @@ func (r *Rtmp2RtspRemuxer) getAudioPacker() *rtprtcp.RtpPacker {
case base.AvPacketPtG711U:
pp := rtprtcp.NewRtpPackerPayloadPcm()
r.audioPacker = rtprtcp.NewRtpPacker(pp, r.audioSampleRate, r.audioSsrc)
case base.AvPacketPtOpus:
pp := rtprtcp.NewRtpPackerPayloadOpus()
r.audioPacker = rtprtcp.NewRtpPacker(pp, r.audioSampleRate, r.audioSsrc)
case base.AvPacketPtAac:
if r.asc == nil {
return nil

@ -0,0 +1,32 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)
package rtprtcp
// RtpPackerPayloadOpus is a stateless RTP payload packer for Opus frames.
type RtpPackerPayloadOpus struct{}

// NewRtpPackerPayloadOpus returns a ready-to-use Opus payload packer.
func NewRtpPackerPayloadOpus() *RtpPackerPayloadOpus {
	return new(RtpPackerPayloadOpus)
}
// Pack wraps one Opus frame into a single RTP payload.
//
// It returns nil when in is nil or maxSize is not positive. The frame is never
// split: a frame longer than maxSize is only reported with a warning and still
// returned whole. The returned payload is a copy, so it does not alias in.
func (r *RtpPackerPayloadOpus) Pack(in []byte, maxSize int) (out [][]byte) {
	if in == nil || maxSize <= 0 {
		return nil
	}

	if size := len(in); size > maxSize {
		Log.Warnf("frame size bigger than rtp payload size while packing. len(in)=%d, maxSize=%d", size, maxSize)
	}

	payload := append(make([]byte, 0, len(in)), in...)
	return [][]byte{payload}
}

@ -78,7 +78,7 @@ func DefaultRtpUnpackerFactory(payloadType base.AvPacketPt, clockRate int, maxSi
protocol = NewRtpUnpackerAac(payloadType, clockRate, onAvPacket)
case base.AvPacketPtG711U:
fallthrough
case base.AvPacketPtG711A:
case base.AvPacketPtG711A, base.AvPacketPtOpus:
protocol = NewRtpUnpackerRaw(payloadType, clockRate, onAvPacket)
case base.AvPacketPtAvc:
fallthrough

@ -114,6 +114,12 @@ a=rtpmap:%d PCMU/%d
a=control:streamid=%d
`
return fmt.Sprintf(tmpl, base.AvPacketPtG711U, base.AvPacketPtG711U, audioInfo.SamplingFrequency, streamid)
} else if audioInfo.AudioPt == base.AvPacketPtOpus {
tmpl := `m=audio 0 RTP/AVP %d
a=rtpmap:%d opus/48000/2
a=control:streamid=%d
`
return fmt.Sprintf(tmpl, base.AvPacketPtOpus, base.AvPacketPtOpus, streamid)
}
return ""

@ -52,7 +52,7 @@ func (lc *LogicContext) IsPayloadTypeOrigin(t int) bool {
}
func (lc *LogicContext) IsAudioUnpackable() bool {
return (lc.audioPayloadTypeBase == base.AvPacketPtAac && lc.Asc != nil) || (lc.audioPayloadTypeBase == base.AvPacketPtG711A) || (lc.audioPayloadTypeBase == base.AvPacketPtG711U)
return (lc.audioPayloadTypeBase == base.AvPacketPtAac && lc.Asc != nil) || (lc.audioPayloadTypeBase == base.AvPacketPtG711A) || (lc.audioPayloadTypeBase == base.AvPacketPtG711U) || (lc.audioPayloadTypeBase == base.AvPacketPtOpus)
}
func (lc *LogicContext) IsVideoUnpackable() bool {
@ -131,6 +131,8 @@ func ParseSdp2LogicContext(b []byte) (LogicContext, error) {
ret.audioPayloadTypeBase = base.AvPacketPtG711A
} else if strings.EqualFold(md.ARtpMap.EncodingName, ARtpMapEncodingNameG711U) {
ret.audioPayloadTypeBase = base.AvPacketPtG711U
} else if strings.EqualFold(md.ARtpMap.EncodingName, ArtpMapEncodingNameOpus) {
ret.audioPayloadTypeBase = base.AvPacketPtOpus
} else {
if md.M.PT == 8 {
// ffmpeg推流情况下不会填充rtpmap字段,m中pt值为8也可以表示是PCMA,采样率默认为8000Hz

@ -16,4 +16,5 @@ const (
ARtpMapEncodingNameAac = "MPEG4-GENERIC"
ARtpMapEncodingNameG711A = "PCMA"
ARtpMapEncodingNameG711U = "PCMU"
ArtpMapEncodingNameOpus = "opus"
)

Loading…
Cancel
Save