From b0bc51c2394585ffac50ac0684929f7e171de0d7 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 27 Nov 2021 13:16:02 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20httpflv.PullSession=E6=94=AF=E6=8C=81h?= =?UTF-8?q?ttps=EF=BC=8C=E6=94=AF=E6=8C=81302=E8=B7=B3=E8=BD=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/base/url.go | 48 ++++++++++--------- pkg/httpflv/client_pull_session.go | 74 ++++++++++++++++++++++-------- 2 files changed, 80 insertions(+), 42 deletions(-) diff --git a/pkg/base/url.go b/pkg/base/url.go index 9c16494..8a85a6c 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -15,6 +15,8 @@ import ( "net/url" "strconv" "strings" + + "github.com/q191201771/naza/pkg/nazaerrors" ) // 见单元测试 @@ -38,7 +40,6 @@ type UrlPathContext struct { RawQuery string } -// TODO chef: 考虑把rawUrl也放入其中 type UrlContext struct { Url string @@ -68,7 +69,21 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) { return ctx, err } if stdUrl.Scheme == "" { - return ctx, ErrUrl + return ctx, nazaerrors.Wrap(ErrUrl) + } + // 如果不存在,则设置默认的 + if defaultPort == -1 { + // TODO(chef): 测试大小写的情况 + switch stdUrl.Scheme { + case "http": + defaultPort = DefaultHttpPort + case "https": + defaultPort = DefaultHttpsPort + case "rtmp": + defaultPort = DefaultRtmpPort + case "rtsp": + defaultPort = DefaultRtspPort + } } ctx.Scheme = stdUrl.Scheme @@ -113,12 +128,12 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) { } func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) { - ctx, err = ParseUrl(rawUrl, DefaultRtmpPort) + ctx, err = ParseUrl(rawUrl, -1) if err != nil { return } if ctx.Scheme != "rtmp" || ctx.Host == "" || ctx.Path == "" { - return ctx, ErrUrl + return ctx, nazaerrors.Wrap(ErrUrl) } // 注意,使用ffmpeg推流时,会把`rtmp://127.0.0.1/test110`中的test110作为appName(streamName则为空) @@ -132,21 +147,17 @@ func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) { return } -func ParseHttpflvUrl(rawUrl string, isHttps bool) (ctx UrlContext, err error) { - return ParseHttpUrl(rawUrl, isHttps, ".flv") -} - -func ParseHttptsUrl(rawUrl string, isHttps bool) (ctx UrlContext, err error) { - return ParseHttpUrl(rawUrl, isHttps, ".ts") +func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) { + return ParseHttpUrl(rawUrl, ".flv") } func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) { - ctx, err = ParseUrl(rawUrl, DefaultRtspPort) + ctx, err = ParseUrl(rawUrl, -1) if err != nil { return } if ctx.Scheme != "rtsp" || ctx.Host == "" || ctx.Path == "" { - return ctx, ErrUrl + return ctx, nazaerrors.Wrap(ErrUrl) } return @@ -183,20 +194,13 @@ func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) { return ctx, nil } -func ParseHttpUrl(rawUrl string, isHttps bool, suffix string) (ctx UrlContext, err error) { - var defaultPort int - if isHttps { - defaultPort = DefaultHttpsPort - } else { - defaultPort = DefaultHttpPort - } - - ctx, err = ParseUrl(rawUrl, defaultPort) +func ParseHttpUrl(rawUrl string, suffix string) (ctx UrlContext, err error) { + ctx, err = ParseUrl(rawUrl, -1) if err != nil { return } if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, suffix) { - return ctx, ErrUrl + return ctx, nazaerrors.Wrap(ErrUrl, fmt.Sprintf("%+v", ctx)) } return diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index d7f4175..5aeb507 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -10,6 +10,7 @@ package httpflv import ( "context" + "crypto/tls" "fmt" "net" "net/http" @@ -176,19 +177,43 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) { func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error { errChan := make(chan error, 1) + url := rawUrl // 异步握手 go func() { - if err := session.connect(rawUrl); err != nil { - errChan <- err + for { + if err := session.connect(url); err != nil { + errChan <- err + return + } + if err := session.writeHttpRequest(); err != nil { + errChan <- err + return + } + + statusCode, headers, err := session.readHttpRespHeader() + if err != nil { + errChan <- err + return + } + + // 处理跳转 + if statusCode == "301" || statusCode == "302" { + url = headers.Get("Location") + if url == "" { + nazalog.Warnf("[%s] redirect but Location not found. headers=%+v", session.uniqueKey, headers) + errChan <- nil + return + } + + _ = session.conn.Close() + nazalog.Debugf("[%s] redirect to %s", session.uniqueKey, url) + continue + } + + errChan <- nil return } - if err := session.writeHttpRequest(); err != nil { - errChan <- err - return - } - - errChan <- nil }() // 等待握手结果,或者超时通知 @@ -211,18 +236,31 @@ func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onRe } func (session *PullSession) connect(rawUrl string) (err error) { - session.urlCtx, err = base.ParseHttpflvUrl(rawUrl, false) + // TODO(chef): refactor 可以考虑抽象出一个http client,负责http拉流的建连、https、302等功能 + + session.urlCtx, err = base.ParseHttpflvUrl(rawUrl) if err != nil { return } - nazalog.Debugf("[%s] > tcp connect.", session.uniqueKey) + nazalog.Debugf("[%s] > tcp connect. %s", session.uniqueKey, session.urlCtx.HostWithPort) + + var conn net.Conn + if session.urlCtx.Scheme == "https" { + conf := &tls.Config{ + InsecureSkipVerify: true, + } + conn, err = tls.Dial("tcp", session.urlCtx.HostWithPort, conf) + } else { + conn, err = net.Dial("tcp", session.urlCtx.HostWithPort) + } - // # 建立连接 - conn, err := net.Dial("tcp", session.urlCtx.HostWithPort) if err != nil { return err } + + nazalog.Debugf("[%s] tcp connect succ. remote=%s", session.uniqueKey, conn.RemoteAddr().String()) + session.conn = connection.New(conn, func(option *connection.Option) { option.ReadBufSize = readBufSize option.WriteTimeoutMs = session.option.ReadTimeoutMs // TODO chef: 为什么是 Read 赋值给 Write @@ -240,17 +278,17 @@ func (session *PullSession) writeHttpRequest() error { return err } -func (session *PullSession) readHttpRespHeader() (statusLine string, headers http.Header, err error) { - // TODO chef: timeout +func (session *PullSession) readHttpRespHeader() (statusCode string, headers http.Header, err error) { + var statusLine string if statusLine, headers, err = nazahttp.ReadHttpHeader(session.conn); err != nil { return } - _, code, _, err := nazahttp.ParseHttpStatusLine(statusLine) + _, statusCode, _, err = nazahttp.ParseHttpStatusLine(statusLine) if err != nil { return } - nazalog.Debugf("[%s] < R http response header. code=%s", session.uniqueKey, code) + nazalog.Debugf("[%s] < R http response header. statusLine=%s", session.uniqueKey, statusLine) return } @@ -276,10 +314,6 @@ func (session *PullSession) runReadLoop(onReadFlvTag OnReadFlvTag) { _ = session.dispose(err) }() - if _, _, err = session.readHttpRespHeader(); err != nil { - return - } - if _, err = session.readFlvHeader(); err != nil { return }