* PullSession和SubSession的主动关闭、被动关闭

* 引入日志库seelog,添加一些日志
* 缓存gop
* 部分解析avc seq header
pull/200/head
q191201771 6 years ago
parent 5a2ea4fadc
commit bf6b9502df

13
.gitignore vendored

@ -1,7 +1,10 @@
/.idea
/rtmp
/demo
/conf/lal.conf.json
/demo/httpflvpull/httpflvpull.go
/demo/
/conf/self.conf.json
/rtmp/
/.idea/
/logs/
/TODO.md
/lal
/lal_linux
/build_linux.sh

@ -0,0 +1,35 @@
lal - Go语言流媒体服务器
#### 编译和运行
```
$go get -u github.com/q191201771/lal
# cd into lal
$go build
# ./lal -c <配置文件> -l <日志配置文件>,比如:
$./lal -c conf/lal.conf.json -l conf/log.dev.xml
```
#### 配置文件说明
```
{
"httpflv": {
"sub_listen_addr": ":8080", // http-flv拉流地址
"pull_addr": "pull.xxx.com", // 如果设置则当客户端连接lal拉流且流不存在时lal会使用http-flv去该域名回
// 源拉流至本地再转发
"pull_connect_timeout": 2, // 回源连接超时时间
"pull_read_timeout": 20, // 回源读取数据超时时间
"sub_idle_timeout": 10, // 往客户端发送数据时的超时时间
"stop_pull_while_no_sub_timeout": 5, // 回源的流超过多长时间没有客户端播放,则关闭回源的流
"gop_cache_num": 2 // gop缓存个数如果设置为0则只缓存seq header不缓存gop数据
}
}
```
TODO 日志配置文件说明
#### 依赖
* cihub/seelog

