[feat] httpflv.PullSession支持https,支持302跳转

pull/114/head
q191201771 3 years ago
parent 505cf2e834
commit b0bc51c239

@ -15,6 +15,8 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"github.com/q191201771/naza/pkg/nazaerrors"
) )
// 见单元测试 // 见单元测试
@ -38,7 +40,6 @@ type UrlPathContext struct {
RawQuery string RawQuery string
} }
// TODO chef: 考虑把rawUrl也放入其中
type UrlContext struct { type UrlContext struct {
Url string Url string
@ -68,7 +69,21 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) {
return ctx, err return ctx, err
} }
if stdUrl.Scheme == "" { if stdUrl.Scheme == "" {
return ctx, ErrUrl return ctx, nazaerrors.Wrap(ErrUrl)
}
// 如果不存在,则设置默认的
if defaultPort == -1 {
// TODO(chef): 测试大小写的情况
switch stdUrl.Scheme {
case "http":
defaultPort = DefaultHttpPort
case "https":
defaultPort = DefaultHttpsPort
case "rtmp":
defaultPort = DefaultRtmpPort
case "rtsp":
defaultPort = DefaultRtspPort
}
} }
ctx.Scheme = stdUrl.Scheme ctx.Scheme = stdUrl.Scheme
@ -113,12 +128,12 @@ func ParseUrl(rawUrl string, defaultPort int) (ctx UrlContext, err error) {
} }
func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) { func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) {
ctx, err = ParseUrl(rawUrl, DefaultRtmpPort) ctx, err = ParseUrl(rawUrl, -1)
if err != nil { if err != nil {
return return
} }
if ctx.Scheme != "rtmp" || ctx.Host == "" || ctx.Path == "" { if ctx.Scheme != "rtmp" || ctx.Host == "" || ctx.Path == "" {
return ctx, ErrUrl return ctx, nazaerrors.Wrap(ErrUrl)
} }
// 注意使用ffmpeg推流时会把`rtmp://127.0.0.1/test110`中的test110作为appName(streamName则为空) // 注意使用ffmpeg推流时会把`rtmp://127.0.0.1/test110`中的test110作为appName(streamName则为空)
@ -132,21 +147,17 @@ func ParseRtmpUrl(rawUrl string) (ctx UrlContext, err error) {
return return
} }
func ParseHttpflvUrl(rawUrl string, isHttps bool) (ctx UrlContext, err error) { func ParseHttpflvUrl(rawUrl string) (ctx UrlContext, err error) {
return ParseHttpUrl(rawUrl, isHttps, ".flv") return ParseHttpUrl(rawUrl, ".flv")
}
func ParseHttptsUrl(rawUrl string, isHttps bool) (ctx UrlContext, err error) {
return ParseHttpUrl(rawUrl, isHttps, ".ts")
} }
func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) { func ParseRtspUrl(rawUrl string) (ctx UrlContext, err error) {
ctx, err = ParseUrl(rawUrl, DefaultRtspPort) ctx, err = ParseUrl(rawUrl, -1)
if err != nil { if err != nil {
return return
} }
if ctx.Scheme != "rtsp" || ctx.Host == "" || ctx.Path == "" { if ctx.Scheme != "rtsp" || ctx.Host == "" || ctx.Path == "" {
return ctx, ErrUrl return ctx, nazaerrors.Wrap(ErrUrl)
} }
return return
@ -183,20 +194,13 @@ func parseUrlPath(stdUrl *url.URL) (ctx UrlPathContext, err error) {
return ctx, nil return ctx, nil
} }
func ParseHttpUrl(rawUrl string, isHttps bool, suffix string) (ctx UrlContext, err error) { func ParseHttpUrl(rawUrl string, suffix string) (ctx UrlContext, err error) {
var defaultPort int ctx, err = ParseUrl(rawUrl, -1)
if isHttps {
defaultPort = DefaultHttpsPort
} else {
defaultPort = DefaultHttpPort
}
ctx, err = ParseUrl(rawUrl, defaultPort)
if err != nil { if err != nil {
return return
} }
if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, suffix) { if (ctx.Scheme != "http" && ctx.Scheme != "https") || ctx.Host == "" || ctx.Path == "" || !strings.HasSuffix(ctx.LastItemOfPath, suffix) {
return ctx, ErrUrl return ctx, nazaerrors.Wrap(ErrUrl, fmt.Sprintf("%+v", ctx))
} }
return return

@ -10,6 +10,7 @@ package httpflv
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@ -176,19 +177,43 @@ func (session *PullSession) IsAlive() (readAlive, writeAlive bool) {
func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error { func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onReadFlvTag OnReadFlvTag) error {
errChan := make(chan error, 1) errChan := make(chan error, 1)
url := rawUrl
// 异步握手 // 异步握手
go func() { go func() {
if err := session.connect(rawUrl); err != nil { for {
errChan <- err if err := session.connect(url); err != nil {
errChan <- err
return
}
if err := session.writeHttpRequest(); err != nil {
errChan <- err
return
}
statusCode, headers, err := session.readHttpRespHeader()
if err != nil {
errChan <- err
return
}
// 处理跳转
if statusCode == "301" || statusCode == "302" {
url = headers.Get("Location")
if url == "" {
nazalog.Warnf("[%s] redirect but Location not found. headers=%+v", session.uniqueKey, headers)
errChan <- nil
return
}
_ = session.conn.Close()
nazalog.Debugf("[%s] redirect to %s", session.uniqueKey, url)
continue
}
errChan <- nil
return return
} }
if err := session.writeHttpRequest(); err != nil {
errChan <- err
return
}
errChan <- nil
}() }()
// 等待握手结果,或者超时通知 // 等待握手结果,或者超时通知
@ -211,18 +236,31 @@ func (session *PullSession) pullContext(ctx context.Context, rawUrl string, onRe
} }
func (session *PullSession) connect(rawUrl string) (err error) { func (session *PullSession) connect(rawUrl string) (err error) {
session.urlCtx, err = base.ParseHttpflvUrl(rawUrl, false) // TODO(chef): refactor 可以考虑抽象出一个http client负责http拉流的建连、https、302等功能
session.urlCtx, err = base.ParseHttpflvUrl(rawUrl)
if err != nil { if err != nil {
return return
} }
nazalog.Debugf("[%s] > tcp connect.", session.uniqueKey) nazalog.Debugf("[%s] > tcp connect. %s", session.uniqueKey, session.urlCtx.HostWithPort)
var conn net.Conn
if session.urlCtx.Scheme == "https" {
conf := &tls.Config{
InsecureSkipVerify: true,
}
conn, err = tls.Dial("tcp", session.urlCtx.HostWithPort, conf)
} else {
conn, err = net.Dial("tcp", session.urlCtx.HostWithPort)
}
// # 建立连接
conn, err := net.Dial("tcp", session.urlCtx.HostWithPort)
if err != nil { if err != nil {
return err return err
} }
nazalog.Debugf("[%s] tcp connect succ. remote=%s", session.uniqueKey, conn.RemoteAddr().String())
session.conn = connection.New(conn, func(option *connection.Option) { session.conn = connection.New(conn, func(option *connection.Option) {
option.ReadBufSize = readBufSize option.ReadBufSize = readBufSize
option.WriteTimeoutMs = session.option.ReadTimeoutMs // TODO chef: 为什么是 Read 赋值给 Write option.WriteTimeoutMs = session.option.ReadTimeoutMs // TODO chef: 为什么是 Read 赋值给 Write
@ -240,17 +278,17 @@ func (session *PullSession) writeHttpRequest() error {
return err return err
} }
func (session *PullSession) readHttpRespHeader() (statusLine string, headers http.Header, err error) { func (session *PullSession) readHttpRespHeader() (statusCode string, headers http.Header, err error) {
// TODO chef: timeout var statusLine string
if statusLine, headers, err = nazahttp.ReadHttpHeader(session.conn); err != nil { if statusLine, headers, err = nazahttp.ReadHttpHeader(session.conn); err != nil {
return return
} }
_, code, _, err := nazahttp.ParseHttpStatusLine(statusLine) _, statusCode, _, err = nazahttp.ParseHttpStatusLine(statusLine)
if err != nil { if err != nil {
return return
} }
nazalog.Debugf("[%s] < R http response header. code=%s", session.uniqueKey, code) nazalog.Debugf("[%s] < R http response header. statusLine=%s", session.uniqueKey, statusLine)
return return
} }
@ -276,10 +314,6 @@ func (session *PullSession) runReadLoop(onReadFlvTag OnReadFlvTag) {
_ = session.dispose(err) _ = session.dispose(err)
}() }()
if _, _, err = session.readHttpRespHeader(); err != nil {
return
}
if _, err = session.readFlvHeader(); err != nil { if _, err = session.readFlvHeader(); err != nil {
return return
} }

Loading…
Cancel
Save