diff --git a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go index afd73e3..fcc1989 100644 --- a/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go +++ b/app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go @@ -11,7 +11,10 @@ package main import ( "flag" "fmt" + "github.com/q191201771/naza/pkg/nazaerrors" + "github.com/q191201771/naza/pkg/unique" "os" + "sync" "time" "github.com/q191201771/lal/pkg/sdp" @@ -22,23 +25,138 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -var rtpPacketChan = make(chan rtprtcp.RtpPacket, 1024) +type RtspTunnel struct { + pullUrl string + pushUrl string + pullOverTcp bool + pushOverTcp bool -type Observer struct { + uniqueKey string + + rtpPacketChan chan rtprtcp.RtpPacket + + disposeOnce sync.Once + waitChan chan error + + pullSession *rtsp.PullSession + pushSession *rtsp.PushSession +} + +func NewRtspTunnel(pullUrl string, pushUrl string, pullOverTcp bool, pushOverTcp bool) *RtspTunnel { + uniqueKey := unique.GenUniqueKey("RTSPTUNNEL") + nazalog.Debugf("[%s] lifecycle new RtspTunnel. pullUrl=%s, pushUrl=%s", uniqueKey, pullUrl, pushUrl) + return &RtspTunnel{ + pullUrl: pullUrl, + pushUrl: pushUrl, + pullOverTcp: pullOverTcp, + pushOverTcp: pushOverTcp, + uniqueKey: uniqueKey, + rtpPacketChan: make(chan rtprtcp.RtpPacket, 1024), + waitChan: make(chan error, 1), + } +} + +// Start 开启任务,阻塞直到任务开启成功或失败 +// +// @return true 表示任务启动成功,此时数据已经在后台转发 +// false 表示任务启动失败 +// +func (r *RtspTunnel) Start() error { + r.pullSession = rtsp.NewPullSession(r, func(option *rtsp.PullSessionOption) { + option.PullTimeoutMs = 5000 + option.OverTcp = r.pullOverTcp + }) + if err := r.pullSession.Pull(r.pullUrl); err != nil { + nazalog.Errorf("[%s] start pull failed. err=%+v, url=%s", r.uniqueKey, err, r.pullUrl) + return err + } + sdpCtx := r.pullSession.GetSdp() + nazalog.Debugf("[%s] start pull succ. sdp=%s", r.uniqueKey, string(sdpCtx.RawSdp)) + + r.pushSession = rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { + option.PushTimeoutMs = 5000 + option.OverTcp = r.pushOverTcp + }) + if err := r.pushSession.Push(r.pushUrl, sdpCtx); err != nil { + nazalog.Errorf("[%s] start push failed. err=%+v, url=%s", r.uniqueKey, err, r.pushUrl) + return err + } + nazalog.Debugf("[%s] start push succ.", r.uniqueKey) + + go func() { + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for { + select { + case pkt := <-r.rtpPacketChan: + _ = r.pushSession.WriteRtpPacket(pkt) + case err := <-r.pullSession.WaitChan(): + nazalog.Debugf("[%s] < pullSession.Wait(). err=%+v", r.uniqueKey, err) + _ = r.dispose(err) + return + case err := <-r.pushSession.WaitChan(): + nazalog.Debugf("[%s] < pushSession.Wait(). err=%+v", r.uniqueKey, err) + _ = r.dispose(err) + return + case <-t.C: + r.pullSession.UpdateStat(1) + pullStat := r.pullSession.GetStat() + r.pushSession.UpdateStat(1) + pushStat := r.pushSession.GetStat() + nazalog.Debugf("[%s] stat. pull=%+v, push=%+v", r.uniqueKey, pullStat, pushStat) + } + } + }() + + return nil } -func (o *Observer) OnRtpPacket(pkt rtprtcp.RtpPacket) { - rtpPacketChan <- pkt +// Dispose 主动关闭tunnel时调用 +// +// 注意,只有 Start 成功后的tunnel才能调用,否则行为未定义 +// +// 更详细的说明参考 IClientSessionLifecycle interface +// +func (r *RtspTunnel) Dispose() error { + return r.dispose(nil) } -func (o *Observer) OnSdp(sdpCtx sdp.LogicContext) { +// WaitChan Start 成功后,可使用这个channel来接收tunnel结束的消息 +// +// 更详细的说明参考 IClientSessionLifecycle interface +// +func (r *RtspTunnel) WaitChan() chan error { + return r.waitChan +} + +// --------------------------------------------------------------------------------------------------------------------- + +func (r *RtspTunnel) OnRtpPacket(pkt rtprtcp.RtpPacket) { + r.rtpPacketChan <- pkt +} + +func (r *RtspTunnel) OnSdp(sdpCtx sdp.LogicContext) { // noop } -func (o *Observer) OnAvPacket(pkt base.AvPacket) { +func (r *RtspTunnel) OnAvPacket(pkt base.AvPacket) { // noop } +func (r *RtspTunnel) dispose(err error) error { + var retErr error + r.disposeOnce.Do(func() { + nazalog.Infof("[%s] lifecycle dispose RtspTunnel.", r.uniqueKey) + e1 := r.pullSession.Dispose() + e2 := r.pushSession.Dispose() + retErr = nazaerrors.CombineErrors(e1, e2) + r.waitChan <- err + }) + return retErr +} + +// --------------------------------------------------------------------------------------------------------------------- + func main() { _ = nazalog.Init(func(option *nazalog.Option) { option.AssertBehavior = nazalog.AssertFatal @@ -47,61 +165,20 @@ func main() { inUrl, outUrl, pullOverTcp, pushOverTcp := parseFlag() - o := &Observer{} - pullSession := rtsp.NewPullSession(o, func(option *rtsp.PullSessionOption) { - option.PullTimeoutMs = 5000 - option.OverTcp = pullOverTcp != 0 - }) - - err := pullSession.Pull(inUrl) - nazalog.Assert(nil, err) - sdpCtx := pullSession.GetSdp() - - pushSession := rtsp.NewPushSession(func(option *rtsp.PushSessionOption) { - option.PushTimeoutMs = 5000 - option.OverTcp = pushOverTcp != 0 - }) - - err = pushSession.Push(outUrl, sdpCtx) - nazalog.Assert(nil, err) - - go func() { - for { - pullSession.UpdateStat(1) - pullStat := pullSession.GetStat() - pushSession.UpdateStat(1) - pushStat := pushSession.GetStat() - nazalog.Debugf("stat. pull=%+v, push=%+v", pullStat, pushStat) - time.Sleep(1 * time.Second) - } - }() + rtspTunnel := NewRtspTunnel(inUrl, outUrl, pullOverTcp == 1, pushOverTcp == 1) + err := rtspTunnel.Start() + if err != nil { + nazalog.Errorf("start tunnel failed. err=%+v", err) + return + } - //只是为了测试主动关闭session //go func() { // time.Sleep(5 * time.Second) - // err := pullSession.Dispose() - // nazalog.Debugf("< pull session dispose. err=%+v", err) - // //err := pushSession.Dispose() - // //nazalog.Debugf("< push session dispose. err=%+v", err) + // _ = rtspTunnel.Dispose() //}() - for { - select { - case err = <-pullSession.WaitChan(): - nazalog.Infof("< pullSession.Wait(). err=%+v", err) - time.Sleep(1 * time.Second) // 不让程序立即退出,只是为了测试session内部资源是否正常及时释放 - return - case err = <-pushSession.WaitChan(): - nazalog.Infof("< pushSession.Wait(). err=%+v", err) - time.Sleep(1 * time.Second) - return - case pkt := <-rtpPacketChan: - if err := pushSession.WriteRtpPacket(pkt); err != nil { - nazalog.Errorf("write rtp packet failed. err=%+v", err) - } - } - } - + err = <-rtspTunnel.WaitChan() + nazalog.Errorf("tunnel stopped. err=%+v", err) } func parseFlag() (inUrl string, outUrl string, pullOverTcp int, pushOverTcp int) {