@ -1,5 +1,7 @@
package bele
// assume local is `le`
func BeUInt24(p []byte) (ret uint32) {
ret = 0
ret |= uint32(p[0]) << 16
@ -7,3 +9,7 @@ func BeUInt24(p []byte) (ret uint32) {
ret |= uint32(p[2])
return
}
// BeUInt16 reads a big-endian unsigned 16-bit integer from the first
// two bytes of p.
func BeUInt16(p []byte) uint16 {
	var ret uint16
	ret |= uint16(p[0]) << 8
	ret |= uint16(p[1])
	return ret
}

@ -0,0 +1,11 @@
{
"httpflv": {
"sub_listen_addr": ":8080",
"pull_addr": "pull.xxx.com",
"pull_connect_timeout": 2,
"pull_read_timeout": 20,
"sub_idle_timeout": 10,
"stop_pull_while_no_sub_timeout": 5,
"gop_cache_num": 2
}
}

@ -1,10 +0,0 @@
{
"httpflv": {
"sub_listen_addr": ":8080",
"pull_addr": "pull.x.com",
"pull_connect_timeout": 2,
"pull_read_timeout": 10,
"sub_idle_timeout": 10,
"stop_pull_while_no_sub_timeout": 30
}
}

@ -0,0 +1,35 @@
<seelog minlevel="trace">
<outputs formatid="file_common">
<filter levels="trace">
<console formatid="console_trace" />
</filter>
<filter levels="debug">
<console formatid="console_debug" />
</filter>
<filter levels="info">
<console formatid="console_info" />
</filter>
<filter levels="warn">
<console formatid="console_warn" />
</filter>
<filter levels="error">
<console formatid="console_error" />
</filter>
<filter levels="critical">
<console formatid="console_critical" />
</filter>
<file path="./logs/lal.log" />
<filter levels="warn,error,critical">
<file path="./logs/lal_error.log" />
</filter>
</outputs>
<formats>
<format id="file_common" format="%Date(2006-01-02 15:04:05.000) %LEV %Msg - %File:%Line%n" />
<format id="console_trace" format="%Date(2006-01-02 15:04:05.000) %EscM(37)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
<format id="console_debug" format="%Date(2006-01-02 15:04:05.000) %EscM(37)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
<format id="console_info" format="%Date(2006-01-02 15:04:05.000) %EscM(36)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
<format id="console_warn" format="%Date(2006-01-02 15:04:05.000) %EscM(33)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
<format id="console_error" format="%Date(2006-01-02 15:04:05.000) %EscM(31)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
<format id="console_critical" format="%Date(2006-01-02 15:04:05.000) %EscM(31)%LEV%EscM(49)%EscM(0) %Msg - %File:%Line%n" />
</formats>
</seelog>

@ -0,0 +1,11 @@
<seelog minlevel="info">
<outputs formatid="common">
<rollingfile type="date" filename="./logs/lal.log" datepattern="2006-01-02" />
<filter levels="warn,error,critical">
<rollingfile type="date" filename="./logs/lal_error.log" datepattern="2006-01-02" />
</filter>
</outputs>
<formats>
<format id="common" format="%Date(2006-01-02 15:04:05) %LEV %Msg%n" />
</formats>
</seelog>

@ -17,7 +17,6 @@ func LoadConf(confFile string) (*Config, error) {
return nil, err
}
if err = json.Unmarshal(rawContent, &config); err != nil {
panic(err)
return nil, err
}
return &config, nil

@ -0,0 +1,52 @@
package httpflv
import (
"github.com/q191201771/lal/bele"
"github.com/q191201771/lal/log"
)
// TODO chef: move me to another package
// H.264-AVC-ISO_IEC_14496-15.pdf
// 5.2.4 Decoder configuration information
// <buf> body of tag
// parseAvcSeqHeader parses the body of an FLV video tag carrying an AVC
// sequence header (AVCDecoderConfigurationRecord, see
// H.264-AVC-ISO_IEC_14496-15.pdf, 5.2.4 Decoder configuration information)
// and returns the raw SPS and PPS bytes.
//
// <buf> is the tag body, starting at the FrameType/CodecID byte.
// If multiple SPS/PPS entries are present they are concatenated.
// err is set when the buffer is not a seq header or is truncated.
func parseAvcSeqHeader(buf []byte) (sps, pps []byte, err error) {
	// Fixed prefix: FrameType/CodecID, AVCPacketType, 3-byte composition
	// time, 5-byte record header, then the SPS count at index 10 — so at
	// least 11 bytes are required before any variable-length data.
	if len(buf) < 11 ||
		buf[0] != AvcKey || buf[1] != AvcPacketTypeSeqHeader ||
		buf[2] != 0 || buf[3] != 0 || buf[4] != 0 {
		log.Error("parse avc seq header failed.")
		err = fxxkErr
		return
	}
	//configurationVersion := buf[5]
	//avcProfileIndication := buf[6]
	//profileCompatibility := buf[7]
	//avcLevelIndication := buf[8]
	//lengthSizeMinusOne := buf[9] & 0x03
	index := 10
	numOfSps := int(buf[index] & 0x1F)
	index++
	// TODO chef: can the multi-SPS situation actually occur?
	for i := 0; i < numOfSps; i++ {
		if index+2 > len(buf) {
			err = fxxkErr
			return
		}
		lenOfSps := int(bele.BeUInt16(buf[index:]))
		index += 2
		if index+lenOfSps > len(buf) {
			err = fxxkErr
			return
		}
		sps = append(sps, buf[index:index+lenOfSps]...)
		index += lenOfSps
	}
	if index >= len(buf) {
		err = fxxkErr
		return
	}
	numOfPps := int(buf[index] & 0x1F)
	index++
	for i := 0; i < numOfPps; i++ {
		if index+2 > len(buf) {
			err = fxxkErr
			return
		}
		lenOfPps := int(bele.BeUInt16(buf[index:]))
		index += 2
		if index+lenOfPps > len(buf) {
			err = fxxkErr
			return
		}
		pps = append(pps, buf[index:index+lenOfPps]...)
		index += lenOfPps
	}
	return
}

@ -7,4 +7,5 @@ type Config struct {
PullReadTimeout int64 `json:"pull_read_timeout"`
SubIdleTimeout int64 `json:"sub_idle_timeout"`
StopPullWhileNoSubTimeout int64 `json:"stop_pull_while_no_sub_timeout"`
GopCacheNum int `json:"gop_cache_num"`
}

@ -0,0 +1,206 @@
package httpflv
import (
"bytes"
"github.com/q191201771/lal/log"
"sync"
)
// Gop holds the raw bytes of one group of pictures: the key-frame tag
// that opens it plus every following tag until the next key frame.
type Gop struct {
	raw            []byte // concatenated raw FLV tags belonging to this GOP
	firstTimestamp uint32 // timestamp of the key-frame tag that opened the GOP
}

// GopCache caches the stream headers (metadata, AVC/AAC sequence headers)
// and recent GOPs so a newly attached subscriber can be primed with data
// starting at a key frame. All exported methods are safe for concurrent use.
type GopCache struct {
	gopNum       int        // configured number of complete GOPs to keep; 0 = headers only
	metadata     *Tag
	avcSeqHeader *Tag
	aacSeqHeader *Tag
	gops         []*Gop // TODO chef: maybe use other container to mock a queue
	mutex        sync.Mutex
}
// NewGopCache creates a GopCache.
// gopNum: 0 means only cache metadata, avc seq header, aac seq header,
// and no GOP data at all.
func NewGopCache(gopNum int) *GopCache {
	return &GopCache{
		gopNum: gopNum,
	}
}
// Push feeds one FLV tag into the cache.
//
// Metadata and the AVC/AAC sequence headers are cached separately from GOP
// data; whenever any of them changes, the cached GOP data is discarded since
// it was produced under the old headers. A video key-frame NALU opens a new
// GOP; every other tag is appended to the newest (still open) GOP. The
// oldest GOP is evicted once more than gopNum+1 are held — the +1 accounts
// for the newest GOP still being filled.
func (c *GopCache) Push(tag *Tag) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if tag.isMetaData() {
		// TODO chef: will this happen?
		if c.metadata != nil {
			log.Debugf("updating metadata.")
			log.Debug(tag.Header, tag.Raw[tagHeaderSize:])
			log.Debug(c.metadata.Header, c.metadata.Raw[tagHeaderSize:])
			c.clearGop()
		}
		c.metadata = tag
	}

	if tag.isAvcKeySeqHeader() {
		//log.Debug(parseAvcSeqHeader(tag.Raw[tagHeaderSize:]))
		// TODO chef: is the compare necessary? is there another way to
		// update the seq header and handle the cached data?
		if c.avcSeqHeader == nil {
			c.avcSeqHeader = tag
		} else if !bytes.Equal(tag.Raw[tagHeaderSize:], c.avcSeqHeader.Raw[tagHeaderSize:]) {
			log.Debugf("updating avc seq header.")
			log.Debug(tag.Header, tag.Raw[tagHeaderSize:])
			log.Debug(c.avcSeqHeader.Header, c.avcSeqHeader.Raw[tagHeaderSize:])
			c.clearGop()
			c.avcSeqHeader = tag
		}
	}

	if tag.isAacSeqHeader() {
		// NOTE(review): the original unconditionally re-assigned
		// c.aacSeqHeader after this branch, which defeated the equality
		// check and was inconsistent with the AVC path above; removed.
		if c.aacSeqHeader == nil {
			c.aacSeqHeader = tag
		} else if !bytes.Equal(tag.Raw[tagHeaderSize:], c.aacSeqHeader.Raw[tagHeaderSize:]) {
			log.Debugf("updating aac seq header.")
			c.clearGop()
			c.aacSeqHeader = tag
		}
	}

	if c.gopNum == 0 {
		// Configured to cache headers only.
		return
	}

	if tag.isAvcKeyNalu() {
		gop := &Gop{firstTimestamp: tag.Header.Timestamp}
		gop.raw = append(gop.raw, tag.Raw...)
		c.gops = append(c.gops, gop)
		if len(c.gops) == 1 {
			// First cached GOP: align seq-header timestamps with it.
			c.syncOldestKeyNaluTimestampToSeqHeader()
		} else if len(c.gops) > c.gopNum+1 {
			// Evict the oldest GOP; keep gopNum complete GOPs plus the
			// newest one still being filled.
			c.gops = c.gops[1:]
			c.syncOldestKeyNaluTimestampToSeqHeader()
		}
	} else if len(c.gops) > 0 {
		// Non-key tag: extend the newest GOP. Tags arriving before the
		// first key frame are dropped.
		cur := c.gops[len(c.gops)-1]
		cur.raw = append(cur.raw, tag.Raw...)
	}
}
// GetWholeThings assembles everything a fresh subscriber should receive:
// metadata, the AVC/AAC sequence headers (older timestamp first), and the
// cached GOP data. hasKeyFrame is true when GOP data was included, meaning
// the returned bytes start playback at a key frame.
func (c *GopCache) GetWholeThings() (hasKeyFrame bool, res []byte) {
	if md := c.getMetadata(); md != nil {
		res = append(res, md.Raw...)
	}
	avcSH := c.getAvcSeqHeader()
	aacSH := c.getAacSeqHeader()
	// TODO chef: is sorting them by timestamp necessary?
	switch {
	case avcSH != nil && aacSH != nil:
		if avcSH.Header.Timestamp <= aacSH.Header.Timestamp {
			res = append(res, avcSH.Raw...)
			res = append(res, aacSH.Raw...)
		} else {
			res = append(res, aacSH.Raw...)
			res = append(res, avcSH.Raw...)
		}
	case avcSH != nil:
		res = append(res, avcSH.Raw...)
	case aacSH != nil:
		res = append(res, aacSH.Raw...)
	}
	if gops := c.getGops(false); gops != nil {
		res = append(res, gops...)
		log.Debug("cache match.")
		hasKeyFrame = true
	}
	return
}
// ClearAll drops every cached tag: metadata, both sequence headers, and
// all GOP data.
func (c *GopCache) ClearAll() {
	c.mutex.Lock()
	c.metadata, c.avcSeqHeader, c.aacSeqHeader, c.gops = nil, nil, nil, nil
	c.mutex.Unlock()
}
// getGops concatenates the raw bytes of the cached GOPs.
// When mustCompleted is true the newest GOP is excluded, since it may
// still be receiving tags. Returns nil when nothing qualifies.
func (c *GopCache) getGops(mustCompleted bool) []byte {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	n := len(c.gops)
	if mustCompleted {
		n--
	}
	if n <= 0 {
		return nil
	}
	var out []byte
	for i := 0; i < n; i++ {
		out = append(out, c.gops[i].raw...)
	}
	return out
}
// getMetadata returns a deep copy of the cached metadata tag, or nil when
// none has been cached. A copy is returned so callers can use it outside
// the cache's lock.
func (c *GopCache) getMetadata() (res *Tag) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.metadata != nil {
		res = c.metadata.cloneTag()
	}
	return
}

