diff --git a/pkg/base/t_http_an__api.go b/pkg/base/t_http_an__api.go index 0a4b179..56aefc7 100644 --- a/pkg/base/t_http_an__api.go +++ b/pkg/base/t_http_an__api.go @@ -58,6 +58,7 @@ const ( DespSessionNotFound = "session not found" ErrorCodeStartRelayPullFail = 2001 + ErrorCodeListenUdpPortFail = 2002 ) // HttpResponseBasic diff --git a/pkg/gb28181/pub_session.go b/pkg/gb28181/pub_session.go index d741347..9ebe1eb 100644 --- a/pkg/gb28181/pub_session.go +++ b/pkg/gb28181/pub_session.go @@ -9,6 +9,7 @@ package gb28181 import ( + "fmt" "github.com/q191201771/lal/pkg/base" "github.com/q191201771/naza/pkg/nazanet" "net" @@ -59,29 +60,37 @@ func (session *PubSession) WithHookReadUdpPacket(fn nazanet.OnReadUdpPacket) *Pu return session } -// RunLoop +// Listen // -// @param addr: 如果为空,则内部选择一个可用的地址 +// 注意,当`port`参数为0时,内部会自动选择一个可用端口监听,并通过返回值返回该端口 // -func (session *PubSession) RunLoop(addr string) error { - var uconn *net.UDPConn +func (session *PubSession) Listen(port int) (int, error) { var err error + var uconn *net.UDPConn + var addr string - if addr == "" { + if port == 0 { uconn, _, err = defaultUdpConnPoll.Acquire() if err != nil { - return err + return -1, err } + + port = uconn.LocalAddr().(*net.UDPAddr).Port + } else { + addr = fmt.Sprintf(":%d", port) } session.conn, err = nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) { option.LAddr = addr option.Conn = uconn }) - if err != nil { - return err - } - err = session.conn.RunLoop(func(b []byte, raddr *net.UDPAddr, err error) bool { + return port, err +} + +// RunLoop ... +// +func (session *PubSession) RunLoop() error { + err := session.conn.RunLoop(func(b []byte, raddr *net.UDPAddr, err error) bool { if len(b) == 0 && err != nil { return false } diff --git a/pkg/gb28181/pub_session_test.go b/pkg/gb28181/pub_session_test.go index f24df70..768a7ad 100644 --- a/pkg/gb28181/pub_session_test.go +++ b/pkg/gb28181/pub_session_test.go @@ -9,6 +9,7 @@ package gb28181 import ( + "encoding/hex" "fmt" "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/base" @@ -20,20 +21,41 @@ import ( "time" ) -func TestPubSession(t *testing.T) { +// 测试: +// 1. 测试 TestPubSession +// 1.1 测试指定端口 +// 1.2 测试随机端口 +// 1.3 测试重复端口 +// 2. 测试 TestReplayPubSession + +func TestReplayPubSession(t *testing.T) { // 重放业务方的流 // 步骤: - // 1. 业务方提供的lalserver录制下来的dump file + // 1. 业务方提供的lalserver录制下来的dump file,修改下面的文件名变量filename // 2. 启动lalserver // 3. 调用HTTP API + // curl -H "Content-Type:application/json" -X POST -d '{"stream_name": "test110", "port": 10002, "timeout_ms": 10000}' http://127.0.0.1:8083/api/ctrl/start_rtp_pub // 4. 执行该测试 - //testDumpFile("127.0.0.1:10002", "/tmp/test.psdata") + // go test -test.run TestReplayPubSession + // + filename := "/tmp/record.psdata" + + b, err := ioutil.ReadFile(filename) + if len(b) == 0 || err != nil { + return + } + //testPushFile("127.0.0.1:10002", filename) +} + +func TestPubSession(t *testing.T) { // 读取一大堆.ps文件,并使用udp发送到`addr`地址(外部的,比如外部自己启动lalserver) // 步骤: // 1. 启动lalserver // 2. 调用HTTP API + // curl -H "Content-Type:application/json" -X POST -d '{"stream_name": "test110", "port": 10002, "timeout_ms": 10000, "debug_dump_packet": "/tmp/test110.psdata"}' http://127.0.0.1:8083/api/ctrl/start_rtp_pub // 3. 执行该测试 + // go test -test.run TestPubSession //helpUdpSend("127.0.0.1:10002") // 读取一大堆.ps文件,并使用udp发送到`addr`地址(内部启动了PubSession做接收) @@ -77,7 +99,9 @@ func testPubSession() { helpUdpSend(addr) }() - runErr := session.RunLoop(addr) + _, runErr := session.Listen(int(port)) + nazalog.Assert(nil, runErr) + runErr = session.RunLoop() nazalog.Assert(nil, runErr) } @@ -91,13 +115,13 @@ func helpUdpSend(addr string) { //filename := fmt.Sprintf("/tmp/rtp-ps-video/%d.ps", i) b, err := ioutil.ReadFile(filename) nazalog.Assert(nil, err) - //nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:])) + nazalog.Debugf("[test] %d: %s", i, hex.EncodeToString(b[12:])) conn.Write(b) } } -func testDumpFile(addr string, filename string) { +func testPushFile(addr string, filename string) { conn, err := nazanet.NewUdpConnection(func(option *nazanet.UdpConnectionOption) { option.RAddr = addr }) diff --git a/pkg/logic/group__in.go b/pkg/logic/group__in.go index 874d660..8bd1495 100644 --- a/pkg/logic/group__in.go +++ b/pkg/logic/group__in.go @@ -9,7 +9,6 @@ package logic import ( - "fmt" "github.com/q191201771/lal/pkg/gb28181" "github.com/q191201771/naza/pkg/nazalog" "net" @@ -109,7 +108,7 @@ func (group *Group) AddRtspPubSession(session *rtsp.PubSession) error { return nil } -func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) { +func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) (ret base.ApiCtrlStartRtpPub) { group.mutex.Lock() defer group.mutex.Unlock() @@ -117,13 +116,14 @@ func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) { // TODO(chef): [fix] 处理已经有输入session的情况 202207 } - pubSession := gb28181.NewPubSession().WithStreamName(req.StreamName).WithOnAvPacket(group.OnAvPacketFromPsPubSession) if req.DebugDumpPacket != "" { group.psPubDumpFile = base.NewDumpFile() if err := group.psPubDumpFile.OpenToWrite(req.DebugDumpPacket); err != nil { Log.Errorf("%+v", err) } } + + pubSession := gb28181.NewPubSession().WithStreamName(req.StreamName).WithOnAvPacket(group.OnAvPacketFromPsPubSession) pubSession.WithHookReadUdpPacket(func(b []byte, raddr *net.UDPAddr, err error) bool { if group.psPubDumpFile != nil { group.psPubDumpFile.Write(b) @@ -151,15 +151,27 @@ func (group *Group) StartRtpPub(req base.ApiCtrlStartRtpPubReq) { ) } + port, err := pubSession.Listen(req.Port) + if err != nil { + group.delPsPubSession(pubSession) + + ret.ErrorCode = base.ErrorCodeListenUdpPortFail + ret.Desp = err.Error() + return + } + go func() { - var addr string - if req.Port != 0 { - addr = fmt.Sprintf(":%d", req.Port) - } - err := pubSession.RunLoop(addr) - nazalog.Debugf("[%s] [%s] ps PubSession run loop exit, err=%v", group.UniqueKey, pubSession.UniqueKey(), err) + runErr := pubSession.RunLoop() + nazalog.Debugf("[%s] [%s] ps PubSession run loop exit, err=%v", group.UniqueKey, pubSession.UniqueKey(), runErr) group.DelPsPubSession(pubSession) }() + + ret.ErrorCode = base.ErrorCodeSucc + ret.Desp = base.DespSucc + ret.Data.SessionId = pubSession.UniqueKey() + ret.Data.StreamName = pubSession.StreamName() + ret.Data.Port = port + return } func (group *Group) AddRtmpPullSession(session *rtmp.PullSession) error { diff --git a/pkg/logic/server_manager__api.go b/pkg/logic/server_manager__api.go index 1589101..82878eb 100644 --- a/pkg/logic/server_manager__api.go +++ b/pkg/logic/server_manager__api.go @@ -142,7 +142,7 @@ func (sm *ServerManager) CtrlStartRtpPub(info base.ApiCtrlStartRtpPubReq) (ret b // 注意,如果group不存在,我们依然relay pull g := sm.getOrCreateGroup("", info.StreamName) - g.StartRtpPub(info) + ret = g.StartRtpPub(info) return }