[feat] httpflv pull拉流时,携带url参数

pull/8/head
q191201771 5 years ago
parent 6d2368f4c6
commit d0ff67ffb9

@ -15,8 +15,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/connection"
log "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
"github.com/q191201771/naza/pkg/unique" "github.com/q191201771/naza/pkg/unique"
) )
@ -37,9 +39,9 @@ type PullSession struct {
option PullSessionOption option PullSessionOption
host string host string
uri string pathWithQuery string
addr string addr string
} }
type ModPullSessionOption func(option *PullSessionOption) type ModPullSessionOption func(option *PullSessionOption)
@ -51,7 +53,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {
} }
uk := unique.GenUniqueKey("FLVPULL") uk := unique.GenUniqueKey("FLVPULL")
log.Infof("lifecycle new PullSession. [%s]", uk) nazalog.Infof("lifecycle new PullSession. [%s]", uk)
return &PullSession{ return &PullSession{
option: option, option: option,
UniqueKey: uk, UniqueKey: uk,
@ -79,23 +81,26 @@ func (session *PullSession) Pull(rawURL string, onReadFLVTag OnReadFLVTag) error
} }
func (session *PullSession) Dispose() { func (session *PullSession) Dispose() {
log.Infof("lifecycle dispose PullSession. [%s]", session.UniqueKey) nazalog.Infof("lifecycle dispose PullSession. [%s]", session.UniqueKey)
_ = session.Conn.Close() _ = session.Conn.Close()
} }
func (session *PullSession) Connect(rawURL string) error { func (session *PullSession) Connect(rawURL string) error {
// # 从 url 中解析 host uri addr // # 从 url 中解析 host uri addr
url, err := url.Parse(rawURL) u, err := url.Parse(rawURL)
if err != nil { if err != nil {
return err return err
} }
if url.Scheme != "http" || !strings.HasSuffix(url.Path, ".flv") { if u.Scheme != "http" || !strings.HasSuffix(u.Path, ".flv") {
return ErrHTTPFLV return ErrHTTPFLV
} }
session.host = url.Host session.host = u.Host
// TODO chef: uri with url.RawQuery? if u.RawQuery == "" {
session.uri = url.Path session.pathWithQuery = u.Path
} else {
session.pathWithQuery = fmt.Sprintf("%s?%s", u.Path, u.RawQuery)
}
if strings.Contains(session.host, ":") { if strings.Contains(session.host, ":") {
session.addr = session.host session.addr = session.host
@ -103,6 +108,8 @@ func (session *PullSession) Connect(rawURL string) error {
session.addr = session.host + ":80" session.addr = session.host + ":80"
} }
nazalog.Debugf("> tcp connect. [%s]", session.UniqueKey)
// # 建立连接 // # 建立连接
conn, err := net.DialTimeout("tcp", session.addr, time.Duration(session.option.ConnectTimeoutMS)*time.Millisecond) conn, err := net.DialTimeout("tcp", session.addr, time.Duration(session.option.ConnectTimeoutMS)*time.Millisecond)
if err != nil { if err != nil {
@ -118,22 +125,24 @@ func (session *PullSession) Connect(rawURL string) error {
func (session *PullSession) WriteHTTPRequest() error { func (session *PullSession) WriteHTTPRequest() error {
// # 发送 http GET 请求 // # 发送 http GET 请求
nazalog.Debugf("> send http request. [%s] GET %s", session.UniqueKey, session.pathWithQuery)
req := fmt.Sprintf("GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n", req := fmt.Sprintf("GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n",
session.uri, session.host) session.pathWithQuery, session.host)
_, err := session.Conn.Write([]byte(req)) _, err := session.Conn.Write([]byte(req))
return err return err
} }
func (session *PullSession) ReadHTTPRespHeader() (statusLine string, headers map[string]string, err error) { func (session *PullSession) ReadHTTPRespHeader() (statusLine string, headers map[string]string, err error) {
// TODO chef: timeout // TODO chef: timeout
if statusLine, headers, err = parseHTTPHeader(session.Conn); err != nil { if statusLine, headers, err = nazahttp.ReadHTTPHeader(session.Conn); err != nil {
return return
} }
if _, _, _, err = parseStatusLine(statusLine); err != nil { _, code, _, err := nazahttp.ParseHTTPStatusLine(statusLine)
if err != nil {
return return
} }
log.Infof("-----> http response header. [%s]", session.UniqueKey) nazalog.Debugf("< read http response header. [%s] code=%s", session.UniqueKey, code)
return return
} }
@ -143,7 +152,7 @@ func (session *PullSession) ReadFLVHeader() ([]byte, error) {
if err != nil { if err != nil {
return flvHeader, err return flvHeader, err
} }
log.Infof("-----> httpflv header. [%s]", session.UniqueKey) nazalog.Debugf("< read http flv header. [%s]", session.UniqueKey)
// TODO chef: check flv header's value // TODO chef: check flv header's value
return flvHeader, nil return flvHeader, nil

@ -10,7 +10,6 @@ package httpflv
import ( import (
"errors" "errors"
"strings"
) )
var ErrHTTPFLV = errors.New("lal.httpflv: fxxk") var ErrHTTPFLV = errors.New("lal.httpflv: fxxk")
@ -60,65 +59,3 @@ const (
const ( const (
SoundFormatAAC uint8 = 10 SoundFormatAAC uint8 = 10
) )
type LineReader interface {
ReadLine() (line []byte, isPrefix bool, err error)
}
// @return firstLine: request 的 request line 或 response 的 status line
// @return headers: 头中的键值对
func parseHTTPHeader(r LineReader) (firstLine string, headers map[string]string, err error) {
headers = make(map[string]string)
var line []byte
var isPrefix bool
line, isPrefix, err = r.ReadLine()
if err != nil {
return
}
if len(line) == 0 || isPrefix {
err = ErrHTTPFLV
return
}
firstLine = string(line)
for {
line, isPrefix, err = r.ReadLine()
if len(line) == 0 { // 读到一个空的 \r\n 表示http头全部读取完毕了
break
}
if isPrefix {
err = ErrHTTPFLV
return
}
if err != nil {
return
}
l := string(line)
pos := strings.Index(l, ":")
if pos == -1 {
err = ErrHTTPFLV
return
}
headers[strings.Trim(l[0:pos], " ")] = strings.Trim(l[pos+1:], " ")
}
return
}
func parseRequestLine(line string) (method string, uri string, version string, err error) {
items := strings.Split(line, " ")
if len(items) != 3 {
err = ErrHTTPFLV
return
}
return items[0], items[1], items[2], nil
}
func parseStatusLine(line string) (version string, statusCode string, reason string, err error) {
items := strings.Split(line, " ")
if len(items) != 3 {
err = ErrHTTPFLV
return
}
return items[0], items[1], items[2], nil
}

@ -69,7 +69,7 @@ func (server *Server) handleConnect(conn net.Conn) {
log.Errorf("read httpflv SubSession request error. [%s]", session.UniqueKey) log.Errorf("read httpflv SubSession request error. [%s]", session.UniqueKey)
return return
} }
log.Infof("-----> http request. [%s] uri=%s", session.UniqueKey, session.URI) log.Debugf("< read http request. [%s] uri=%s", session.UniqueKey, session.URI)
if !server.obs.NewHTTPFLVSubSessionCB(session) { if !server.obs.NewHTTPFLVSubSessionCB(session) {
session.Dispose() session.Dispose()

@ -14,6 +14,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/q191201771/naza/pkg/nazahttp"
"github.com/q191201771/naza/pkg/connection" "github.com/q191201771/naza/pkg/connection"
"github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/nazalog"
@ -72,10 +74,10 @@ func (session *SubSession) ReadRequest() (err error) {
requestLine string requestLine string
method string method string
) )
if requestLine, session.Headers, err = parseHTTPHeader(session.conn); err != nil { if requestLine, session.Headers, err = nazahttp.ReadHTTPHeader(session.conn); err != nil {
return return
} }
if method, session.URI, _, err = parseRequestLine(requestLine); err != nil { if method, session.URI, _, err = nazahttp.ParseHTTPRequestLine(requestLine); err != nil {
return return
} }
if method != "GET" { if method != "GET" {
@ -115,12 +117,12 @@ func (session *SubSession) RunLoop() error {
} }
func (session *SubSession) WriteHTTPResponseHeader() { func (session *SubSession) WriteHTTPResponseHeader() {
nazalog.Infof("<----- http response header. [%s]", session.UniqueKey) nazalog.Debugf("> send http response header. [%s]", session.UniqueKey)
session.WriteRawPacket(flvHTTPResponseHeader) session.WriteRawPacket(flvHTTPResponseHeader)
} }
func (session *SubSession) WriteFLVHeader() { func (session *SubSession) WriteFLVHeader() {
nazalog.Infof("<----- http flv header. [%s]", session.UniqueKey) nazalog.Debugf("> send http flv header. [%s]", session.UniqueKey)
session.WriteRawPacket(FLVHeader) session.WriteRawPacket(FLVHeader)
} }

Loading…
Cancel
Save