// getAvcSeqHeader returns a deep copy of the cached AVC sequence header
// tag, or nil when none has been cached.
func (c *GopCache) getAvcSeqHeader() (res *Tag) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.avcSeqHeader != nil {
		res = c.avcSeqHeader.cloneTag()
	}
	return
}

// getAacSeqHeader returns a deep copy of the cached AAC sequence header
// tag, or nil when none has been cached.
func (c *GopCache) getAacSeqHeader() (res *Tag) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.aacSeqHeader != nil {
		res = c.aacSeqHeader.cloneTag()
	}
	return
}
// clearGop drops only the cached GOP data; the header tags are left to the
// caller to refresh. Caller must hold c.mutex.
func (c *GopCache) clearGop() {
	log.Debug("clearGop")
	c.gops = nil
}

// TODO chef: if necessary
// syncOldestKeyNaluTimestampToSeqHeader rewrites the cached sequence
// headers' timestamps to match the oldest cached key frame, so replayed
// headers and GOP data stay in timestamp order. Caller must hold c.mutex
// and guarantee len(c.gops) > 0.
// NOTE(review): mutates the cached tags in place; getters hand out clones,
// so a previously cloned header keeps its old timestamp — confirm intended.
func (c *GopCache) syncOldestKeyNaluTimestampToSeqHeader() {
	ts := c.gops[0].firstTimestamp
	if c.avcSeqHeader != nil {
		c.avcSeqHeader.Header.Timestamp = ts
	}
	if c.aacSeqHeader != nil {
		c.aacSeqHeader.Header.Timestamp = ts
	}
}

