整理 pkg/httpflv/client_pull_session.go

pull/200/head
q191201771 6 years ago
parent 5dc559c371
commit 97b5a05973

@ -7,36 +7,21 @@ import (
"os"
)
type Obs struct {
}
func (obs *Obs) ReadHTTPRespHeaderCB() {
log.Info("ReadHTTPRespHeaderCB")
}
func (obs *Obs) ReadFlvHeaderCB(flvHeader []byte) {
log.Info("ReadFlvHeaderCB")
}
func (obs *Obs) ReadFlvTagCB(tag *httpflv.Tag) {
log.Infof("ReadFlvTagCB %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
}
func main() {
url := parseFlag()
var obs Obs
session := httpflv.NewPullSession(&obs, 0, 0)
err := session.Pull(url)
session := httpflv.NewPullSession(0, 0)
err := session.Pull(url,
func(tag *httpflv.Tag) {
log.Infof("ReadFlvTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu())
})
if err != nil {
log.Error(err)
return
}
err = session.RunLoop()
log.Error(err)
}
func parseFlag() string {
url := flag.String("i", "", "specify rtmp url")
url := flag.String("i", "", "specify http-flv url")
flag.Parse()
if *url == "" {
flag.Usage()

@ -15,11 +15,8 @@ var sm *ServerManager
func main() {
confFile := parseFlag()
initLog()
log.Infof("bininfo: %s", bininfo.StringifySingleLine())
config := loadConf(confFile)
sm = NewServerManager(config)
@ -27,8 +24,6 @@ func main() {
//shutdownAfter(60 * time.Second)
// TODO chef: 添加优雅退出信号处理
startWebPProf()
}
@ -80,6 +75,7 @@ func startWebPProf() {
log.Info("start pprof listen. addr=:10001")
}
// TODO chef: 添加优雅退出信号处理
func shutdownAfter(d time.Duration) {
go func() {
time.Sleep(d)

@ -32,3 +32,6 @@ cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull
cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile
cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull
${ROOT_DIR}/bin/lal -v
ls -lrt ${ROOT_DIR}/bin

@ -32,3 +32,6 @@ cd ${ROOT_DIR}/app/httpflvpull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR
cd ${ROOT_DIR}/app/modflvfile && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/modflvfile
cd ${ROOT_DIR}/app/rtmppull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/rtmppull
${ROOT_DIR}/bin/lal -v
ls -lrt ${ROOT_DIR}/bin

@ -12,40 +12,64 @@ import (
)
type PullSession struct {
UniqueKey string
connectTimeoutMS int
readTimeoutMS int
obs PullSessionObserver
Conn connection.Connection
Conn connection.Connection
closeOnce sync.Once
UniqueKey string
}
host string
uri string
addr string
type PullSessionObserver interface {
ReadHTTPRespHeaderCB()
ReadFlvHeaderCB(flvHeader []byte)
ReadFlvTagCB(tag *Tag) // after cb, PullSession won't use this tag data
readFlvTagCB ReadFlvTagCB
}
// @param connectTimeoutMS TCP连接时超时单位毫秒如果为0则不设置超时
// @param readTimeoutMS 接收数据超时单位毫秒如果为0则不设置超时
func NewPullSession(obs PullSessionObserver, connectTimeoutMS int, readTimeoutMS int) *PullSession {
func NewPullSession(connectTimeoutMS int, readTimeoutMS int) *PullSession {
uk := unique.GenUniqueKey("FLVPULL")
log.Infof("lifecycle new PullSession. [%s]", uk)
return &PullSession{
connectTimeoutMS: connectTimeoutMS,
readTimeoutMS: readTimeoutMS,
obs: obs,
UniqueKey: uk,
UniqueKey: uk,
}
}
// 支持如下两种格式。当然,前提是对端支持
type ReadFlvTagCB func(tag *Tag)
// 阻塞直到拉流失败
//
// @param rawURL 支持如下两种格式。(当然,前提是对端支持)
// http://{domain}/{app_name}/{stream_name}.flv
// http://{ip}/{domain}/{app_name}/{stream_name}.flv
func (session *PullSession) Pull(rawURL string) error {
//
// @param readFlvTagCB 读取到 flv tag 数据时回调。回调结束后PullSession不会再使用 <tag> 数据。
func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error {
if err := session.Connect(rawURL); err != nil {
return err
}
if err := session.WriteHTTPGet(); err != nil {
return err
}
return session.runReadLoop(readFlvTagCB)
}
func (session *PullSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
if err := session.Conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}
func (session *PullSession) Connect(rawURL string) error {
// # 从 url 中解析 host uri addr
url, err := url.Parse(rawURL)
if err != nil {
return err
@ -54,89 +78,56 @@ func (session *PullSession) Pull(rawURL string) error {
return httpFlvErr
}
host := url.Host
session.host = url.Host
// TODO chef: uri with url.RawQuery?
uri := url.Path
session.uri = url.Path
var addr string
if strings.Contains(host, ":") {
addr = host
if strings.Contains(session.host, ":") {
session.addr = session.host
} else {
addr = host + ":80"
session.addr = session.host + ":80"
}
// # 建立连接
var conn net.Conn
if session.connectTimeoutMS == 0 {
conn, err = net.Dial("tcp", addr)
conn, err = net.Dial("tcp", session.addr)
} else {
conn, err = net.DialTimeout("tcp", addr, time.Duration(session.connectTimeoutMS)*time.Millisecond)
conn, err = net.DialTimeout("tcp", session.addr, time.Duration(session.connectTimeoutMS)*time.Millisecond)
}
if err != nil {
return err
}
session.Conn = connection.New(conn, &connection.Config{ReadBufSize: readBufSize})
return nil
}
_, err = session.Conn.PrintfWithTimeout(
func (session *PullSession) WriteHTTPGet() error {
// # 发送 http GET 请求
_, err := session.Conn.PrintfWithTimeout(
session.readTimeoutMS,
"GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n",
uri, host)
session.uri, session.host)
return err
}
func (session *PullSession) RunLoop() error {
err := session.runReadLoop()
session.Dispose(err)
return err
}
func (session *PullSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
if err := session.Conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}
func (session *PullSession) runReadLoop() error {
if err := session.readHTTPRespHeader(); err != nil {
return err
}
// TODO chef: 把内容返回给上层
session.obs.ReadHTTPRespHeaderCB()
flvHeader, err := session.readFlvHeader()
if err != nil {
return err
}
session.obs.ReadFlvHeaderCB(flvHeader)
for {
tag, err := session.readTag()
if err != nil {
return err
}
session.obs.ReadFlvTagCB(tag)
}
}
func (session *PullSession) readHTTPRespHeader() error {
func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[string]string, err error) {
// TODO chef: timeout
_, firstLine, headers, err := parseHTTPHeader(session.Conn)
_, firstLine, headers, err = parseHTTPHeader(session.Conn)
if err != nil {
return err
return
}
if !strings.Contains(firstLine, "200") || len(headers) == 0 {
return httpFlvErr
err = httpFlvErr
return
}
log.Infof("-----> http response header. [%s]", session.UniqueKey)
return nil
return
}
func (session *PullSession) readFlvHeader() ([]byte, error) {
func (session *PullSession) ReadFlvHeader() ([]byte, error) {
flvHeader := make([]byte, flvHeaderSize)
_, err := session.Conn.ReadAtLeastWithTimeout(flvHeader, flvHeaderSize, session.readTimeoutMS)
if err != nil {
@ -148,7 +139,7 @@ func (session *PullSession) readFlvHeader() ([]byte, error) {
return flvHeader, nil
}
func (session *PullSession) readTag() (*Tag, error) {
func (session *PullSession) ReadTag() (*Tag, error) {
rawHeader := make([]byte, TagHeaderSize)
if _, err := session.Conn.ReadAtLeastWithTimeout(rawHeader, TagHeaderSize, session.readTimeoutMS); err != nil {
return nil, err
@ -167,3 +158,21 @@ func (session *PullSession) readTag() (*Tag, error) {
return tag, nil
}
func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error {
if _, _, err := session.ReadHTTPRespHeader(); err != nil {
return err
}
if _, err := session.ReadFlvHeader(); err != nil {
return err
}
for {
tag, err := session.ReadTag()
if err != nil {
return err
}
readFlvTagCB(tag)
}
}

@ -11,8 +11,11 @@ var gopCacheNum = 2
// TODO chef: 所有新增对象的UniqueKey
// TODO chef: 将Observer方式改成 func CB方式
type GroupObserver interface {
PullSessionObserver
ReadHTTPRespHeaderCB()
ReadFlvHeaderCB(flvHeader []byte)
ReadFlvTagCB(tag *Tag)
}
type Group struct {
@ -66,7 +69,7 @@ func (group *Group) AddHTTPFlvSubSession(session *SubSession) {
}
func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) {
group.pullSession = NewPullSession(group, int(connectTimeout), int(readTimeout))
group.pullSession = NewPullSession(int(connectTimeout), int(readTimeout))
defer func() {
group.mutex.Lock()
@ -77,16 +80,17 @@ func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) {
log.Infof("<----- connect. [%s]", group.pullSession.UniqueKey)
url := fmt.Sprintf("http://%s/%s/%s.flv", addr, group.appName, group.streamName)
if err := group.pullSession.Pull(url); err != nil {
log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err)
// TODO chef: impl cb
if err := group.pullSession.Pull(url, nil); err != nil {
//log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err)
return
}
log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey)
//log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey)
if err := group.pullSession.RunLoop(); err != nil {
log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err)
return
}
//if err := group.pullSession.RunLoop(); err != nil {
// log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err)
// return
//}
}
func (group *Group) IsTotalEmpty() bool {

@ -2,7 +2,6 @@ package httpflv
import (
"bufio"
"github.com/q191201771/nezha/pkg/connstat"
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/nezha/pkg/unique"
"net"
@ -30,7 +29,6 @@ var wChanSize = 1024 // TODO chef: 1024
type SubSession struct {
UniqueKey string
ConnStat connstat.ConnStat
writeTimeout int64
StartTick int64
@ -113,7 +111,6 @@ func (session *SubSession) ReadRequest() (err error) {
}
func (session *SubSession) RunLoop() error {
session.ConnStat.Start(0, session.writeTimeout)
go func() {
buf := make([]byte, 128)
if _, err := session.conn.Read(buf); err != nil {
@ -177,12 +174,11 @@ func (session *SubSession) runWriteLoop() error {
}
// TODO chef: use bufio.Writer
n, err := session.conn.Write(pkt)
_, err := session.conn.Write(pkt)
if err != nil {
session.Dispose(err)
return err
}
session.ConnStat.Write(n)
}
}
}

Loading…
Cancel
Save