diff --git a/app/httpflvpull/httpflvpull.go b/app/httpflvpull/httpflvpull.go index 8ccb20a..aa1621d 100644 --- a/app/httpflvpull/httpflvpull.go +++ b/app/httpflvpull/httpflvpull.go @@ -7,36 +7,21 @@ import ( "os" ) -type Obs struct { -} - -func (obs *Obs) ReadHTTPRespHeaderCB() { - log.Info("ReadHTTPRespHeaderCB") -} - -func (obs *Obs) ReadFlvHeaderCB(flvHeader []byte) { - log.Info("ReadFlvHeaderCB") -} - -func (obs *Obs) ReadFlvTagCB(tag *httpflv.Tag) { - log.Infof("ReadFlvTagCB %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) -} - func main() { url := parseFlag() - var obs Obs - session := httpflv.NewPullSession(&obs, 0, 0) - err := session.Pull(url) + session := httpflv.NewPullSession(0, 0) + err := session.Pull(url, + func(tag *httpflv.Tag) { + log.Infof("ReadFlvTagCB. %+v %t %t", tag.Header, tag.IsAVCKeySeqHeader(), tag.IsAVCKeyNalu()) + }) if err != nil { log.Error(err) return } - err = session.RunLoop() - log.Error(err) } func parseFlag() string { - url := flag.String("i", "", "specify rtmp url") + url := flag.String("i", "", "specify http-flv url") flag.Parse() if *url == "" { flag.Usage() diff --git a/app/lal/main.go b/app/lal/main.go index 323b302..dec5c43 100644 --- a/app/lal/main.go +++ b/app/lal/main.go @@ -15,11 +15,8 @@ var sm *ServerManager func main() { confFile := parseFlag() - initLog() - log.Infof("bininfo: %s", bininfo.StringifySingleLine()) - config := loadConf(confFile) sm = NewServerManager(config) @@ -27,8 +24,6 @@ func main() { //shutdownAfter(60 * time.Second) - // TODO chef: 添加优雅退出信号处理 - startWebPProf() } @@ -80,6 +75,7 @@ func startWebPProf() { log.Info("start pprof listen. addr=:10001") } +// TODO chef: 添加优雅退出信号处理 func shutdownAfter(d time.Duration) { go func() { time.Sleep(d) diff --git a/build.sh b/build.sh index 98076ea..21ca2ce 100755 --- a/build.sh +++ b/build.sh @@ -32,3 +32,6 @@ cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull + +${ROOT_DIR}/bin/lal -v +ls -lrt ${ROOT_DIR}/bin diff --git a/build_linux.sh b/build_linux.sh index f674739..746f9fa 100755 --- a/build_linux.sh +++ b/build_linux.sh @@ -32,3 +32,6 @@ cd ${ROOT_DIR}/app/httpflvpull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR cd ${ROOT_DIR}/app/modflvfile && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/modflvfile cd ${ROOT_DIR}/app/rtmppull && GOOS=linux GOARCH=amd64 go build -o ${ROOT_DIR}/bin/rtmppull + +${ROOT_DIR}/bin/lal -v +ls -lrt ${ROOT_DIR}/bin diff --git a/pkg/httpflv/client_pull_session.go b/pkg/httpflv/client_pull_session.go index 65daa51..166812c 100644 --- a/pkg/httpflv/client_pull_session.go +++ b/pkg/httpflv/client_pull_session.go @@ -12,40 +12,64 @@ import ( ) type PullSession struct { + UniqueKey string + connectTimeoutMS int readTimeoutMS int - obs PullSessionObserver - Conn connection.Connection - + Conn connection.Connection closeOnce sync.Once - UniqueKey string -} + host string + uri string + addr string -type PullSessionObserver interface { - ReadHTTPRespHeaderCB() - ReadFlvHeaderCB(flvHeader []byte) - ReadFlvTagCB(tag *Tag) // after cb, PullSession won't use this tag data + readFlvTagCB ReadFlvTagCB } // @param connectTimeoutMS TCP连接时超时,单位毫秒,如果为0,则不设置超时 // @param readTimeoutMS 接收数据超时,单位毫秒,如果为0,则不设置超时 -func NewPullSession(obs PullSessionObserver, connectTimeoutMS int, readTimeoutMS int) *PullSession { +func NewPullSession(connectTimeoutMS int, readTimeoutMS int) *PullSession { uk := unique.GenUniqueKey("FLVPULL") log.Infof("lifecycle new PullSession. [%s]", uk) return &PullSession{ connectTimeoutMS: connectTimeoutMS, readTimeoutMS: readTimeoutMS, - obs: obs, - UniqueKey: uk, + UniqueKey: uk, } } -// 支持如下两种格式。当然,前提是对端支持 +type ReadFlvTagCB func(tag *Tag) + +// 阻塞直到拉流失败 +// +// @param rawURL 支持如下两种格式。(当然,前提是对端支持) // http://{domain}/{app_name}/{stream_name}.flv // http://{ip}/{domain}/{app_name}/{stream_name}.flv -func (session *PullSession) Pull(rawURL string) error { +// +// @param readFlvTagCB 读取到 flv tag 数据时回调。回调结束后,PullSession不会再使用 数据。 +func (session *PullSession) Pull(rawURL string, readFlvTagCB ReadFlvTagCB) error { + if err := session.Connect(rawURL); err != nil { + return err + } + if err := session.WriteHTTPGet(); err != nil { + return err + } + + return session.runReadLoop(readFlvTagCB) +} + +func (session *PullSession) Dispose(err error) { + session.closeOnce.Do(func() { + log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) + if err := session.Conn.Close(); err != nil { + log.Error("conn close error. [%s] err=%v", session.UniqueKey, err) + } + }) +} + +func (session *PullSession) Connect(rawURL string) error { + // # 从 url 中解析 host uri addr url, err := url.Parse(rawURL) if err != nil { return err @@ -54,89 +78,56 @@ func (session *PullSession) Pull(rawURL string) error { return httpFlvErr } - host := url.Host + session.host = url.Host // TODO chef: uri with url.RawQuery? - uri := url.Path + session.uri = url.Path - var addr string - if strings.Contains(host, ":") { - addr = host + if strings.Contains(session.host, ":") { + session.addr = session.host } else { - addr = host + ":80" + session.addr = session.host + ":80" } + // # 建立连接 var conn net.Conn if session.connectTimeoutMS == 0 { - conn, err = net.Dial("tcp", addr) + conn, err = net.Dial("tcp", session.addr) } else { - conn, err = net.DialTimeout("tcp", addr, time.Duration(session.connectTimeoutMS)*time.Millisecond) + conn, err = net.DialTimeout("tcp", session.addr, time.Duration(session.connectTimeoutMS)*time.Millisecond) } if err != nil { return err } session.Conn = connection.New(conn, &connection.Config{ReadBufSize: readBufSize}) + return nil +} - _, err = session.Conn.PrintfWithTimeout( +func (session *PullSession) WriteHTTPGet() error { + // # 发送 http GET 请求 + _, err := session.Conn.PrintfWithTimeout( session.readTimeoutMS, "GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n", - uri, host) - + session.uri, session.host) return err } -func (session *PullSession) RunLoop() error { - err := session.runReadLoop() - session.Dispose(err) - return err -} - -func (session *PullSession) Dispose(err error) { - session.closeOnce.Do(func() { - log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err) - if err := session.Conn.Close(); err != nil { - log.Error("conn close error. [%s] err=%v", session.UniqueKey, err) - } - }) -} - -func (session *PullSession) runReadLoop() error { - if err := session.readHTTPRespHeader(); err != nil { - return err - } - // TODO chef: 把内容返回给上层 - session.obs.ReadHTTPRespHeaderCB() - - flvHeader, err := session.readFlvHeader() - if err != nil { - return err - } - session.obs.ReadFlvHeaderCB(flvHeader) - - for { - tag, err := session.readTag() - if err != nil { - return err - } - session.obs.ReadFlvTagCB(tag) - } -} - -func (session *PullSession) readHTTPRespHeader() error { +func (session *PullSession) ReadHTTPRespHeader() (firstLine string, headers map[string]string, err error) { // TODO chef: timeout - _, firstLine, headers, err := parseHTTPHeader(session.Conn) + _, firstLine, headers, err = parseHTTPHeader(session.Conn) if err != nil { - return err + return } if !strings.Contains(firstLine, "200") || len(headers) == 0 { - return httpFlvErr + err = httpFlvErr + return } log.Infof("-----> http response header. [%s]", session.UniqueKey) - return nil + return } -func (session *PullSession) readFlvHeader() ([]byte, error) { +func (session *PullSession) ReadFlvHeader() ([]byte, error) { flvHeader := make([]byte, flvHeaderSize) _, err := session.Conn.ReadAtLeastWithTimeout(flvHeader, flvHeaderSize, session.readTimeoutMS) if err != nil { @@ -148,7 +139,7 @@ func (session *PullSession) readFlvHeader() ([]byte, error) { return flvHeader, nil } -func (session *PullSession) readTag() (*Tag, error) { +func (session *PullSession) ReadTag() (*Tag, error) { rawHeader := make([]byte, TagHeaderSize) if _, err := session.Conn.ReadAtLeastWithTimeout(rawHeader, TagHeaderSize, session.readTimeoutMS); err != nil { return nil, err @@ -167,3 +158,21 @@ func (session *PullSession) readTag() (*Tag, error) { return tag, nil } + +func (session *PullSession) runReadLoop(readFlvTagCB ReadFlvTagCB) error { + if _, _, err := session.ReadHTTPRespHeader(); err != nil { + return err + } + + if _, err := session.ReadFlvHeader(); err != nil { + return err + } + + for { + tag, err := session.ReadTag() + if err != nil { + return err + } + readFlvTagCB(tag) + } +} diff --git a/pkg/httpflv/group.go b/pkg/httpflv/group.go index 7518edc..e5004bf 100644 --- a/pkg/httpflv/group.go +++ b/pkg/httpflv/group.go @@ -11,8 +11,11 @@ var gopCacheNum = 2 // TODO chef: 所有新增对象的UniqueKey +// TODO chef: 将Observer方式改成 func CB方式 type GroupObserver interface { - PullSessionObserver + ReadHTTPRespHeaderCB() + ReadFlvHeaderCB(flvHeader []byte) + ReadFlvTagCB(tag *Tag) } type Group struct { @@ -66,7 +69,7 @@ func (group *Group) AddHTTPFlvSubSession(session *SubSession) { } func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) { - group.pullSession = NewPullSession(group, int(connectTimeout), int(readTimeout)) + group.pullSession = NewPullSession(int(connectTimeout), int(readTimeout)) defer func() { group.mutex.Lock() @@ -77,16 +80,17 @@ func (group *Group) Pull(addr string, connectTimeout int64, readTimeout int64) { log.Infof("<----- connect. [%s]", group.pullSession.UniqueKey) url := fmt.Sprintf("http://%s/%s/%s.flv", addr, group.appName, group.streamName) - if err := group.pullSession.Pull(url); err != nil { - log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err) + // TODO chef: impl cb + if err := group.pullSession.Pull(url, nil); err != nil { + //log.Errorf("-----> connect error. [%s] err=%v", group.pullSession.UniqueKey, err) return } - log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey) + //log.Infof("-----> connect succ. [%s]", group.pullSession.UniqueKey) - if err := group.pullSession.RunLoop(); err != nil { - log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err) - return - } + //if err := group.pullSession.RunLoop(); err != nil { + // log.Debugf("PullSession loop done. [%s] err=%v", group.pullSession.UniqueKey, err) + // return + //} } func (group *Group) IsTotalEmpty() bool { diff --git a/pkg/httpflv/server_sub_session.go b/pkg/httpflv/server_sub_session.go index ee7a2c3..9ca3064 100644 --- a/pkg/httpflv/server_sub_session.go +++ b/pkg/httpflv/server_sub_session.go @@ -2,7 +2,6 @@ package httpflv import ( "bufio" - "github.com/q191201771/nezha/pkg/connstat" "github.com/q191201771/nezha/pkg/log" "github.com/q191201771/nezha/pkg/unique" "net" @@ -30,7 +29,6 @@ var wChanSize = 1024 // TODO chef: 1024 type SubSession struct { UniqueKey string - ConnStat connstat.ConnStat writeTimeout int64 StartTick int64 @@ -113,7 +111,6 @@ func (session *SubSession) ReadRequest() (err error) { } func (session *SubSession) RunLoop() error { - session.ConnStat.Start(0, session.writeTimeout) go func() { buf := make([]byte, 128) if _, err := session.conn.Read(buf); err != nil { @@ -177,12 +174,11 @@ func (session *SubSession) runWriteLoop() error { } // TODO chef: use bufio.Writer - n, err := session.conn.Write(pkt) + _, err := session.conn.Write(pkt) if err != nil { session.Dispose(err) return err } - session.ConnStat.Write(n) } } }