@ -2,7 +2,8 @@ package httpflv
import (
"fmt"
"log"
"github.com/q191201771/lal/log"
"github.com/q191201771/lal/util"
"sync"
"time"
)
@ -12,21 +13,28 @@ type Group struct {
appName string
streamName string
exitChan chan bool
exitChan chan bool
pullSession *PullSession
subSessionList map[*SubSession]bool
turnToEmptyTick int64 // trace while sub session list turn to empty
gopCache *GopCache
mutex sync.Mutex
UniqueKey string
}
func NewGroup(appName string, streamName string, config Config) *Group {
uk := util.GenUniqueKey("FLVGROUP")
log.Infof("lifecycle new Group. [%s] appName=%s streamName=%s", uk, appName, streamName)
return &Group{
Config: config,
appName: appName,
streamName: streamName,
exitChan: make(chan bool),
subSessionList: make(map[*SubSession]bool),
gopCache: NewGopCache(config.GopCacheNum),
UniqueKey:uk,
}
}
@ -50,9 +58,9 @@ func (group *Group) RunLoop() {
continue
}
if _, diff := sub.GetStat(); diff.writeByte == 0 {
log.Println("sub idle timeout.", sub.StreamName)
sub.ForceClose()
log.Warnf("SubSession idle timeout. session:%s", sub.UniqueKey)
delete(group.subSessionList, sub)
sub.Dispose(fxxkErr)
}
}
group.mutex.Unlock()
@ -63,9 +71,8 @@ func (group *Group) RunLoop() {
if group.pullSession != nil {
if now-group.pullSession.StartTick > group.PullReadTimeout {
if _, diff := group.pullSession.GetStat(); diff.readByte == 0 {
log.Println("pull read timeout.")
group.pullSession.ForceClose()
group.pullSession = nil
log.Warnf("read timeout. [%s]", group.pullSession.UniqueKey)
group.disposePullSession(fxxkErr)
}
}
}
@ -77,9 +84,8 @@ func (group *Group) RunLoop() {
if group.pullSession != nil && group.turnToEmptyTick != 0 && len(group.subSessionList) == 0 &&
now-group.turnToEmptyTick > group.StopPullWhileNoSubTimeout {
log.Println("stop pull while no sub.")
group.pullSession.ForceClose()
group.pullSession = nil
log.Debugf("stop pull while no SubSession. [%s]", group.pullSession.UniqueKey)
group.disposePullSession(fxxkErr)
}
group.mutex.Unlock()
}
@ -87,27 +93,33 @@ func (group *Group) RunLoop() {
}
}
func (group *Group) Dispose() {
func (group *Group) Dispose(err error) {
log.Infof("lifecycle dispose Group. [%s] reason=%v", group.UniqueKey, err)
group.exitChan <- true
}
func (group *Group) AddSubSession(session *SubSession) {
group.mutex.Lock()
log.Println("add sub session in group.")
log.Debugf("add SubSession into group. [%s]", session.UniqueKey)
group.subSessionList[session] = true
group.turnToEmptyTick = 0
session.WriteHttpResponseHeader()
session.WriteFlvHeader()
if hasKeyFrame, cache := group.gopCache.GetWholeThings(); cache != nil {
session.HasKeyFrame = hasKeyFrame
session.WritePacket(cache)
}
group.mutex.Unlock()
session.writeHttpResponseHeader()
session.writeFlvHeader()
go func() {
if err := session.RunLoop(); err != nil {
log.Println(err)
log.Debugf("SubSession loop done. [%s] err=%v", session.UniqueKey, err)
}
group.mutex.Lock()
defer group.mutex.Unlock()
log.Println("erase sub session in group.")
log.Infof("del SubSession out of group. [%s]", session.UniqueKey)
delete(group.subSessionList, session)
if len(group.subSessionList) == 0 {
group.turnToEmptyTick = time.Now().Unix()
@ -121,27 +133,29 @@ func (group *Group) PullIfNeeded(httpFlvPullAddr string) {
if group.pullSession != nil {
return
}
log.Println("set pull session in group.")
pullSession := NewPullSession(group)
group.pullSession = pullSession
go func() {
defer func() {
group.mutex.Lock()
defer group.mutex.Unlock()
group.pullSession = nil
log.Println("erase pull session in group.")
log.Infof("del PullSession out of group. [%s]", pullSession.UniqueKey)
}()
log.Infof("<----- connect. [%s]", pullSession.UniqueKey)
url := fmt.Sprintf("http://%s/%s/%s.flv", httpFlvPullAddr, group.appName, group.streamName)
err := pullSession.Connect(url, time.Duration(group.PullConnectTimeout)*time.Second)
log.Println("pull session connected. ", url)
if err != nil {
log.Println("pull session connect failed.", err)
log.Errorf("-----> connect error. [%s] err=%v", pullSession.UniqueKey, err)
return
}
log.Infof("-----> connect succ. [%s]", pullSession.UniqueKey)
err = pullSession.RunLoop()
if err != nil {
log.Debugf("PullSession loop done. [%s] err=%v", pullSession.UniqueKey, err)
return
}
}()
@ -154,20 +168,36 @@ func (group *Group) IsTotalEmpty() bool {
return group.pullSession == nil && len(group.subSessionList) == 0
}
func (group *Group) disposePullSession(err error) {
group.pullSession.Dispose(err)
group.pullSession = nil
group.gopCache.ClearAll()
}
func (group *Group) ReadHttpRespHeaderCb() {
log.Println("ReadHttpRespHeaderCb")
//log.Debugf("ReadHttpRespHeaderCb. [%s]", group.UniqueKey)
}
func (group *Group) ReadFlvHeaderCb(flvHeader []byte) {
log.Println("ReadFlvHeaderCb")
//log.Debugf("ReadFlvHeaderCb. [%s]", group.UniqueKey)
}
func (group *Group) ReadTagCb(header *TagHeader, tag []byte) {
//log.Println(header.t, header.timestamp)
func (group *Group) ReadTagCb(tag *Tag) {
//log.Debug(header.t, header.timestamp)
group.mutex.Lock()
defer group.mutex.Unlock()
// TODO chef: assume that write fast and would not block
for session := range group.subSessionList {
session.WritePacket(tag)
if session.HasKeyFrame {
session.WritePacket(tag.Raw)
} else {
if tag.isMetaData() || tag.isAvcKeySeqHeader() || tag.isAacSeqHeader() || tag.isAvcKeyNalu() {
if tag.isAvcKeyNalu() {
session.HasKeyFrame = true
}
session.WritePacket(tag.Raw)
}
}
}
group.gopCache.Push(tag)
}

@ -18,7 +18,10 @@ func parseHttpHeader(r *bufio.Reader) (firstLine string, headers map[string]stri
var line []byte
var isPrefix bool
line, isPrefix, err = r.ReadLine()
if len(line) == 0 || isPrefix || err != nil {
if err != nil {
return
}
if len(line) == 0 || isPrefix {
err = fxxkErr
return
}
@ -29,10 +32,13 @@ func parseHttpHeader(r *bufio.Reader) (firstLine string, headers map[string]stri
if len(line) == 0 {
break
}
if isPrefix || err != nil {
if isPrefix {
err = fxxkErr
return
}
if err != nil {
return
}
l := string(line)
pos := strings.Index(l, ":")
if pos == -1 {

@ -1,7 +1,7 @@
package httpflv
import (
"log"
"github.com/q191201771/lal/log"
"sync"
"time"
)
@ -24,7 +24,7 @@ func NewManager(config Config) *Manager {
}
}
func (manager *Manager) SubSessionHandler(session *SubSession) {
func (manager *Manager) subSessionHandler(session *SubSession) {
group := manager.getOrCreateGroup(session.AppName, session.StreamName)
group.AddSubSession(session)
if manager.PullAddr != "" {
@ -36,9 +36,9 @@ func (manager *Manager) RunLoop() {
go func() {
for {
if subSession, ok := manager.server.Accept(); ok {
manager.SubSessionHandler(subSession)
manager.subSessionHandler(subSession)
} else {
log.Println("accept sub session from httpflv server failed.")
log.Error("accept sub session failed.")
return
}
}
@ -46,30 +46,35 @@ func (manager *Manager) RunLoop() {
go func() {
if err := manager.server.RunLoop(); err != nil {
log.Println(err)
log.Error(err)
}
}()
t := time.NewTicker(1 * time.Second)
defer t.Stop()
// TODO chef: erase me, just for debug
tmpT := time.NewTicker(30 * time.Second)
defer tmpT.Stop()
for {
select {
case <-manager.exitChan:
return
case <-t.C:
manager.check()
case <-tmpT.C:
log.Debugf("group size:%d", len(manager.groups))
}
}
}
func (manager *Manager) Dispose() {
log.Println("Dispose manager.")
log.Debug("Dispose manager.")
manager.server.Dispose()
manager.exitChan <- true
manager.mutex.Lock()
defer manager.mutex.Unlock()
for _, group := range manager.groups {
group.Dispose()
group.Dispose(fxxkErr)
}
manager.groups = nil
}
@ -79,8 +84,8 @@ func (manager *Manager) check() {
defer manager.mutex.Unlock()
for k, group := range manager.groups {
if group.IsTotalEmpty() {
log.Println("erase empty group.", k)
group.Dispose()
log.Infof("erase empty group. [%s]", group.UniqueKey)
group.Dispose(fxxkErr)
delete(manager.groups, k)
}
}
@ -91,7 +96,6 @@ func (manager *Manager) getOrCreateGroup(appName string, streamName string) *Gro
defer manager.mutex.Unlock()
group, exist := manager.groups[streamName]
if !exist {
log.Println("create group. ", streamName)
group = NewGroup(appName, streamName, manager.Config)
manager.groups[streamName] = group
}

@ -3,12 +3,12 @@ package httpflv
import (
"bufio"
"fmt"
"github.com/q191201771/lal/log"
"github.com/q191201771/lal/util"
"io"
"log"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -24,24 +24,32 @@ type PullSessionStat struct {
type PullSession struct {
StartTick int64
obs PullSessionObserver
*net.TCPConn // after Connect success, can direct visit net.TCPConn, useful for set socket options.
rb *bufio.Reader
closed uint32
obs PullSessionObserver
Conn *net.TCPConn // after Connect success, can direct visit net.TCPConn, useful for set socket options.
rb *bufio.Reader
stat PullSessionStat
prevStat PullSessionStat
statMutex sync.Mutex
closeOnce sync.Once
UniqueKey string
}
type PullSessionObserver interface {
ReadHttpRespHeaderCb()
ReadFlvHeaderCb(flvHeader []byte)
ReadTagCb(header *TagHeader, tag []byte)
ReadTagCb(tag *Tag)
}
func NewPullSession(obs PullSessionObserver) *PullSession {
return &PullSession{obs: obs}
uk := util.GenUniqueKey("FLVPULL")
log.Infof("lifecycle new PullSession. [%s]", uk)
return &PullSession{
obs: obs,
UniqueKey:uk,
}
}
// @param timeout: timeout for connect operate. if 0, then no timeout
@ -78,30 +86,32 @@ func (session *PullSession) Connect(url string, timeout time.Duration) error {
if err != nil {
return err
}
session.TCPConn = conn.(*net.TCPConn)
session.rb = bufio.NewReaderSize(session.TCPConn, readBufSize)
session.Conn = conn.(*net.TCPConn)
session.rb = bufio.NewReaderSize(session.Conn, readBufSize)
// TODO chef: write succ len
_, err = fmt.Fprintf(session.TCPConn,
_, err = fmt.Fprintf(session.Conn,
"GET %s HTTP/1.0\r\nAccept: */*\r\nRange: byte=0-\r\nConnection: close\r\nHost: %s\r\nIcy-MetaData: 1\r\n\r\n",
uri, host)
if err != nil {
return nil
return err
}
return nil
}
// if close by peer, return EOF
func (session *PullSession) RunLoop() error {
err := session.runReadLoop()
session.close()
session.Dispose(err)
return err
}
func (session *PullSession) ForceClose() {
log.Println("force close pull session.")
session.close()
func (session *PullSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose PullSession. [%s] reason=%v", session.UniqueKey, err)
if err := session.Conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}
func (session *PullSession) GetStat() (now PullSessionStat, diff PullSessionStat) {
@ -114,15 +124,8 @@ func (session *PullSession) GetStat() (now PullSessionStat, diff PullSessionStat
return
}
func (session *PullSession) close() {
if atomic.CompareAndSwapUint32(&session.closed, 0, 1) {
session.Close()
}
}
func (session *PullSession) runReadLoop() error {
err := session.readHttpRespHeader()
if err != nil {
if err := session.readHttpRespHeader(); err != nil {
return err
}
session.obs.ReadHttpRespHeaderCb()
@ -134,11 +137,11 @@ func (session *PullSession) runReadLoop() error {
session.obs.ReadFlvHeaderCb(flvHeader)
for {
h, tag, err := session.readTag()
tag, err := session.readTag()
if err != nil {
return err
}
session.obs.ReadTagCb(h, tag)
session.obs.ReadTagCb(tag)
}
}
@ -151,7 +154,7 @@ func (session *PullSession) readHttpRespHeader() error {
if !strings.Contains(firstLine, "200") || len(headers) == 0 {
return fxxkErr
}
log.Println("readHttpRespHeader")
log.Infof("-----> http response header. [%s]", session.UniqueKey)
return nil
}
@ -162,32 +165,33 @@ func (session *PullSession) readFlvHeader() ([]byte, error) {
if err != nil {
return flvHeader, err
}
log.Println("readFlvHeader")
log.Infof("-----> http flv header. [%s]", session.UniqueKey)
// TODO chef: check flv header's value
return flvHeader, nil
}
func (session *PullSession) readTag() (*TagHeader, []byte, error) {
h, rawHeader, err := readTagHeader(session.rb)
func (session *PullSession) readTag() (*Tag, error) {
header, rawHeader, err := readTagHeader(session.rb)
if err != nil {
return nil, nil, err
return nil, err
}
session.addStat(tagHeaderSize)
needed := int(h.dataSize) + flvPrevTagFieldSize
needed := int(header.DataSize) + flvPrevTagFieldSize
rawBody := make([]byte, needed)
if _, err := io.ReadAtLeast(session.rb, rawBody, needed); err != nil {
log.Println(err)
return nil, nil, err
return nil, err
}
session.addStat(needed)
var tag []byte
tag = append(tag, rawHeader...)
tag = append(tag, rawBody...)
tag := &Tag{}
tag.Header = header
tag.Raw = append(tag.Raw, rawHeader...)
tag.Raw = append(tag.Raw, rawBody...)
//log.Println(h.t, h.timestamp, h.dataSize)
return h, tag, nil
return tag, nil
}
func (session *PullSession) addStat(readByte int) {

@ -1,7 +1,7 @@
package httpflv
import (
"log"
"github.com/q191201771/lal/log"
"net"
)
@ -25,10 +25,10 @@ func (server *Server) RunLoop() error {
if err != nil {
return err
}
log.Infof("listen. addr=%s", server.addr)
for {
conn, err := server.ln.Accept()
if err != nil {
log.Println(err)
return err
}
go server.handleSubSessionConnect(conn)
@ -42,19 +42,19 @@ func (server *Server) Accept() (session *SubSession, ok bool) {
func (server *Server) Dispose() {
if err := server.ln.Close(); err != nil {
log.Println(err)
log.Error(err)
}
close(server.sessionChan)
}
func (server *Server) handleSubSessionConnect(conn net.Conn) {
log.Println("accept a http flv sub connection. ", conn.RemoteAddr())
log.Infof("accept a http flv connection. remoteAddr=%v", conn.RemoteAddr())
subSession := NewSubSession(conn)
err := subSession.ReadRequest()
if err != nil {
if err := subSession.ReadRequest(); err != nil {
log.Errorf("read SubSession request error. [%s]", subSession.UniqueKey)
return
}
log.Println("read sub session request. ", subSession.StreamName)
log.Infof("-----> http request. [%s] uri=%s", subSession.UniqueKey, subSession.Uri)
server.sessionChan <- subSession
}

@ -2,15 +2,16 @@ package httpflv
import (
"bufio"
"io"
"log"
"github.com/q191201771/lal/log"
"github.com/q191201771/lal/util"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
var flvHttpResponseHeader = "HTTP/1.1 200 OK\r\n" +
var flvHttpResponseHeaderStr = "HTTP/1.1 200 OK\r\n" +
"Cache-Control: no-cache\r\n" +
"Content-Type: video/x-flv\r\n" +
"Connection: close\r\n" +
@ -18,15 +19,11 @@ var flvHttpResponseHeader = "HTTP/1.1 200 OK\r\n" +
"Pragma: no-cache\r\n" +
"\r\n"
var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0x0, 0x0, 0x0, 0x0}
var flvHttpResponseHeader = []byte(flvHttpResponseHeaderStr)
var wChanSize = 32
var flvHeaderBuf13 = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x0, 0x0, 0x0, 0x09, 0x0, 0x0, 0x0, 0x0}
// TODO chef: type use enum inside Go style.
type writeMsg struct {
t int
pkt []byte
}
var wChanSize = 1024 // TODO chef: 1024
type SubSessionStat struct {
wannaWriteCount int64
@ -42,30 +39,46 @@ type SubSession struct {
Uri string
Headers map[string]string
conn net.Conn
rb *bufio.Reader
closeChan chan int
wChan chan writeMsg
HasKeyFrame bool
conn net.Conn
rb *bufio.Reader
wChan chan []byte
stat SubSessionStat
prevStat SubSessionStat
statMutex sync.Mutex
closeOnce sync.Once
exitChan chan struct{}
hasClosedFlag uint32
UniqueKey string
}
func NewSubSession(conn net.Conn) *SubSession {
uk := util.GenUniqueKey("FLVSUB")
log.Infof("lifecycle new SubSession. [%s] remoteAddr=%s", uk, conn.RemoteAddr().String())
return &SubSession{
conn: conn,
rb: bufio.NewReaderSize(conn, readBufSize),
wChan: make(chan writeMsg, wChanSize),
closeChan: make(chan int, 1),
conn: conn,
rb: bufio.NewReaderSize(conn, readBufSize),
wChan: make(chan []byte, wChanSize),
exitChan: make(chan struct{}),
UniqueKey: uk,
}
}
func (session *SubSession) ReadRequest() error {
func (session *SubSession) ReadRequest() (err error) {
session.StartTick = time.Now().Unix()
var err error
var firstLine string
defer func() {
if err != nil {
session.Dispose(err)
}
}()
firstLine, session.Headers, err = parseHttpHeader(session.rb)
if err != nil {
return err
@ -73,21 +86,24 @@ func (session *SubSession) ReadRequest() error {
items := strings.Split(string(firstLine), " ")
if len(items) != 3 || items[0] != "GET" {
return fxxkErr
err = fxxkErr
return
}
session.Uri = items[1]
if !strings.HasSuffix(session.Uri, ".flv") {
return fxxkErr
err = fxxkErr
return
}
//log.Println("uri:", session.uri)
items = strings.Split(session.Uri, "/")
if len(items) != 3 {
return fxxkErr
err = fxxkErr
return
}
session.AppName = items[1]
items = strings.Split(items[2], ".")
if len(items) < 2 {
return fxxkErr
err = fxxkErr
return
}
session.StreamName = items[0]
@ -96,22 +112,41 @@ func (session *SubSession) ReadRequest() error {
func (session *SubSession) RunLoop() error {
go func() {
// TODO chef: close by self.
buf := make([]byte, 128)
_, err := session.conn.Read(buf)
if err != nil {
session.closeChan <- 1
if _, err := session.conn.Read(buf); err != nil {
log.Errorf("read failed. [%s] err=%v", session.UniqueKey, err)
session.Dispose(err)
}
}()
err := session.runWriteLoop()
session.conn.Close()
return err
return session.runWriteLoop()
}
func (session *SubSession) WriteHttpResponseHeader() {
log.Infof("<----- http response header. [%s]", session.UniqueKey)
session.WritePacket(flvHttpResponseHeader)
}
func (session *SubSession) WriteFlvHeader() {
log.Infof("<----- http flv header. [%s]", session.UniqueKey)
session.WritePacket(flvHeaderBuf13)
}
func (session *SubSession) WritePacket(pkt []byte) {
if session.hasClosed() {
return
}
session.addWannaWriteStat(len(pkt))
session.wChan <- writeMsg{t: 2, pkt: pkt}
for {
select {
case session.wChan <- pkt:
return
default:
if session.hasClosed() {
return
}
}
}
}
func (session *SubSession) GetStat() (now SubSessionStat, diff SubSessionStat) {
@ -126,45 +161,40 @@ func (session *SubSession) GetStat() (now SubSessionStat, diff SubSessionStat) {
return
}
func (session *SubSession) ForceClose() {
session.closeChan <- 2
func (session *SubSession) Dispose(err error) {
session.closeOnce.Do(func() {
log.Infof("lifecycle dispose SubSession. [%s] reason=%v", session.UniqueKey, err)
atomic.StoreUint32(&session.hasClosedFlag, 1)
close(session.exitChan)
if err := session.conn.Close(); err != nil {
log.Error("conn close error. [%s] err=%v", session.UniqueKey, err)
}
})
}
func (session *SubSession) runWriteLoop() error {
for {
select {
case msg := <-session.wChan:
// TODO chef: fix me write less than pkt
n, err := session.conn.Write(msg.pkt)
case <-session.exitChan:
return fxxkErr
case pkt := <-session.wChan:
if session.hasClosed() {
return fxxkErr
}
n, err := session.conn.Write(pkt)
if err != nil {
log.Println("sub session write failed. ", err)
session.Dispose(err)
return err
} else {
session.addWriteStat(n)
}
case closeFlag := <-session.closeChan:
// TODO chef: hardcode number
switch closeFlag {
case 1:
log.Println("sub session close by peer.")
return io.EOF
case 2:
log.Println("sub session close by self.")
return nil
}
}
}
}
func (session *SubSession) writeHttpResponseHeader() {
session.addWannaWriteStat(len(flvHttpResponseHeader))
session.wChan <- writeMsg{t: 0, pkt: []byte(flvHttpResponseHeader)}
}
func (session *SubSession) writeFlvHeader() {
session.addWannaWriteStat(len(flvHeaderBuf13))
session.wChan <- writeMsg{t: 1, pkt: flvHeaderBuf13}
func (session *SubSession) hasClosed() bool {
return atomic.LoadUint32(&session.hasClosedFlag) == 1
}
func (session *SubSession) addWannaWriteStat(wannaWriteByte int) {

@ -3,28 +3,88 @@ package httpflv
import (
"github.com/q191201771/lal/bele"
"io"
"log"
)
var tagHeaderSize = 11
// TODO chef: make these const
const tagHeaderSize int = 11
var (
tagTypeMetadata uint8 = 18
tagTypeVideo uint8 = 9
tagTypeAudio uint8 = 8
)
var (
frameTypeKey uint8 = 1
frameTypeInter uint8 = 2
)
var (
codecIdAvc uint8 = 7
)
var (
AvcKey = frameTypeKey<<4 | codecIdAvc
AvcInter = frameTypeInter<<4 | codecIdAvc
)
var (
AvcPacketTypeSeqHeader uint8 = 0
AvcPacketTypeNalu uint8 = 1
)
var (
SoundFormatAac uint8 = 10
)
var (
AacPacketTypeSeqHeader uint8 = 0
AacPacketTypeRaw uint8 = 1
)
type TagHeader struct {
t uint8 // type
dataSize uint32
timestamp uint32
streamId uint32 // always 0
T uint8 // type
DataSize uint32
Timestamp uint32
StreamId uint32 // always 0
}
type Tag struct {
Header TagHeader
Raw []byte
}
func readTagHeader(rd io.Reader) (h *TagHeader, raw []byte, err error) {
raw = make([]byte, tagHeaderSize)
if _, err = io.ReadAtLeast(rd, raw, tagHeaderSize); err != nil {
log.Println(err)
func readTagHeader(rd io.Reader) (h TagHeader, rawHeader []byte, err error) {
rawHeader = make([]byte, tagHeaderSize)
if _, err = io.ReadAtLeast(rd, rawHeader, tagHeaderSize); err != nil {
return
}
h = &TagHeader{}
h.t = raw[0]
h.dataSize = bele.BeUInt24(raw[1:])
h.timestamp = (uint32(raw[7]) << 24) + bele.BeUInt24(raw[4:])
h.T = rawHeader[0]
h.DataSize = bele.BeUInt24(rawHeader[1:])
h.Timestamp = (uint32(rawHeader[7]) << 24) + bele.BeUInt24(rawHeader[4:])
return
}
// isMetaData reports whether the tag is an FLV script-data (metadata) tag.
func (tag *Tag) isMetaData() bool {
	return tag.Header.T == tagTypeMetadata
}

// isAvcKeySeqHeader reports whether the tag is a video tag whose first body
// byte marks an AVC key frame and whose AVCPacketType is "sequence header".
func (tag *Tag) isAvcKeySeqHeader() bool {
	return tag.Header.T == tagTypeVideo && tag.Raw[tagHeaderSize] == AvcKey && tag.Raw[tagHeaderSize+1] == AvcPacketTypeSeqHeader
}

// isAvcKeyNalu reports whether the tag is a video tag carrying AVC NALU
// data for a key frame (i.e. a GOP boundary).
func (tag *Tag) isAvcKeyNalu() bool {
	return tag.Header.T == tagTypeVideo && tag.Raw[tagHeaderSize] == AvcKey && tag.Raw[tagHeaderSize+1] == AvcPacketTypeNalu
}

// isAacSeqHeader reports whether the tag is an audio tag whose sound format
// is AAC and whose AACPacketType is "sequence header".
func (tag *Tag) isAacSeqHeader() bool {
	return tag.Header.T == tagTypeAudio && tag.Raw[tagHeaderSize]>>4 == SoundFormatAac && tag.Raw[tagHeaderSize+1] == AacPacketTypeSeqHeader
}
// cloneTag returns a deep copy of the tag: the header is copied by value
// and the raw bytes are duplicated into a fresh slice.
func (tag *Tag) cloneTag() *Tag {
	clone := &Tag{Header: tag.Header}
	clone.Raw = append(clone.Raw, tag.Raw...)
	return clone
}

@ -0,0 +1,47 @@
package log
import "github.com/cihub/seelog"
// log is the package-wide seelog logger; nil until Initial succeeds, so
// Initial must be called before any other function in this package.
var log seelog.LoggerInterface

// Initial loads the seelog configuration from configFileName and creates
// the package logger. Returns an error if the config cannot be loaded.
func Initial(configFileName string) error {
	var err error
	log, err = seelog.LoggerFromConfigAsFile(configFileName)
	if err != nil {
		return err
	}
	// +1 stack depth so the logged file:line points at our caller rather
	// than at this wrapper package.
	err = log.SetAdditionalStackDepth(1)
	return err
}

// Debugf logs a formatted message at debug level.
func Debugf(format string, params ...interface{}) {
	log.Debugf(format, params...)
}

// Infof logs a formatted message at info level.
func Infof(format string, params ...interface{}) {
	log.Infof(format, params...)
}

// Warnf logs a formatted message at warn level.
func Warnf(format string, params ...interface{}) {
	log.Warnf(format, params...)
}

// Errorf logs a formatted message at error level.
func Errorf(format string, params ...interface{}) {
	log.Errorf(format, params...)
}

// Debug logs its arguments at debug level.
func Debug(v ...interface{}) {
	log.Debug(v...)
}

// Info logs its arguments at info level.
func Info(v ...interface{}) {
	log.Info(v...)
}

// Warn logs its arguments at warn level.
func Warn(v ...interface{}) {
	log.Warn(v...)
}

// Error logs its arguments at error level.
func Error(v ...interface{}) {
	log.Error(v...)
}

@ -2,33 +2,42 @@ package main
import (
"flag"
"fmt"
"github.com/q191201771/lal/httpflv"
"log"
"github.com/q191201771/lal/log"
"net/http"
_ "net/http/pprof"
"os"
)
var config *Config
func main() {
go func() {
log.Println(http.ListenAndServe("0.0.0.0:10000", nil))
}()
if err := http.ListenAndServe("0.0.0.0:10000", nil); err != nil {
log.SetFlags(log.Lshortfile | log.Lmicroseconds)
}
}()
confFile := flag.String("c", "", "specify conf file")
logConfFile := flag.String("l", "", "specify log conf file")
flag.Parse()
if *confFile == "" {
if *confFile == "" || *logConfFile == "" {
flag.Usage()
return
}
if err := log.Initial(*logConfFile); err != nil {
fmt.Fprintf(os.Stderr, "initial log failed. err=%v", err)
return
}
log.Info("initial log succ.")
config, err := LoadConf(*confFile)
if err != nil {
log.Println("Load Conf failed.", confFile, err)
log.Errorf("load Conf failed. file=%s err=%v", *confFile, err)
}
log.Println("load conf file.", *confFile, *config)
log.Infof("load conf file succ. file=%s content=%v", *confFile, config)
manager := httpflv.NewManager(config.HttpFlv)

@ -0,0 +1,16 @@
package util
import (
"fmt"
"sync/atomic"
)
// globalId is the process-wide monotonically increasing id counter.
var globalId uint64

// GenUniqueId returns the next process-unique id, starting from 1.
// Safe for concurrent use.
func GenUniqueId() uint64 {
	return atomic.AddUint64(&globalId, 1)
}

// GenUniqueKey returns prefix immediately followed by the next unique id
// in decimal, e.g. "FLVSUB1".
func GenUniqueKey(prefix string) string {
	id := GenUniqueId()
	return prefix + fmt.Sprint(id)
}
Loading…
Cancel
Save