Merge branch '3.0release' of https://github.com/ossrs/srs into 3.0release

# Conflicts:
#	trunk/src/kernel/srs_kernel_utility.hpp
pull/1109/head
Harlan 7 years ago
commit 22c5af62cf

@ -184,6 +184,17 @@ Please select according to languages:
### V3 changes ### V3 changes
* v3.0, 2018-08-12, For [#1202][bug #1202], Support edge/forward to Aliyun CDN. 3.0.40
* v3.0, 2018-08-11, For [#910][bug #910], Support HTTP FLV with HTTP callback. 3.0.39
* v3.0, 2018-08-05, Refine HTTP-FLV latency, support realtime mode.3.0.38
* v3.0, 2018-08-05, Fix [#1087][bug #1087], Ignore iface without address. 3.0.37
* v3.0, 2018-08-04, For [#1110][bug #1110], Support params in http callback. 3.0.36
* v3.0, 2018-08-02, Always use vhost in stream query, the unify uri. 3.0.35
* v3.0, 2018-08-02, For [#1031][bug #1031], SRS edge support douyu.com. 3.0.34
* v3.0, 2018-07-22, Replace hex to string to match MIT license. 3.0.33
* v3.0, 2018-07-22, Replace base64 to match MIT license. 3.0.32
* v3.0, 2018-07-22, Replace crc32 IEEE and MPEG by pycrc to match MIT license. 3.0.31
* v3.0, 2018-07-21, Replace crc32 IEEE by golang to match MIT license. 3.0.30
* v3.0, 2018-02-16, Fix [#464][bug #464], support RTMP origin cluster. 3.0.29 * v3.0, 2018-02-16, Fix [#464][bug #464], support RTMP origin cluster. 3.0.29
* v3.0, 2018-02-13, Fix [#1057][bug #1057], switch to simple handshake. 3.0.28 * v3.0, 2018-02-13, Fix [#1057][bug #1057], switch to simple handshake. 3.0.28
* v3.0, 2018-02-13, Fix [#1059][bug #1059], merge from 2.0, supports url with vhost in stream. 3.0.27 * v3.0, 2018-02-13, Fix [#1059][bug #1059], merge from 2.0, supports url with vhost in stream. 3.0.27
@ -220,6 +231,16 @@ Please select according to languages:
### V2 changes ### V2 changes
* <strong>v2.0, 2018-08-12, [2.0 release4(2.0.255)][r2.0r4] released. 86915 lines.</strong>
* v2.0, 2018-08-12, For [#1202][bug #1202], Support edge/forward to Aliyun CDN. 2.0.255
* v2.0, 2018-08-11, For [#910][bug #910], Support HTTP FLV with HTTP callback. 2.0.254
* v2.0, 2018-08-11, For [#1110][bug #1110], Refine params in http callback. 2.0.253
* v2.0, 2018-08-05, Refine HTTP-FLV latency, support realtime mode. 2.0.252
* v2.0, 2018-08-04, For [#1110][bug #1110], Support params in http callback. 2.0.251
* v2.0, 2018-08-02, For [#1031][bug #1031], SRS edge support douyu.com. 2.0.250
* v2.0, 2018-07-21, Merge [#1119][bug #1119], fix memory leak. 2.0.249
* <strong>v2.0, 2018-07-18, [2.0 release3(2.0.248)][r2.0r3] released. 86775 lines.</strong>
* v2.0, 2018-07-17, Merge [#1176][bug #1176], fix scaned issues. 2.0.248
* v2.0, 2018-02-28, Merge [#1077][bug #1077], fix crash for edge HLS. 2.0.247 * v2.0, 2018-02-28, Merge [#1077][bug #1077], fix crash for edge HLS. 2.0.247
* v2.0, 2018-02-13, Fix [#1059][bug #1059], support vhost in stream parameters. 2.0.246 * v2.0, 2018-02-13, Fix [#1059][bug #1059], support vhost in stream parameters. 2.0.246
* v2.0, 2018-01-07, Merge [#1045][bug #1045], fix [#1044][bug #1044], TCP connection alive detection. 2.0.245 * v2.0, 2018-01-07, Merge [#1045][bug #1045], fix [#1044][bug #1044], TCP connection alive detection. 2.0.245
@ -821,6 +842,8 @@ The latency between encoder and player with realtime config([CN][v3_CN_LowLatenc
| 2014-12-12 | 2.0.70 |[0.1s][p13]|[0.4s][p14]| 1.0s | 0.9s | | 2014-12-12 | 2.0.70 |[0.1s][p13]|[0.4s][p14]| 1.0s | 0.9s |
| 2014-12-16 | 2.0.72 | 0.1s | 0.4s |[0.8s][p15]|[0.6s][p16]| | 2014-12-16 | 2.0.72 | 0.1s | 0.4s |[0.8s][p15]|[0.6s][p16]|
> 2018-08-05, [c45f72e](https://github.com/ossrs/srs/commit/c45f72ef7bac9c7cf85b9125fc9e3aafd53f396f), Refine HTTP-FLV latency, support realtime mode. 2.0.252
We used FMLE as encoder for benchmark. The latency of server was 0.1s+, We used FMLE as encoder for benchmark. The latency of server was 0.1s+,
and the bottleneck was the encoder. For more information, read and the bottleneck was the encoder. For more information, read
[bug #257][bug #257-c0]. [bug #257][bug #257-c0].
@ -1421,6 +1444,12 @@ Winlin
[bug #1045]: https://github.com/ossrs/srs/issues/1045 [bug #1045]: https://github.com/ossrs/srs/issues/1045
[bug #1059]: https://github.com/ossrs/srs/issues/1059 [bug #1059]: https://github.com/ossrs/srs/issues/1059
[bug #1077]: https://github.com/ossrs/srs/issues/1077 [bug #1077]: https://github.com/ossrs/srs/issues/1077
[bug #1176]: https://github.com/ossrs/srs/issues/1176
[bug #1119]: https://github.com/ossrs/srs/issues/1119
[bug #1031]: https://github.com/ossrs/srs/issues/1031
[bug #1110]: https://github.com/ossrs/srs/issues/1110
[bug #910]: https://github.com/ossrs/srs/issues/910
[bug #1202]: https://github.com/ossrs/srs/issues/1202
[bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx [bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx
[bug #735]: https://github.com/ossrs/srs/issues/735 [bug #735]: https://github.com/ossrs/srs/issues/735
@ -1436,10 +1465,13 @@ Winlin
[bug #1057]: https://github.com/ossrs/srs/issues/1057 [bug #1057]: https://github.com/ossrs/srs/issues/1057
[bug #105]: https://github.com/ossrs/srs/issues/105 [bug #105]: https://github.com/ossrs/srs/issues/105
[bug #727]: https://github.com/ossrs/srs/issues/727 [bug #727]: https://github.com/ossrs/srs/issues/727
[bug #1087]: https://github.com/ossrs/srs/issues/1087
[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx [bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx
[exo #828]: https://github.com/google/ExoPlayer/pull/828 [exo #828]: https://github.com/google/ExoPlayer/pull/828
[r2.0r4]: https://github.com/ossrs/srs/releases/tag/v2.0-r4
[r2.0r3]: https://github.com/ossrs/srs/releases/tag/v2.0-r3
[r2.0r2]: https://github.com/ossrs/srs/releases/tag/v2.0-r2 [r2.0r2]: https://github.com/ossrs/srs/releases/tag/v2.0-r2
[r2.0r1]: https://github.com/ossrs/srs/releases/tag/v2.0-r1 [r2.0r1]: https://github.com/ossrs/srs/releases/tag/v2.0-r1
[r2.0r0]: https://github.com/ossrs/srs/releases/tag/v2.0-r0 [r2.0r0]: https://github.com/ossrs/srs/releases/tag/v2.0-r0

Binary file not shown.

@ -837,7 +837,7 @@ vhost hooks.callback.srs.com {
# "action": "on_publish", # "action": "on_publish",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream" # "stream": "livestream", "param":"?token=xxx&salt=yyy"
# } # }
# if valid, the hook must return HTTP code 200(Status OK) and response # if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success): # an int value specifies the error code(0 corresponding to success):
@ -851,7 +851,7 @@ vhost hooks.callback.srs.com {
# "action": "on_unpublish", # "action": "on_unpublish",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream" # "stream": "livestream", "param":"?token=xxx&salt=yyy"
# } # }
# if valid, the hook must return HTTP code 200(Status OK) and response # if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success): # an int value specifies the error code(0 corresponding to success):
@ -865,7 +865,7 @@ vhost hooks.callback.srs.com {
# "action": "on_play", # "action": "on_play",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", # "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "pageUrl": "http://www.test.com/live.html" # "pageUrl": "http://www.test.com/live.html"
# } # }
# if valid, the hook must return HTTP code 200(Status OK) and response # if valid, the hook must return HTTP code 200(Status OK) and response
@ -880,7 +880,7 @@ vhost hooks.callback.srs.com {
# "action": "on_stop", # "action": "on_stop",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream" # "stream": "livestream", "param":"?token=xxx&salt=yyy"
# } # }
# if valid, the hook must return HTTP code 200(Status OK) and response # if valid, the hook must return HTTP code 200(Status OK) and response
# an int value specifies the error code(0 corresponding to success): # an int value specifies the error code(0 corresponding to success):
@ -894,7 +894,7 @@ vhost hooks.callback.srs.com {
# "action": "on_dvr", # "action": "on_dvr",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", # "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "cwd": "/usr/local/srs", # "cwd": "/usr/local/srs",
# "file": "./objs/nginx/html/live/livestream.1420254068776.flv" # "file": "./objs/nginx/html/live/livestream.1420254068776.flv"
# } # }
@ -908,7 +908,7 @@ vhost hooks.callback.srs.com {
# "action": "on_hls", # "action": "on_hls",
# "client_id": 1985, # "client_id": 1985,
# "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", # "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
# "stream": "livestream", # "stream": "livestream", "param":"?token=xxx&salt=yyy",
# "duration": 9.36, // in seconds # "duration": 9.36, // in seconds
# "cwd": "/usr/local/srs", # "cwd": "/usr/local/srs",
# "file": "./objs/nginx/html/live/livestream/2015-04-23/01/476584165.ts", # "file": "./objs/nginx/html/live/livestream/2015-04-23/01/476584165.ts",
@ -926,10 +926,11 @@ vhost hooks.callback.srs.com {
# so we use HTTP GET and use the variable following: # so we use HTTP GET and use the variable following:
# [app], replace with the app. # [app], replace with the app.
# [stream], replace with the stream. # [stream], replace with the stream.
# [param], replace with the param.
# [ts_url], replace with the ts url. # [ts_url], replace with the ts url.
# ignore any return data of server. # ignore any return data of server.
# @remark random select a url to report, not report all. # @remark random select a url to report, not report all.
on_hls_notify http://127.0.0.1:8085/api/v1/hls/[app]/[stream][ts_url]; on_hls_notify http://127.0.0.1:8085/api/v1/hls/[app]/[stream]/[ts_url][param];
} }
} }
@ -1046,6 +1047,7 @@ vhost hls.srs.com {
# [999], replace this const to current millisecond. # [999], replace this const to current millisecond.
# [timestamp],replace this const to current UNIX timestamp in ms. # [timestamp],replace this const to current UNIX timestamp in ms.
# [seq], the sequence number of ts. # [seq], the sequence number of ts.
# [duration], replace this const to current ts duration.
# @see https://github.com/ossrs/srs/wiki/v2_CN_DVR#custom-path # @see https://github.com/ossrs/srs/wiki/v2_CN_DVR#custom-path
# @see https://github.com/ossrs/srs/wiki/v2_CN_DeliveryHLS#hls-config # @see https://github.com/ossrs/srs/wiki/v2_CN_DeliveryHLS#hls-config
# default: [app]/[stream]-[seq].ts # default: [app]/[stream]-[seq].ts
@ -1129,6 +1131,7 @@ vhost hls.srs.com {
# we support the variables to generate the notify url: # we support the variables to generate the notify url:
# [app], replace with the app. # [app], replace with the app.
# [stream], replace with the stream. # [stream], replace with the stream.
# [param], replace with the param.
# [ts_url], replace with the ts url. # [ts_url], replace with the ts url.
# for the hls http callback, @see http_hooks.on_hls_notify of vhost hooks.callback.srs.com # for the hls http callback, @see http_hooks.on_hls_notify of vhost hooks.callback.srs.com
# @read https://github.com/ossrs/srs/wiki/v2_CN_DeliveryHLS#on-hls-notify # @read https://github.com/ossrs/srs/wiki/v2_CN_DeliveryHLS#on-hls-notify

@ -1,5 +1,5 @@
# The module to ingest hls to replace ffmpeg with better behavior. # The module to parse mp4 file.
SRS_MODULE_NAME=("srs_mp4_parser") SRS_MODULE_NAME=("srs_mp4_parser")
SRS_MODULE_MAIN=("srs_main_mp4_parser") SRS_MODULE_MAIN=("srs_main_mp4_parser")
SRS_MODULE_APP=() SRS_MODULE_APP=()

@ -176,7 +176,7 @@ class RESTStreams(object):
"action": "on_publish", "action": "on_publish",
"client_id": 1985, "client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream" "stream": "livestream", "param":"?token=xxx&salt=yyy"
} }
on_unpublish: on_unpublish:
when client(encoder) stop publish to vhost/app/stream, call the hook, when client(encoder) stop publish to vhost/app/stream, call the hook,
@ -185,7 +185,7 @@ class RESTStreams(object):
"action": "on_unpublish", "action": "on_unpublish",
"client_id": 1985, "client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream" "stream": "livestream", "param":"?token=xxx&salt=yyy"
} }
if valid, the hook must return HTTP code 200(Stauts OK) and response if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success): an int value specifies the error code(0 corresponding to success):
@ -223,8 +223,8 @@ class RESTStreams(object):
def __on_publish(self, req): def __on_publish(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
)) ))
# TODO: process the on_publish event # TODO: process the on_publish event
@ -234,8 +234,8 @@ class RESTStreams(object):
def __on_unpublish(self, req): def __on_unpublish(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
)) ))
# TODO: process the on_unpublish event # TODO: process the on_unpublish event
@ -263,7 +263,7 @@ class RESTDvrs(object):
"action": "on_dvr", "action": "on_dvr",
"client_id": 1985, "client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream", "stream": "livestream", "param":"?token=xxx&salt=yyy",
"cwd": "/usr/local/srs", "cwd": "/usr/local/srs",
"file": "./objs/nginx/html/live/livestream.1420254068776.flv" "file": "./objs/nginx/html/live/livestream.1420254068776.flv"
} }
@ -301,8 +301,8 @@ class RESTDvrs(object):
def __on_dvr(self, req): def __on_dvr(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, cwd=%s, file=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, cwd=%s, file=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"],
req["cwd"], req["file"] req["cwd"], req["file"]
)) ))
@ -325,6 +325,7 @@ class RESTProxy(object):
so we use HTTP GET and use the variable following: so we use HTTP GET and use the variable following:
[app], replace with the app. [app], replace with the app.
[stream], replace with the stream. [stream], replace with the stream.
[param], replace with the param.
[ts_url], replace with the ts url. [ts_url], replace with the ts url.
ignore any return data of server. ignore any return data of server.
''' '''
@ -359,6 +360,7 @@ class RESTHls(object):
so we use HTTP GET and use the variable following: so we use HTTP GET and use the variable following:
[app], replace with the app. [app], replace with the app.
[stream], replace with the stream. [stream], replace with the stream.
[param], replace with the param.
[ts_url], replace with the ts url. [ts_url], replace with the ts url.
ignore any return data of server. ignore any return data of server.
''' '''
@ -382,7 +384,7 @@ class RESTHls(object):
"ip": "192.168.1.10", "ip": "192.168.1.10",
"vhost": "video.test.com", "vhost": "video.test.com",
"app": "live", "app": "live",
"stream": "livestream", "stream": "livestream", "param":"?token=xxx&salt=yyy",
"duration": 9.68, // in seconds "duration": 9.68, // in seconds
"cwd": "/usr/local/srs", "cwd": "/usr/local/srs",
"file": "./objs/nginx/html/live/livestream.1420254068776-100.ts", "file": "./objs/nginx/html/live/livestream.1420254068776-100.ts",
@ -422,8 +424,8 @@ class RESTHls(object):
def __on_hls(self, req): def __on_hls(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, duration=%s, cwd=%s, file=%s, seq_no=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%s, cwd=%s, file=%s, seq_no=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["duration"], req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["duration"],
req["cwd"], req["file"], req["seq_no"] req["cwd"], req["file"], req["seq_no"]
)) ))
@ -452,7 +454,7 @@ class RESTSessions(object):
"action": "on_play", "action": "on_play",
"client_id": 1985, "client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream", "stream": "livestream", "param":"?token=xxx&salt=yyy",
"pageUrl": "http://www.test.com/live.html" "pageUrl": "http://www.test.com/live.html"
} }
on_stop: on_stop:
@ -462,7 +464,7 @@ class RESTSessions(object):
"action": "on_stop", "action": "on_stop",
"client_id": 1985, "client_id": 1985,
"ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live",
"stream": "livestream" "stream": "livestream", "param":"?token=xxx&salt=yyy"
} }
if valid, the hook must return HTTP code 200(Stauts OK) and response if valid, the hook must return HTTP code 200(Stauts OK) and response
an int value specifies the error code(0 corresponding to success): an int value specifies the error code(0 corresponding to success):
@ -500,8 +502,8 @@ class RESTSessions(object):
def __on_play(self, req): def __on_play(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, pageUrl=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s, pageUrl=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["pageUrl"] req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"], req["pageUrl"]
)) ))
# TODO: process the on_play event # TODO: process the on_play event
@ -511,8 +513,8 @@ class RESTSessions(object):
def __on_stop(self, req): def __on_stop(self, req):
code = Error.success code = Error.success
trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s"%( trace("srs %s: client id=%s, ip=%s, vhost=%s, app=%s, stream=%s, param=%s"%(
req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"] req["action"], req["client_id"], req["ip"], req["vhost"], req["app"], req["stream"], req["param"]
)) ))
# TODO: process the on_stop event # TODO: process the on_stop event

@ -44,6 +44,27 @@ function update_nav() {
$("#nav_vlc").attr("href", "vlc.html" + window.location.search); $("#nav_vlc").attr("href", "vlc.html" + window.location.search);
} }
// Special extra params, such as auth_key.
function user_extra_params(query, params) {
var queries = params || [];
var server = (query.server == undefined)? window.location.hostname:query.server;
var vhost = (query.vhost == undefined)? window.location.hostname:query.vhost;
for (var key in query.user_query) {
if (key == 'app' || key == 'autostart' || key == 'dir'
|| key == 'filename' || key == 'host' || key == 'hostname'
|| key == 'http_port' || key == 'pathname' || key == 'port'
|| key == 'server' || key == 'stream' || key == 'buffer'
|| key == 'schema' || key == 'vhost'
) {
continue;
}
queries.push(key + '=' + query[key]);
}
return queries;
}
/** /**
@param server the ip of server. default to window.location.hostname @param server the ip of server. default to window.location.hostname
@param vhost the vhost of rtmp. default to window.location.hostname @param vhost the vhost of rtmp. default to window.location.hostname
@ -65,9 +86,7 @@ function build_default_rtmp_url() {
if (server != vhost && vhost != "__defaultVhost__") { if (server != vhost && vhost != "__defaultVhost__") {
queries.push("vhost=" + vhost); queries.push("vhost=" + vhost);
} }
if (query.shp_identify) { queries = user_extra_params(query, queries);
queries.push("shp_identify=" + query.shp_identify);
}
var uri = schema + "://" + server + ":" + port + "/" + app + "/" + stream + "?" + queries.join('&'); var uri = schema + "://" + server + ":" + port + "/" + app + "/" + stream + "?" + queries.join('&');
while (uri.indexOf("?") == uri.length - 1) { while (uri.indexOf("?") == uri.length - 1) {

@ -643,6 +643,11 @@
if (query.buffer) { if (query.buffer) {
url += "&buffer=" + query.buffer; url += "&buffer=" + query.buffer;
} }
var queries = user_extra_params(query);
if (queries && queries.length) {
url += '&' + queries.join('&');
}
$("#player_url").text($("#txt_url").val()).attr("href", url); $("#player_url").text($("#txt_url").val()).attr("href", url);
$("#link_server").text(rtmp.server); $("#link_server").text(rtmp.server);

@ -202,7 +202,7 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto); return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
} }
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
return srs_error_wrap(err, "publish"); return srs_error_wrap(err, "publish");
} }

@ -23,6 +23,7 @@
#include <srs_app_conn.hpp> #include <srs_app_conn.hpp>
#include <netinet/tcp.h>
using namespace std; using namespace std;
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
@ -95,6 +96,89 @@ srs_error_t SrsConnection::start()
return err; return err;
} }
srs_error_t SrsConnection::set_tcp_nodelay(bool v)
{
srs_error_t err = srs_success;
int r0 = 0;
socklen_t nb_v = sizeof(int);
int fd = srs_netfd_fileno(stfd);
int ov = 0;
if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v)) != 0) {
return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0);
}
#ifndef SRS_PERF_TCP_NODELAY
srs_warn("ignore TCP_NODELAY, fd=%d, ov=%d", fd, ov);
return err;
#endif
int iv = (v? 1:0);
if ((r0 = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, nb_v)) != 0) {
return srs_error_new(ERROR_SOCKET_NO_NODELAY, "setsockopt fd=%d, r0=%v", fd, r0);
}
if ((r0 = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &iv, &nb_v)) != 0) {
return srs_error_new(ERROR_SOCKET_NO_NODELAY, "getsockopt fd=%d, r0=%d", fd, r0);
}
srs_trace("set fd=%d TCP_NODELAY %d=>%d", fd, ov, iv);
return err;
}
srs_error_t SrsConnection::set_socket_buffer(int buffer_ms)
{
srs_error_t err = srs_success;
int r0 = 0;
int fd = srs_netfd_fileno(stfd);
socklen_t nb_v = sizeof(int);
int ov = 0;
if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &ov, &nb_v)) != 0) {
return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0);
}
#ifndef SRS_PERF_MW_SO_SNDBUF
srs_warn("ignore SO_SNDBUF, fd=%d, ov=%d", fd, ov);
return err;
#endif
// the bytes:
// 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
// 128KB=131072, 256KB=262144, 512KB=524288
// the buffer should set to sleep*kbps/8,
// for example, your system delivery stream in 1000kbps,
// sleep 800ms for small bytes, the buffer should set to:
// 800*1000/8=100000B(about 128KB).
// other examples:
// 2000*3000/8=750000B(about 732KB).
// 2000*5000/8=1250000B(about 1220KB).
int kbps = 4000;
int iv = buffer_ms * kbps / 8;
// socket send buffer, system will double it.
iv = iv / 2;
// override the send buffer by macro.
#ifdef SRS_PERF_SO_SNDBUF_SIZE
iv = SRS_PERF_SO_SNDBUF_SIZE / 2;
#endif
// set the socket send buffer when required larger buffer
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, nb_v) < 0) {
return srs_error_new(ERROR_SOCKET_SNDBUF, "setsockopt fd=%d, r0=%v", fd, r0);
}
if ((r0 = getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &iv, &nb_v)) != 0) {
return srs_error_new(ERROR_SOCKET_SNDBUF, "getsockopt fd=%d, r0=%d", fd, r0);
}
srs_trace("set fd=%d, SO_SNDBUF=%d=>%d, buffer=%dms", fd, ov, iv, buffer_ms);
return err;
}
srs_error_t SrsConnection::cycle() srs_error_t SrsConnection::cycle()
{ {
srs_error_t err = do_cycle(); srs_error_t err = do_cycle();

@ -100,6 +100,10 @@ public:
* to remove the client by server->remove(this). * to remove the client by server->remove(this).
*/ */
virtual srs_error_t start(); virtual srs_error_t start();
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in ms.
virtual srs_error_t set_socket_buffer(int buffer_ms);
// interface ISrsOneCycleThreadHandler // interface ISrsOneCycleThreadHandler
public: public:
/** /**

@ -110,7 +110,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
vhost = srs_string_replace(vhost, "[vhost]", req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
} }
srs_freep(sdk); srs_freep(sdk);
@ -122,7 +122,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
return srs_error_wrap(err, "edge pull %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); return srs_error_wrap(err, "edge pull %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
} }
if ((err = sdk->play()) != srs_success) { if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "edge pull %s stream failed", url.c_str()); return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
} }
@ -469,7 +469,7 @@ srs_error_t SrsEdgeForwarder::start()
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost); std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
vhost = srs_string_replace(vhost, "[vhost]", req->vhost); vhost = srs_string_replace(vhost, "[vhost]", req->vhost);
url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream); url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
} }
// open socket. // open socket.
@ -482,7 +482,7 @@ srs_error_t SrsEdgeForwarder::start()
return srs_error_wrap(err, "sdk connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); return srs_error_wrap(err, "sdk connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
} }
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "sdk publish"); return srs_error_wrap(err, "sdk publish");
} }
@ -492,6 +492,7 @@ srs_error_t SrsEdgeForwarder::start()
if ((err = trd->start()) != srs_success) { if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine"); return srs_error_wrap(err, "coroutine");
} }
srs_trace("edge-fwr publish url %s", url.c_str());
return err; return err;
} }

@ -94,42 +94,6 @@ srs_error_t SrsForwarder::on_publish()
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// discovery the server port and tcUrl from req and ep_forward.
std::string server;
std::string tcUrl;
int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
if (true) {
// parse host:port from hostport.
srs_parse_hostport(ep_forward, server, port);
// generate tcUrl
tcUrl = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
}
// dead loop check
std::string source_ep = "rtmp://";
source_ep += req->host;
source_ep += ":";
source_ep += req->port;
source_ep += "?vhost=";
source_ep += req->vhost;
std::string dest_ep = "rtmp://";
if (ep_forward == SRS_CONSTS_LOCALHOST) {
dest_ep += req->host;
} else {
dest_ep += server;
}
dest_ep += ":";
dest_ep += port;
dest_ep += "?vhost=";
dest_ep += req->vhost;
if (source_ep == dest_ep) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "forward loop detected. src=%s, dest=%s", source_ep.c_str(), dest_ep.c_str());
}
srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", source_ep.c_str(), dest_ep.c_str(), tcUrl.c_str(), req->stream.c_str());
srs_freep(trd); srs_freep(trd);
trd = new SrsSTCoroutine("forward", this); trd = new SrsSTCoroutine("forward", this);
if ((err = trd->start()) != srs_success) { if ((err = trd->start()) != srs_success) {
@ -245,7 +209,7 @@ srs_error_t SrsForwarder::do_cycle()
srs_parse_hostport(ep_forward, server, port); srs_parse_hostport(ep_forward, server, port);
// generate url // generate url
url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream); url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param);
} }
srs_freep(sdk); srs_freep(sdk);
@ -257,7 +221,7 @@ srs_error_t SrsForwarder::do_cycle()
return srs_error_wrap(err, "sdk connect url=%s, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto); return srs_error_wrap(err, "sdk connect url=%s, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
} }
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "sdk publish"); return srs_error_wrap(err, "sdk publish");
} }

@ -28,6 +28,7 @@
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <unistd.h> #include <unistd.h>
#include <sstream>
using namespace std; using namespace std;
SrsFragment::SrsFragment() SrsFragment::SrsFragment()
@ -126,11 +127,16 @@ srs_error_t SrsFragment::rename()
string full_path = fullpath(); string full_path = fullpath();
string tmp_file = tmppath(); string tmp_file = tmppath();
int tempdur = (int)duration();
if (true) {
std::stringstream ss;
ss << tempdur;
full_path = srs_string_replace(full_path, "[duration]", ss.str());
}
if (::rename(tmp_file.c_str(), full_path.c_str()) < 0) { if (::rename(tmp_file.c_str(), full_path.c_str()) < 0) {
return srs_error_new(ERROR_SYSTEM_FRAGMENT_RENAME, "rename %s to %s", tmp_file.c_str(), full_path.c_str()); return srs_error_new(ERROR_SYSTEM_FRAGMENT_RENAME, "rename %s to %s", tmp_file.c_str(), full_path.c_str());
} }
filepath = full_path;
return err; return err;
} }

@ -789,7 +789,14 @@ srs_error_t SrsHlsMuxer::_refresh_m3u8(string m3u8_file)
ss << "#EXTINF:" << segment->duration() / 1000.0 << ", no desc" << SRS_CONSTS_LF; ss << "#EXTINF:" << segment->duration() / 1000.0 << ", no desc" << SRS_CONSTS_LF;
// {file name}\n // {file name}\n
ss << segment->uri << SRS_CONSTS_LF; std::string seg_uri = segment->uri;
if (true) {
std::stringstream stemp;
stemp << (int)(segment->duration());
seg_uri = srs_string_replace(seg_uri, "[duration]", stemp.str());
}
//ss << segment->uri << SRS_CONSTS_LF;
ss << seg_uri << SRS_CONSTS_LF;
} }
// write m3u8 to writer. // write m3u8 to writer.

@ -1404,16 +1404,19 @@ srs_error_t SrsHttpApi::do_cycle()
ISrsHttpMessage* req = NULL; ISrsHttpMessage* req = NULL;
// get a http message // get a http message
if ((err = parser->parse_message(skt, this, &req)) != srs_success) { if ((err = parser->parse_message(skt, &req)) != srs_success) {
return srs_error_wrap(err, "parse message"); return srs_error_wrap(err, "parse message");
} }
// if SUCCESS, always NOT-NULL. // if SUCCESS, always NOT-NULL.
srs_assert(req);
// always free it in this scope. // always free it in this scope.
srs_assert(req);
SrsAutoFree(ISrsHttpMessage, req); SrsAutoFree(ISrsHttpMessage, req);
// Attach owner connection to message.
SrsHttpMessage* hreq = (SrsHttpMessage*)req;
hreq->set_connection(this);
// ok, handle http request. // ok, handle http request.
SrsHttpResponseWriter writer(skt); SrsHttpResponseWriter writer(skt);
if ((err = process_request(&writer, req)) != srs_success) { if ((err = process_request(&writer, req)) != srs_success) {

@ -123,19 +123,21 @@ srs_error_t SrsHttpConn::do_cycle()
ISrsHttpMessage* req = NULL; ISrsHttpMessage* req = NULL;
// get a http message // get a http message
if ((err = parser->parse_message(skt, this, &req)) != srs_success) { if ((err = parser->parse_message(skt, &req)) != srs_success) {
break; break;
} }
// if SUCCESS, always NOT-NULL. // if SUCCESS, always NOT-NULL.
srs_assert(req);
// always free it in this scope. // always free it in this scope.
srs_assert(req);
SrsAutoFree(ISrsHttpMessage, req); SrsAutoFree(ISrsHttpMessage, req);
// Attach owner connection to message.
SrsHttpMessage* hreq = (SrsHttpMessage*)req;
hreq->set_connection(this);
// copy request to last request object. // copy request to last request object.
srs_freep(last_req); srs_freep(last_req);
SrsHttpMessage* hreq = dynamic_cast<SrsHttpMessage*>(req);
last_req = hreq->to_request(hreq->host()); last_req = hreq->to_request(hreq->host());
// may should discard the body. // may should discard the body.
@ -218,10 +220,14 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq)
return srs_error_wrap(err, "init socket"); return srs_error_wrap(err, "init socket");
} }
if ((err = parser->parse_message(&skt, this, preq)) != srs_success) { if ((err = parser->parse_message(&skt, preq)) != srs_success) {
return srs_error_wrap(err, "parse message"); return srs_error_wrap(err, "parse message");
} }
// Attach owner connection to message.
SrsHttpMessage* hreq = (SrsHttpMessage*)(*preq);
hreq->set_connection(this);
return err; return err;
} }
@ -237,8 +243,8 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg)
} }
// drop all request body. // drop all request body.
char body[4096];
while (!br->eof()) { while (!br->eof()) {
char body[4096];
if ((err = br->read(body, 4096, NULL)) != srs_success) { if ((err = br->read(body, 4096, NULL)) != srs_success) {
return srs_error_wrap(err, "read response"); return srs_error_wrap(err, "read response");
} }

@ -141,6 +141,7 @@ srs_error_t SrsHttpHooks::on_publish(string url, SrsRequest* req)
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str())); obj->set("tcUrl", SrsJsonAny::str(req->tcUrl.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
std::string data = obj->dumps(); std::string data = obj->dumps();
std::string res; std::string res;
@ -173,6 +174,7 @@ void SrsHttpHooks::on_unpublish(string url, SrsRequest* req)
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
std::string data = obj->dumps(); std::string data = obj->dumps();
std::string res; std::string res;
@ -208,6 +210,7 @@ srs_error_t SrsHttpHooks::on_play(string url, SrsRequest* req)
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("pageUrl", SrsJsonAny::str(req->pageUrl.c_str())); obj->set("pageUrl", SrsJsonAny::str(req->pageUrl.c_str()));
std::string data = obj->dumps(); std::string data = obj->dumps();
@ -241,6 +244,7 @@ void SrsHttpHooks::on_stop(string url, SrsRequest* req)
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
std::string data = obj->dumps(); std::string data = obj->dumps();
std::string res; std::string res;
@ -277,6 +281,7 @@ srs_error_t SrsHttpHooks::on_dvr(int cid, string url, SrsRequest* req, string fi
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("cwd", SrsJsonAny::str(cwd.c_str())); obj->set("cwd", SrsJsonAny::str(cwd.c_str()));
obj->set("file", SrsJsonAny::str(file.c_str())); obj->set("file", SrsJsonAny::str(file.c_str()));
@ -318,6 +323,7 @@ srs_error_t SrsHttpHooks::on_hls(int cid, string url, SrsRequest* req, string fi
obj->set("vhost", SrsJsonAny::str(req->vhost.c_str())); obj->set("vhost", SrsJsonAny::str(req->vhost.c_str()));
obj->set("app", SrsJsonAny::str(req->app.c_str())); obj->set("app", SrsJsonAny::str(req->app.c_str()));
obj->set("stream", SrsJsonAny::str(req->stream.c_str())); obj->set("stream", SrsJsonAny::str(req->stream.c_str()));
obj->set("param", SrsJsonAny::str(req->param.c_str()));
obj->set("duration", SrsJsonAny::number(duration)); obj->set("duration", SrsJsonAny::number(duration));
obj->set("cwd", SrsJsonAny::str(cwd.c_str())); obj->set("cwd", SrsJsonAny::str(cwd.c_str()));
obj->set("file", SrsJsonAny::str(file.c_str())); obj->set("file", SrsJsonAny::str(file.c_str()));
@ -355,6 +361,7 @@ srs_error_t SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* re
url = srs_string_replace(url, "[app]", req->app); url = srs_string_replace(url, "[app]", req->app);
url = srs_string_replace(url, "[stream]", req->stream); url = srs_string_replace(url, "[stream]", req->stream);
url = srs_string_replace(url, "[ts_url]", ts_url); url = srs_string_replace(url, "[ts_url]", ts_url);
url = srs_string_replace(url, "[param]", req->param);
int64_t starttime = srs_update_system_time_ms(); int64_t starttime = srs_update_system_time_ms();

@ -54,10 +54,11 @@ using namespace std;
#include <srs_app_server.hpp> #include <srs_app_server.hpp>
#include <srs_app_statistic.hpp> #include <srs_app_statistic.hpp>
#include <srs_app_recv_thread.hpp> #include <srs_app_recv_thread.hpp>
#include <srs_app_http_hooks.hpp>
SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r) SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
{ {
req = r->copy(); req = r->copy()->as_http();
source = s; source = s;
queue = new SrsMessageQueue(true); queue = new SrsMessageQueue(true);
trd = new SrsSTCoroutine("http-stream", this); trd = new SrsSTCoroutine("http-stream", this);
@ -466,7 +467,7 @@ SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c)
{ {
source = s; source = s;
cache = c; cache = c;
req = r->copy(); req = r->copy()->as_http();
} }
SrsLiveStream::~SrsLiveStream() SrsLiveStream::~SrsLiveStream()
@ -476,9 +477,10 @@ SrsLiveStream::~SrsLiveStream()
srs_error_t SrsLiveStream::update(SrsSource* s, SrsRequest* r) srs_error_t SrsLiveStream::update(SrsSource* s, SrsRequest* r)
{ {
srs_freep(req);
source = s; source = s;
req = r->copy();
srs_freep(req);
req = r->copy()->as_http();
return srs_success; return srs_success;
} }
@ -487,24 +489,51 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if ((err = http_hooks_on_play()) != srs_success) {
return srs_error_wrap(err, "http hook");
}
err = do_serve_http(w, r);
http_hooks_on_stop();
return err;
}
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
string enc_desc;
ISrsBufferEncoder* enc = NULL; ISrsBufferEncoder* enc = NULL;
srs_assert(entry); srs_assert(entry);
if (srs_string_ends_with(entry->pattern, ".flv")) { if (srs_string_ends_with(entry->pattern, ".flv")) {
w->header()->set_content_type("video/x-flv"); w->header()->set_content_type("video/x-flv");
#ifdef SRS_PERF_FAST_FLV_ENCODER #ifdef SRS_PERF_FAST_FLV_ENCODER
enc = new SrsFastFlvStreamEncoder(); bool realtime = _srs_config->get_realtime_enabled(req->vhost);
if (realtime) {
enc_desc = "FLV";
enc = new SrsFlvStreamEncoder();
} else {
enc_desc = "FastFLV";
enc = new SrsFastFlvStreamEncoder();
}
#else #else
enc_desc = "FLV";
enc = new SrsFlvStreamEncoder(); enc = new SrsFlvStreamEncoder();
#endif #endif
} else if (srs_string_ends_with(entry->pattern, ".aac")) { } else if (srs_string_ends_with(entry->pattern, ".aac")) {
w->header()->set_content_type("audio/x-aac"); w->header()->set_content_type("audio/x-aac");
enc_desc = "AAC";
enc = new SrsAacStreamEncoder(); enc = new SrsAacStreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".mp3")) { } else if (srs_string_ends_with(entry->pattern, ".mp3")) {
w->header()->set_content_type("audio/mpeg"); w->header()->set_content_type("audio/mpeg");
enc_desc = "MP3";
enc = new SrsMp3StreamEncoder(); enc = new SrsMp3StreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".ts")) { } else if (srs_string_ends_with(entry->pattern, ".ts")) {
w->header()->set_content_type("video/MP2T"); w->header()->set_content_type("video/MP2T");
enc_desc = "TS";
enc = new SrsTsStreamEncoder(); enc = new SrsTsStreamEncoder();
} else { } else {
return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str()); return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str());
@ -552,12 +581,28 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r); SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection()); SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}
int mw_sleep = _srs_config->get_mw_sleep_ms(req->vhost);
if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep");
}
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc);
SrsAutoFree(SrsHttpRecvThread, trd); SrsAutoFree(SrsHttpRecvThread, trd);
if ((err = trd->start()) != srs_success) { if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start recv thread"); return srs_error_wrap(err, "start recv thread");
} }
srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, mw_sleep, enc->has_cache(), msgs.max);
// TODO: free and erase the disabled entry after all related connections is closed. // TODO: free and erase the disabled entry after all related connections is closed.
while (entry->enabled) { while (entry->enabled) {
@ -576,17 +621,15 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
} }
if (count <= 0) { if (count <= 0) {
srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS); // Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
// directly use sleep, donot use consumer wait. srs_usleep(mw_sleep * 1000);
srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000);
// ignore when nothing got. // ignore when nothing got.
continue; continue;
} }
if (pprint->can_print()) { if (pprint->can_print()) {
srs_info("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d", srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TMMS); count, pprint->age(), SRS_PERF_MW_MIN_MSGS, mw_sleep);
} }
// sendout all messages. // sendout all messages.
@ -615,6 +658,69 @@ srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
return err; return err;
} }
srs_error_t SrsLiveStream::http_hooks_on_play()
{
srs_error_t err = srs_success;
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return err;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_play(req->vhost);
if (!conf) {
return err;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
if ((err = SrsHttpHooks::on_play(url, req)) != srs_success) {
return srs_error_wrap(err, "rtmp on_play %s", url.c_str());
}
}
return err;
}
void SrsLiveStream::http_hooks_on_stop()
{
if (!_srs_config->get_vhost_http_hooks_enabled(req->vhost)) {
return;
}
// the http hooks will cause context switch,
// so we must copy all hooks for the on_connect may freed.
// @see https://github.com/ossrs/srs/issues/475
vector<string> hooks;
if (true) {
SrsConfDirective* conf = _srs_config->get_vhost_on_stop(req->vhost);
if (!conf) {
srs_info("ignore the empty http callback: on_stop");
return;
}
hooks = conf->args;
}
for (int i = 0; i < (int)hooks.size(); i++) {
std::string url = hooks.at(i);
SrsHttpHooks::on_stop(url, req);
}
return;
}
srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs) srs_error_t SrsLiveStream::streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -760,7 +866,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
srs_freep(tmpl->req); srs_freep(tmpl->req);
tmpl->source = s; tmpl->source = s;
tmpl->req = r->copy(); tmpl->req = r->copy()->as_http();
sflvs[sid] = entry; sflvs[sid] = entry;
@ -776,7 +882,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
if ((err = entry->cache->start()) != srs_success) { if ((err = entry->cache->start()) != srs_success) {
return srs_error_wrap(err, "http: start stream cache failed"); return srs_error_wrap(err, "http: start stream cache failed");
} }
srs_trace("http: mount flv stream for vhost=%s, mount=%s", sid.c_str(), mount.c_str()); srs_trace("http: mount flv stream for sid=%s, mount=%s", sid.c_str(), mount.c_str());
} else { } else {
entry = sflvs[sid]; entry = sflvs[sid];
entry->stream->update(s, r); entry->stream->update(s, r);

@ -34,9 +34,7 @@ class SrsFlvTransmuxer;
class SrsTsTransmuxer; class SrsTsTransmuxer;
/** /**
* for the srs http stream cache, * A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
* for example, the audio stream cache to make android(weixin) happy.
* we start a thread to shrink the queue.
*/ */
class SrsBufferCache : public ISrsCoroutineHandler class SrsBufferCache : public ISrsCoroutineHandler
{ {
@ -60,7 +58,7 @@ public:
}; };
/** /**
* the stream encoder in some codec, for example, flv or aac. * The encoder to transmux RTMP stream.
*/ */
class ISrsBufferEncoder class ISrsBufferEncoder
{ {
@ -94,7 +92,7 @@ public:
}; };
/** /**
* the flv stream encoder, remux rtmp stream to flv stream. * Transmux RTMP to HTTP Live Streaming.
*/ */
class SrsFlvStreamEncoder : public ISrsBufferEncoder class SrsFlvStreamEncoder : public ISrsBufferEncoder
{ {
@ -115,7 +113,7 @@ public:
#ifdef SRS_PERF_FAST_FLV_ENCODER #ifdef SRS_PERF_FAST_FLV_ENCODER
/** /**
* the fast flv stream encoder. * A Fast HTTP FLV Live Streaming, to write multiple tags by writev.
* @see https://github.com/ossrs/srs/issues/405 * @see https://github.com/ossrs/srs/issues/405
*/ */
class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder
@ -132,7 +130,7 @@ public:
#endif #endif
/** /**
* the ts stream encoder, remux rtmp stream to ts stream. * Transmux RTMP to HTTP TS Streaming.
*/ */
class SrsTsStreamEncoder : public ISrsBufferEncoder class SrsTsStreamEncoder : public ISrsBufferEncoder
{ {
@ -152,7 +150,7 @@ public:
}; };
/** /**
* the aac stream encoder, remux rtmp stream to aac stream. * Transmux RTMP with AAC stream to HTTP AAC Streaming.
*/ */
class SrsAacStreamEncoder : public ISrsBufferEncoder class SrsAacStreamEncoder : public ISrsBufferEncoder
{ {
@ -173,7 +171,7 @@ public:
}; };
/** /**
* the mp3 stream encoder, remux rtmp stream to mp3 stream. * Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
*/ */
class SrsMp3StreamEncoder : public ISrsBufferEncoder class SrsMp3StreamEncoder : public ISrsBufferEncoder
{ {
@ -215,8 +213,7 @@ public:
}; };
/** /**
* the flv live stream supports access rtmp in flv over http. * HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format.
* srs will remux rtmp to flv streaming.
*/ */
class SrsLiveStream : public ISrsHttpHandler class SrsLiveStream : public ISrsHttpHandler
{ {
@ -231,11 +228,14 @@ public:
public: public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private: private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
virtual srs_error_t http_hooks_on_play();
virtual void http_hooks_on_stop();
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
}; };
/** /**
* the srs live entry * The Live Entry, to handle HTTP Live Streaming.
*/ */
struct SrsLiveEntry struct SrsLiveEntry
{ {
@ -264,8 +264,7 @@ public:
}; };
/** /**
* the http stream server instance, * The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream.
* serve http stream, for example, flv/ts/mp3/aac live stream.
*/ */
// TODO: Support multiple stream. // TODO: Support multiple stream.
class SrsHttpStreamServer : virtual public ISrsReloadHandler class SrsHttpStreamServer : virtual public ISrsReloadHandler

@ -626,7 +626,7 @@ srs_error_t SrsMpegtsOverUdp::connect()
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto); return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
} }
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
close(); close();
return srs_error_wrap(err, "publish"); return srs_error_wrap(err, "publish");
} }

@ -221,7 +221,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl)
output = srs_string_replace(output, "[pageUrl]", req->pageUrl); output = srs_string_replace(output, "[pageUrl]", req->pageUrl);
if (output.find("[url]") != string::npos) { if (output.find("[url]") != string::npos) {
string url = srs_generate_rtmp_url(req->host, req->port, req->vhost, req->app, req->stream); string url = srs_generate_rtmp_url(req->host, req->port, req->host, req->vhost, req->app, req->stream, req->param);
output = srs_string_replace(output, "[url]", url); output = srs_string_replace(output, "[url]", url);
} }

@ -470,9 +470,9 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param); srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);
req->strip(); req->strip();
srs_trace("client identified, type=%s, vhost=%s, app=%s, stream_name=%s, duration=%.2f", srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%.2f",
srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->duration); srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), req->duration);
// discovery vhost, resolve the vhost from config // discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost); SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) { if (parsed_vhost) {
@ -489,11 +489,10 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
return srs_error_wrap(err, "check vhost"); return srs_error_wrap(err, "check vhost");
} }
srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, args=%s", srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port,
req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));
req->app.c_str(), req->stream.c_str(), (req->args? "(obj)":"null"));
// do token traverse before serve it. // do token traverse before serve it.
// @see https://github.com/ossrs/srs/pull/239 // @see https://github.com/ossrs/srs/pull/239
if (true) { if (true) {
@ -1137,48 +1136,7 @@ void SrsRtmpConn::change_mw_sleep(int sleep_ms)
return; return;
} }
// get the sock buffer size. set_socket_buffer(sleep_ms);
int fd = srs_netfd_fileno(stfd);
int onb_sbuf = 0;
socklen_t sock_buf_size = sizeof(int);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &onb_sbuf, &sock_buf_size);
#ifdef SRS_PERF_MW_SO_SNDBUF
// the bytes:
// 4KB=4096, 8KB=8192, 16KB=16384, 32KB=32768, 64KB=65536,
// 128KB=131072, 256KB=262144, 512KB=524288
// the buffer should set to sleep*kbps/8,
// for example, your system delivery stream in 1000kbps,
// sleep 800ms for small bytes, the buffer should set to:
// 800*1000/8=100000B(about 128KB).
// other examples:
// 2000*3000/8=750000B(about 732KB).
// 2000*5000/8=1250000B(about 1220KB).
int kbps = 5000;
int socket_buffer_size = sleep_ms * kbps / 8;
// socket send buffer, system will double it.
int nb_sbuf = socket_buffer_size / 2;
// override the send buffer by macro.
#ifdef SRS_PERF_SO_SNDBUF_SIZE
nb_sbuf = SRS_PERF_SO_SNDBUF_SIZE / 2;
#endif
// set the socket send buffer when required larger buffer
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, sock_buf_size) < 0) {
srs_warn("set sock SO_SENDBUF=%d failed.", nb_sbuf);
}
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &nb_sbuf, &sock_buf_size);
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, esbuf=%d, sbuf %d=>%d, realtime=%d",
mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, socket_buffer_size,
onb_sbuf, nb_sbuf, realtime);
#else
srs_trace("mw changed sleep %d=>%d, max_msgs=%d, sbuf %d, realtime=%d",
mw_sleep, sleep_ms, SRS_PERF_MW_MSGS, onb_sbuf, realtime);
#endif
mw_sleep = sleep_ms; mw_sleep = sleep_ms;
} }
@ -1189,25 +1147,12 @@ void SrsRtmpConn::set_sock_options()
bool nvalue = _srs_config->get_tcp_nodelay(req->vhost); bool nvalue = _srs_config->get_tcp_nodelay(req->vhost);
if (nvalue != tcp_nodelay) { if (nvalue != tcp_nodelay) {
tcp_nodelay = nvalue; tcp_nodelay = nvalue;
#ifdef SRS_PERF_TCP_NODELAY
int fd = srs_netfd_fileno(stfd);
socklen_t nb_v = sizeof(int);
int ov = 0; srs_error_t err = set_tcp_nodelay(tcp_nodelay);
getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &ov, &nb_v); if (err != srs_success) {
srs_warn("ignore err %s", srs_error_desc(err).c_str());
int v = tcp_nodelay; srs_freep(err);
// set the socket send buffer when required larger buffer
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, nb_v) < 0) {
srs_warn("set sock TCP_NODELAY=%d failed.", v);
} }
getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, &nb_v);
srs_trace("set TCP_NODELAY %d=>%d", ov, v);
#else
srs_warn("SRS_PERF_TCP_NODELAY is disabled but tcp_nodelay configed.");
#endif
} }
} }

@ -661,7 +661,7 @@ srs_error_t SrsRtspConn::connect()
} }
// publish. // publish.
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
close(); close();
return srs_error_wrap(err, "publish %s failed", url.c_str()); return srs_error_wrap(err, "publish %s failed", url.c_str());
} }

@ -653,6 +653,7 @@ srs_error_t SrsServer::acquire_pid_file()
if (fcntl(fd, F_SETLK, &lock) == -1) { if (fcntl(fd, F_SETLK, &lock) == -1) {
if(errno == EACCES || errno == EAGAIN) { if(errno == EACCES || errno == EAGAIN) {
::close(fd);
srs_error("srs is already running!"); srs_error("srs is already running!");
return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running"); return srs_error_new(ERROR_SYSTEM_PID_ALREADY_RUNNING, "srs is already running");
} }

@ -1482,8 +1482,8 @@ srs_error_t SrsOriginHub::create_forwarders()
} }
// TODO: FIXME: support queue size. // TODO: FIXME: support queue size.
//double queue_size = _srs_config->get_queue_length(req->vhost); double queue_size = _srs_config->get_queue_length(req->vhost);
//forwarder->set_queue_size(queue_size); forwarder->set_queue_size(queue_size);
if ((err = forwarder->on_publish()) != srs_success) { if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",

@ -27,7 +27,7 @@
// current release version // current release version
#define VERSION_MAJOR 3 #define VERSION_MAJOR 3
#define VERSION_MINOR 0 #define VERSION_MINOR 0
#define VERSION_REVISION 29 #define VERSION_REVISION 40
// generated by configure, only macros. // generated by configure, only macros.
#include <srs_auto_headers.hpp> #include <srs_auto_headers.hpp>

@ -203,7 +203,7 @@ bool SrsFlvAudio::aac(char* data, int size)
// 1 = 11 kHz = 11025 Hz // 1 = 11 kHz = 11025 Hz
// 2 = 22 kHz = 22050 Hz // 2 = 22 kHz = 22050 Hz
// 3 = 44 kHz = 44100 Hz // 3 = 44 kHz = 44100 Hz
int srs_flv_srates[] = {5512, 11025, 22050, 44100}; int srs_flv_srates[] = {5512, 11025, 22050, 44100, 0};
// the sample rates in the codec, // the sample rates in the codec,
// in the sequence header. // in the sequence header.

@ -112,6 +112,8 @@
#define ERROR_ASPROCESS_PPID 1073 #define ERROR_ASPROCESS_PPID 1073
#define ERROR_EXCEED_CONNECTIONS 1074 #define ERROR_EXCEED_CONNECTIONS 1074
#define ERROR_SOCKET_SETKEEPALIVE 1075 #define ERROR_SOCKET_SETKEEPALIVE 1075
#define ERROR_SOCKET_NO_NODELAY 1076
#define ERROR_SOCKET_SNDBUF 1077
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
// RTMP protocol error. // RTMP protocol error.
@ -318,6 +320,7 @@
#define ERROR_KAFKA_CODEC_MESSAGE 4036 #define ERROR_KAFKA_CODEC_MESSAGE 4036
#define ERROR_KAFKA_CODEC_PRODUCER 4037 #define ERROR_KAFKA_CODEC_PRODUCER 4037
#define ERROR_HTTP_302_INVALID 4038 #define ERROR_HTTP_302_INVALID 4038
#define ERROR_BASE64_DECODE 4039
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
// HTTP API error. // HTTP API error.

@ -182,8 +182,11 @@ SrsTsMessage* SrsTsMessage::detach()
cp->sid = sid; cp->sid = sid;
cp->PES_packet_length = PES_packet_length; cp->PES_packet_length = PES_packet_length;
cp->continuity_counter = continuity_counter; cp->continuity_counter = continuity_counter;
srs_freep(cp->payload);
cp->payload = payload; cp->payload = payload;
payload = NULL; payload = NULL;
return cp; return cp;
} }

File diff suppressed because it is too large Load Diff

@ -147,27 +147,8 @@ extern uint32_t srs_crc32_ieee(const void* buf, int size, uint32_t previous = 0)
/** /**
* Decode a base64-encoded string. * Decode a base64-encoded string.
*
* @param out buffer for decoded data
* @param in null-terminated input string
* @param out_size size in bytes of the out buffer, must be at
* least 3/4 of the length of in
* @return number of bytes written, or a negative value in case of
* invalid input
*/
extern int srs_av_base64_decode(uint8_t* out, const char* in, int out_size);
/**
* Encode data to base64 and null-terminate.
*
* @param out buffer for encoded data
* @param out_size size in bytes of the out buffer (including the
* null terminator), must be at least AV_BASE64_SIZE(in_size)
* @param in input buffer containing the data to encode
* @param in_size size in bytes of the in buffer
* @return out or NULL in case of error
*/ */
extern char* srs_av_base64_encode(char* out, int out_size, const uint8_t* in, int in_size); extern srs_error_t srs_av_base64_decode(std::string cipher, std::string& plaintext);
/** /**
* Calculate the output size needed to base64-encode x bytes to a * Calculate the output size needed to base64-encode x bytes to a
@ -180,11 +161,12 @@ extern char* srs_av_base64_encode(char* out, int out_size, const uint8_t* in, in
* for example, p=config='139056E5A0' * for example, p=config='139056E5A0'
* output hex to data={0x13, 0x90, 0x56, 0xe5, 0xa0} * output hex to data={0x13, 0x90, 0x56, 0xe5, 0xa0}
*/ */
extern int ff_hex_to_data(uint8_t* data, const char* p); extern int srs_hex_to_data(uint8_t* data, const char* p, int size);
/** /**
* convert data string to hex. * convert data string to hex.
*/ */
extern char *srs_data_to_hex(char *buff, const uint8_t *src, int s); extern char *srs_data_to_hex(char *des, const uint8_t *src, int len);
/** /**
* generate the c0 chunk header for msg. * generate the c0 chunk header for msg.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -906,10 +906,12 @@ int SrsIngestHlsOutput::parse_message_queue()
std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin(); std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
SrsTsMessage* msg = it->second; SrsTsMessage* msg = it->second;
SrsAutoFree(SrsTsMessage, msg);
queue.erase(it);
if (msg->channel->stream == SrsTsStreamVideoH264) { if (msg->channel->stream == SrsTsStreamVideoH264) {
nb_videos--; nb_videos--;
} }
queue.erase(it);
// parse the stream. // parse the stream.
SrsBuffer avs(msg->payload->bytes(), msg->payload->length()); SrsBuffer avs(msg->payload->bytes(), msg->payload->length());
@ -939,6 +941,7 @@ int SrsIngestHlsOutput::flush_message_queue()
std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin(); std::multimap<int64_t, SrsTsMessage*>::iterator it = queue.begin();
SrsTsMessage* msg = it->second; SrsTsMessage* msg = it->second;
SrsAutoFree(SrsTsMessage, msg);
queue.erase(it); queue.erase(it);
// parse the stream. // parse the stream.
@ -1286,7 +1289,7 @@ int SrsIngestHlsOutput::connect()
} }
// publish. // publish.
if ((err = sdk->publish()) != srs_success) { if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
// TODO: FIXME: Use error // TODO: FIXME: Use error
ret = srs_error_code(err); ret = srs_error_code(err);
srs_freep(err); srs_freep(err);

@ -113,6 +113,11 @@ void srs_discovery_tc_url(string tcUrl, string& schema, string& host, string& vh
vhost = host; vhost = host;
srs_vhost_resolve(vhost, app, param); srs_vhost_resolve(vhost, app, param);
srs_vhost_resolve(vhost, stream, param); srs_vhost_resolve(vhost, stream, param);
// Ignore when the param only contains the default vhost.
if (param == "?vhost="SRS_CONSTS_RTMP_DEFAULT_VHOST) {
param = "";
}
} }
void srs_parse_query_string(string q, map<string,string>& query) void srs_parse_query_string(string q, map<string,string>& query)
@ -151,43 +156,59 @@ void srs_random_generate(char* bytes, int size)
} }
} }
string srs_generate_tc_url(string ip, string vhost, string app, int port, string param) string srs_generate_tc_url(string host, string vhost, string app, int port)
{ {
string tcUrl = "rtmp://"; string tcUrl = "rtmp://";
if (vhost == SRS_CONSTS_RTMP_DEFAULT_VHOST) { if (vhost == SRS_CONSTS_RTMP_DEFAULT_VHOST) {
tcUrl += ip; tcUrl += host;
} else { } else {
tcUrl += vhost; tcUrl += vhost;
} }
if (port != SRS_CONSTS_RTMP_DEFAULT_PORT) { if (port != SRS_CONSTS_RTMP_DEFAULT_PORT) {
tcUrl += ":"; tcUrl += ":" + srs_int2str(port);
tcUrl += srs_int2str(port);
} }
tcUrl += "/"; tcUrl += "/" + app;
tcUrl += app;
if (!param.empty()) {
tcUrl += "?" + param;
}
return tcUrl; return tcUrl;
} }
string srs_generate_normal_tc_url(string ip, string vhost, string app, int port, string param) string srs_generate_stream_with_query(string host, string vhost, string stream, string param)
{
return "rtmp://" + vhost + ":" + srs_int2str(port) + "/" + app + (param.empty() ? "" : "?" + param);
}
string srs_generate_via_tc_url(string ip, string vhost, string app, int port, string param)
{
return "rtmp://" + ip + ":" + srs_int2str(port) + "/" + vhost + "/" + app + (param.empty() ? "" : "?" + param);
}
string srs_generate_vis_tc_url(string ip, string vhost, string app, int port, string param)
{ {
return "rtmp://" + ip + ":" + srs_int2str(port) + "/" + app + (param.empty() ? "" : "?" + param); string url = stream;
string query = param;
// If no vhost in param, try to append one.
string guessVhost;
if (query.find("vhost=") == string::npos) {
if (vhost != SRS_CONSTS_RTMP_DEFAULT_VHOST) {
guessVhost = vhost;
} else if (!srs_is_ipv4(host)) {
guessVhost = host;
}
}
// Well, if vhost exists, always append in query string.
if (!guessVhost.empty()) {
query += "&vhost=" + guessVhost;
}
// Remove the start & when param is empty.
srs_string_trim_start(query, "&");
// Prefix query with ?.
if (!srs_string_starts_with(query, "?")) {
url += "?";
}
// Append query to url.
if (!query.empty()) {
url += query;
}
return url;
} }
template<typename T> template<typename T>
@ -287,22 +308,12 @@ void srs_parse_rtmp_url(string url, string& tcUrl, string& stream)
} }
} }
string srs_generate_rtmp_url(string server, int port, string vhost, string app, string stream) string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param)
{ {
std::stringstream ss; string tcUrl = "rtmp://" + server + ":" + srs_int2str(port) + "/" + app;
string streamWithQuery = srs_generate_stream_with_query(host, vhost, stream, param);
ss << "rtmp://" << server << ":" << std::dec << port << "/" << app; string url = tcUrl + "/" + streamWithQuery;
return url;
// when default or server is vhost, donot specifies the vhost in params.
if (SRS_CONSTS_RTMP_DEFAULT_VHOST != vhost && server != vhost) {
ss << "...vhost..." << vhost;
}
if (!stream.empty()) {
ss << "/" << stream;
}
return ss.str();
} }
srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite) srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)

@ -71,28 +71,16 @@ extern void srs_parse_query_string(std::string q, std::map<std::string, std::str
extern void srs_random_generate(char* bytes, int size); extern void srs_random_generate(char* bytes, int size);
/** /**
* generate the tcUrl. * generate the tcUrl without param.
* @param param, the app parameters in tcUrl. for example, ?key=xxx,vhost=xxx * @remark Use host as tcUrl.vhost if vhost is default vhost.
* @return the tcUrl generated from ip/vhost/app/port.
* @remark when vhost equals to __defaultVhost__, use ip as vhost.
* @remark ignore port if port equals to default port 1935.
*/ */
extern std::string srs_generate_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param); extern std::string srs_generate_tc_url(std::string host, std::string vhost, std::string app, int port);
/** /**
* srs_detect_tools generate the normal tcUrl * Generate the stream with param.
* @remark Append vhost in query string if not default vhost.
*/ */
extern std::string srs_generate_normal_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param); extern std::string srs_generate_stream_with_query(std::string host, std::string vhost, std::string stream, std::string param);
/**
* srs_detect_tools generate the normal tcUrl
*/
extern std::string srs_generate_via_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);
/**
* srs_detect_tools generate the vis/vis2 tcUrl
*/
extern std::string srs_generate_vis_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);
/** /**
* create shared ptr message from bytes. * create shared ptr message from bytes.
@ -111,8 +99,9 @@ extern std::string srs_generate_stream_url(std::string vhost, std::string app, s
// stream: livestream // stream: livestream
extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string& stream); extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string& stream);
// genereate the rtmp url, for instance, rtmp://server:port/app...vhost...vhost/stream // Genereate the rtmp url, for instance, rtmp://server:port/app/stream?param
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string vhost, std::string app, std::string stream); // @remark We always put vhost in param, in the query of url.
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param);
// write large numbers of iovs. // write large numbers of iovs.
extern srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL); extern srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);
@ -120,5 +109,8 @@ extern srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* io
// join string in vector with indicated separator // join string in vector with indicated separator
extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator); extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);
// Whether domain is an IPv4 address.
extern bool srs_is_ipv4(std::string domain);
#endif #endif

@ -1581,6 +1581,7 @@ void SrsRequest::update_auth(SrsRequest* req)
pageUrl = req->pageUrl; pageUrl = req->pageUrl;
swfUrl = req->swfUrl; swfUrl = req->swfUrl;
tcUrl = req->tcUrl; tcUrl = req->tcUrl;
param = req->param;
ip = req->ip; ip = req->ip;
vhost = req->vhost; vhost = req->vhost;
@ -1624,6 +1625,12 @@ void SrsRequest::strip()
stream = srs_string_trim_start(stream, "/"); stream = srs_string_trim_start(stream, "/");
} }
SrsRequest* SrsRequest::as_http()
{
schema = "http";
return this;
}
SrsResponse::SrsResponse() SrsResponse::SrsResponse()
{ {
stream_id = SRS_DEFAULT_SID; stream_id = SRS_DEFAULT_SID;
@ -2053,7 +2060,7 @@ srs_error_t SrsRtmpClient::create_stream(int& stream_id)
return err; return err;
} }
srs_error_t SrsRtmpClient::play(string stream, int stream_id) srs_error_t SrsRtmpClient::play(string stream, int stream_id, int chunk_size)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -2081,27 +2088,27 @@ srs_error_t SrsRtmpClient::play(string stream, int stream_id)
} }
// SetChunkSize // SetChunkSize
if (true) { if (chunk_size != SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE) {
SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
pkt->chunk_size = SRS_CONSTS_RTMP_SRS_CHUNK_SIZE; pkt->chunk_size = chunk_size;
if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) { if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
return srs_error_wrap(err, "send set chunk size failed. stream=%s, chunk_size=%d", stream.c_str(), SRS_CONSTS_RTMP_SRS_CHUNK_SIZE); return srs_error_wrap(err, "send set chunk size failed. stream=%s, chunk_size=%d", stream.c_str(), chunk_size);
} }
} }
return err; return err;
} }
srs_error_t SrsRtmpClient::publish(string stream, int stream_id) srs_error_t SrsRtmpClient::publish(string stream, int stream_id, int chunk_size)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// SetChunkSize // SetChunkSize
if (true) { if (chunk_size != SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE) {
SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket();
pkt->chunk_size = SRS_CONSTS_RTMP_SRS_CHUNK_SIZE; pkt->chunk_size = chunk_size;
if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) { if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {
return srs_error_wrap(err, "send set chunk size failed. stream=%s, chunk_size=%d", stream.c_str(), SRS_CONSTS_RTMP_SRS_CHUNK_SIZE); return srs_error_wrap(err, "send set chunk size failed. stream=%s, chunk_size=%d", stream.c_str(), chunk_size);
} }
} }
@ -2407,7 +2414,7 @@ srs_error_t SrsRtmpServer::redirect(SrsRequest* r, string host, int port, bool&
srs_error_t err = srs_success; srs_error_t err = srs_success;
if (true) { if (true) {
string url = srs_generate_rtmp_url(host, port, r->vhost, r->app, ""); string url = srs_generate_rtmp_url(host, port, r->host, r->vhost, r->app, r->stream, r->param);
SrsAmf0Object* ex = SrsAmf0Any::object(); SrsAmf0Object* ex = SrsAmf0Any::object();
ex->set("code", SrsAmf0Any::number(302)); ex->set("code", SrsAmf0Any::number(302));

@ -602,6 +602,9 @@ public:
* strip url, user must strip when update the url. * strip url, user must strip when update the url.
*/ */
virtual void strip(); virtual void strip();
public:
// Transform it as HTTP request.
virtual SrsRequest* as_http();
}; };
/** /**
@ -729,12 +732,12 @@ public:
/** /**
* start play stream. * start play stream.
*/ */
virtual srs_error_t play(std::string stream, int stream_id); virtual srs_error_t play(std::string stream, int stream_id, int chunk_size);
/** /**
* start publish stream. use flash publish workflow: * start publish stream. use flash publish workflow:
* connect-app => create-stream => flash-publish * connect-app => create-stream => flash-publish
*/ */
virtual srs_error_t publish(std::string stream, int stream_id); virtual srs_error_t publish(std::string stream, int stream_id, int chunk_size);
/** /**
* start publish stream. use FMLE publish workflow: * start publish stream. use FMLE publish workflow:
* connect-app => FMLE publish * connect-app => FMLE publish

@ -537,8 +537,12 @@ srs_error_t SrsRtspSdp::parse_fmtp_attribute(string attr)
char* tmp_sh = new char[item_value.length()]; char* tmp_sh = new char[item_value.length()];
SrsAutoFreeA(char, tmp_sh); SrsAutoFreeA(char, tmp_sh);
int nb_tmp_sh = ff_hex_to_data((uint8_t*)tmp_sh, item_value.c_str());
srs_assert(nb_tmp_sh > 0); int nb_tmp_sh = srs_hex_to_data((uint8_t*)tmp_sh, item_value.c_str(), item_value.length());
if (nb_tmp_sh <= 0) {
return srs_error_new(ERROR_RTSP_AUDIO_CONFIG, "audio config");
}
audio_sh.append(tmp_sh, nb_tmp_sh); audio_sh.append(tmp_sh, nb_tmp_sh);
} }
} }
@ -583,23 +587,16 @@ srs_error_t SrsRtspSdp::parse_control_attribute(string attr)
return err; return err;
} }
string SrsRtspSdp::base64_decode(string value) string SrsRtspSdp::base64_decode(string cipher)
{ {
if (value.empty()) { if (cipher.empty()) {
return ""; return "";
} }
int nb_output = (int)(value.length() * 2); string plaintext;
uint8_t* output = new uint8_t[nb_output]; srs_error_t err = srs_av_base64_decode(cipher, plaintext);
SrsAutoFreeA(uint8_t, output); srs_freep(err);
int ret = srs_av_base64_decode(output, (char*)value.c_str(), nb_output);
if (ret <= 0) {
return "";
}
std::string plaintext;
plaintext.append((char*)output, ret);
return plaintext; return plaintext;
} }

@ -124,7 +124,7 @@ srs_error_t SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg
} }
ISrsHttpMessage* msg = NULL; ISrsHttpMessage* msg = NULL;
if ((err = parser->parse_message(transport, NULL, &msg)) != srs_success) { if ((err = parser->parse_message(transport, &msg)) != srs_success) {
return srs_error_wrap(err, "http: parse response"); return srs_error_wrap(err, "http: parse response");
} }
srs_assert(msg); srs_assert(msg);
@ -170,7 +170,7 @@ srs_error_t SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
} }
ISrsHttpMessage* msg = NULL; ISrsHttpMessage* msg = NULL;
if ((err = parser->parse_message(transport, NULL, &msg)) != srs_success) { if ((err = parser->parse_message(transport, &msg)) != srs_success) {
return srs_error_wrap(err, "http: parse response"); return srs_error_wrap(err, "http: parse response");
} }
srs_assert(msg); srs_assert(msg);

@ -68,7 +68,7 @@ srs_error_t SrsHttpParser::initialize(enum http_parser_type type, bool allow_jso
return err; return err;
} }
srs_error_t SrsHttpParser::parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg) srs_error_t SrsHttpParser::parse_message(ISrsReader* reader, ISrsHttpMessage** ppmsg)
{ {
*ppmsg = NULL; *ppmsg = NULL;
@ -85,12 +85,12 @@ srs_error_t SrsHttpParser::parse_message(ISrsProtocolReaderWriter* io, SrsConnec
header_parsed = 0; header_parsed = 0;
// do parse // do parse
if ((err = parse_message_imp(io)) != srs_success) { if ((err = parse_message_imp(reader)) != srs_success) {
return srs_error_wrap(err, "parse message"); return srs_error_wrap(err, "parse message");
} }
// create msg // create msg
SrsHttpMessage* msg = new SrsHttpMessage(io, conn); SrsHttpMessage* msg = new SrsHttpMessage(reader);
// initalize http msg, parse url. // initalize http msg, parse url.
if ((err = msg->update(url, jsonp, &header, buffer, headers)) != srs_success) { if ((err = msg->update(url, jsonp, &header, buffer, headers)) != srs_success) {
@ -104,7 +104,7 @@ srs_error_t SrsHttpParser::parse_message(ISrsProtocolReaderWriter* io, SrsConnec
return err; return err;
} }
srs_error_t SrsHttpParser::parse_message_imp(ISrsProtocolReaderWriter* io) srs_error_t SrsHttpParser::parse_message_imp(ISrsReader* reader)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
@ -138,7 +138,7 @@ srs_error_t SrsHttpParser::parse_message_imp(ISrsProtocolReaderWriter* io)
// when nothing parsed, read more to parse. // when nothing parsed, read more to parse.
if (nparsed == 0) { if (nparsed == 0) {
// when requires more, only grow 1bytes, but the buffer will cache more. // when requires more, only grow 1bytes, but the buffer will cache more.
if ((err = buffer->grow(io, buffer->size() + 1)) != srs_success) { if ((err = buffer->grow(reader, buffer->size() + 1)) != srs_success) {
return srs_error_wrap(err, "grow buffer"); return srs_error_wrap(err, "grow buffer");
} }
} }
@ -254,14 +254,14 @@ int SrsHttpParser::on_body(http_parser* parser, const char* at, size_t length)
return 0; return 0;
} }
SrsHttpMessage::SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c) : ISrsHttpMessage() SrsHttpMessage::SrsHttpMessage(ISrsReader* reader) : ISrsHttpMessage()
{ {
conn = c; owner_conn = NULL;
chunked = false; chunked = false;
infinite_chunked = false; infinite_chunked = false;
keep_alive = true; keep_alive = true;
_uri = new SrsHttpUri(); _uri = new SrsHttpUri();
_body = new SrsHttpResponseReader(this, io); _body = new SrsHttpResponseReader(this, reader);
_http_ts_send_buffer = new char[SRS_HTTP_TS_SEND_BUFFER_SIZE]; _http_ts_send_buffer = new char[SRS_HTTP_TS_SEND_BUFFER_SIZE];
jsonp = false; jsonp = false;
} }
@ -329,7 +329,12 @@ srs_error_t SrsHttpMessage::update(string url, bool allow_jsonp, http_parser* he
SrsConnection* SrsHttpMessage::connection() SrsConnection* SrsHttpMessage::connection()
{ {
return conn; return owner_conn;
}
void SrsHttpMessage::set_connection(SrsConnection* conn)
{
owner_conn = conn;
} }
uint8_t SrsHttpMessage::method() uint8_t SrsHttpMessage::method()
@ -842,9 +847,9 @@ srs_error_t SrsHttpResponseWriter::send_header(char* data, int size)
return skt->write((void*)buf.c_str(), buf.length(), NULL); return skt->write((void*)buf.c_str(), buf.length(), NULL);
} }
SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io) SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, ISrsReader* reader)
{ {
skt = io; skt = reader;
owner = msg; owner = msg;
is_eof = false; is_eof = false;
nb_total_read = 0; nb_total_read = 0;

@ -30,10 +30,10 @@
#include <srs_http_stack.hpp> #include <srs_http_stack.hpp>
class ISrsProtocolReaderWriter;
class SrsConnection; class SrsConnection;
class SrsFastStream; class SrsFastStream;
class SrsRequest; class SrsRequest;
class ISrsReader;
class SrsHttpResponseReader; class SrsHttpResponseReader;
class SrsStSocket; class SrsStSocket;
@ -77,12 +77,12 @@ public:
* @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete(). * @remark, if success, *ppmsg always NOT-NULL, *ppmsg always is_complete().
* @remark user must free the ppmsg if not NULL. * @remark user must free the ppmsg if not NULL.
*/ */
virtual srs_error_t parse_message(ISrsProtocolReaderWriter* io, SrsConnection* conn, ISrsHttpMessage** ppmsg); virtual srs_error_t parse_message(ISrsReader* reader, ISrsHttpMessage** ppmsg);
private: private:
/** /**
* parse the HTTP message to member field: msg. * parse the HTTP message to member field: msg.
*/ */
virtual srs_error_t parse_message_imp(ISrsProtocolReaderWriter* io); virtual srs_error_t parse_message_imp(ISrsReader* reader);
private: private:
static int on_message_begin(http_parser* parser); static int on_message_begin(http_parser* parser);
static int on_headers_complete(http_parser* parser); static int on_headers_complete(http_parser* parser);
@ -149,13 +149,13 @@ private:
// the query map // the query map
std::map<std::string, std::string> _query; std::map<std::string, std::string> _query;
// the transport connection, can be NULL. // the transport connection, can be NULL.
SrsConnection* conn; SrsConnection* owner_conn;
// whether request is jsonp. // whether request is jsonp.
bool jsonp; bool jsonp;
// the method in QueryString will override the HTTP method. // the method in QueryString will override the HTTP method.
std::string jsonp_method; std::string jsonp_method;
public: public:
SrsHttpMessage(ISrsProtocolReaderWriter* io, SrsConnection* c); SrsHttpMessage(ISrsReader* io);
virtual ~SrsHttpMessage(); virtual ~SrsHttpMessage();
public: public:
/** /**
@ -163,7 +163,9 @@ public:
*/ */
virtual srs_error_t update(std::string url, bool allow_jsonp, http_parser* header, SrsFastStream* body, std::vector<SrsHttpHeaderField>& headers); virtual srs_error_t update(std::string url, bool allow_jsonp, http_parser* header, SrsFastStream* body, std::vector<SrsHttpHeaderField>& headers);
public: public:
// Get the owner connection, maybe NULL.
virtual SrsConnection* connection(); virtual SrsConnection* connection();
virtual void set_connection(SrsConnection* conn);
public: public:
virtual uint8_t method(); virtual uint8_t method();
virtual uint16_t status_code(); virtual uint16_t status_code();
@ -295,7 +297,7 @@ public:
class SrsHttpResponseReader : virtual public ISrsHttpResponseReader class SrsHttpResponseReader : virtual public ISrsHttpResponseReader
{ {
private: private:
ISrsProtocolReaderWriter* skt; ISrsReader* skt;
SrsHttpMessage* owner; SrsHttpMessage* owner;
SrsFastStream* buffer; SrsFastStream* buffer;
bool is_eof; bool is_eof;
@ -306,7 +308,7 @@ private:
// already read total bytes. // already read total bytes.
int64_t nb_total_read; int64_t nb_total_read;
public: public:
SrsHttpResponseReader(SrsHttpMessage* msg, ISrsProtocolReaderWriter* io); SrsHttpResponseReader(SrsHttpMessage* msg, ISrsReader* reader);
virtual ~SrsHttpResponseReader(); virtual ~SrsHttpResponseReader();
public: public:
/** /**

@ -134,7 +134,7 @@ srs_error_t SrsBasicRtmpClient::do_connect_app(string local_ip, bool debug)
// generate the tcUrl // generate the tcUrl
std::string param = ""; std::string param = "";
std::string target_vhost = req->vhost; std::string target_vhost = req->vhost;
std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port, param); std::string tc_url = srs_generate_tc_url(req->host, req->vhost, req->app, req->port);
// replace the tcUrl in request, // replace the tcUrl in request,
// which will replace the tc_url in client.connect_app(). // which will replace the tc_url in client.connect_app().
@ -150,24 +150,30 @@ srs_error_t SrsBasicRtmpClient::do_connect_app(string local_ip, bool debug)
return err; return err;
} }
srs_error_t SrsBasicRtmpClient::publish() srs_error_t SrsBasicRtmpClient::publish(int chunk_size)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
string stream = srs_generate_stream_with_query(req->host, req->vhost, req->stream, req->param);
// publish. // publish.
if ((err = client->publish(req->stream, stream_id)) != srs_success) { if ((err = client->publish(stream, stream_id, chunk_size)) != srs_success) {
return srs_error_wrap(err, "publish failed, stream=%s, stream_id=%d", req->stream.c_str(), stream_id); return srs_error_wrap(err, "publish failed, stream=%s, stream_id=%d", stream.c_str(), stream_id);
} }
return err; return err;
} }
srs_error_t SrsBasicRtmpClient::play() srs_error_t SrsBasicRtmpClient::play(int chunk_size)
{ {
srs_error_t err = srs_success; srs_error_t err = srs_success;
if ((err = client->play(req->stream, stream_id)) != srs_success) { // Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
return srs_error_wrap(err, "connect with server failed, stream=%s, stream_id=%d", req->stream.c_str(), stream_id); string stream = srs_generate_stream_with_query(req->host, req->vhost, req->stream, req->param);
if ((err = client->play(stream, stream_id, chunk_size)) != srs_success) {
return srs_error_wrap(err, "connect with server failed, stream=%s, stream_id=%d", stream.c_str(), stream_id);
} }
return err; return err;

@ -74,8 +74,8 @@ protected:
virtual srs_error_t connect_app(); virtual srs_error_t connect_app();
virtual srs_error_t do_connect_app(std::string local_ip, bool debug); virtual srs_error_t do_connect_app(std::string local_ip, bool debug);
public: public:
virtual srs_error_t publish(); virtual srs_error_t publish(int chunk_size);
virtual srs_error_t play(); virtual srs_error_t play(int chunk_size);
virtual void kbps_sample(const char* label, int64_t age); virtual void kbps_sample(const char* label, int64_t age);
virtual void kbps_sample(const char* label, int64_t age, int msgs); virtual void kbps_sample(const char* label, int64_t age, int msgs);
virtual int sid(); virtual int sid();

@ -136,6 +136,7 @@ srs_error_t srs_socket_connect(string server, int port, int64_t tm, srs_netfd_t*
srs_assert(!stfd); srs_assert(!stfd);
stfd = st_netfd_open_socket(sock); stfd = st_netfd_open_socket(sock);
if(stfd == NULL){ if(stfd == NULL){
::close(sock);
return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket"); return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket");
} }

@ -150,6 +150,12 @@ void retrieve_local_ips()
for (ifaddrs* p = ifap; p ; p = p->ifa_next) { for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p; ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL. // retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141 // @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET); bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);
@ -164,6 +170,12 @@ void retrieve_local_ips()
for (ifaddrs* p = ifap; p ; p = p->ifa_next) { for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p; ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL. // retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141 // @see: https://github.com/ossrs/srs/issues/141
bool ipv6 = (cur->ifa_addr->sa_family == AF_INET6); bool ipv6 = (cur->ifa_addr->sa_family == AF_INET6);
@ -179,6 +191,12 @@ void retrieve_local_ips()
for (ifaddrs* p = ifap; p ; p = p->ifa_next) { for (ifaddrs* p = ifap; p ; p = p->ifa_next) {
ifaddrs* cur = p; ifaddrs* cur = p;
// Ignore if no address for this interface.
// @see https://github.com/ossrs/srs/issues/1087#issuecomment-408847115
if (!cur->ifa_addr) {
continue;
}
// retrieve IP address, ignore the tun0 network device, whose addr is NULL. // retrieve IP address, ignore the tun0 network device, whose addr is NULL.
// @see: https://github.com/ossrs/srs/issues/141 // @see: https://github.com/ossrs/srs/issues/141
bool ipv4 = (cur->ifa_addr->sa_family == AF_INET); bool ipv4 = (cur->ifa_addr->sa_family == AF_INET);

@ -1523,5 +1523,189 @@ VOID TEST(KernelUtility, AvcUev)
} }
} }
extern void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in);
VOID TEST(KernelUtility, CRC32MakeTable)
{
uint32_t t[256];
// IEEE, @see https://github.com/ossrs/srs/blob/608c88b8f2b352cdbce3b89b9042026ea907e2d3/trunk/src/kernel/srs_kernel_utility.cpp#L770
__crc32_make_table(t, 0x4c11db7, true);
EXPECT_EQ((uint32_t)0x00000000, t[0]);
EXPECT_EQ((uint32_t)0x77073096, t[1]);
EXPECT_EQ((uint32_t)0xEE0E612C, t[2]);
EXPECT_EQ((uint32_t)0x990951BA, t[3]);
EXPECT_EQ((uint32_t)0x076DC419, t[4]);
EXPECT_EQ((uint32_t)0x706AF48F, t[5]);
EXPECT_EQ((uint32_t)0xE963A535, t[6]);
EXPECT_EQ((uint32_t)0x9E6495A3, t[7]);
EXPECT_EQ((uint32_t)0xB3667A2E, t[248]);
EXPECT_EQ((uint32_t)0xC4614AB8, t[249]);
EXPECT_EQ((uint32_t)0x5D681B02, t[250]);
EXPECT_EQ((uint32_t)0x2A6F2B94, t[251]);
EXPECT_EQ((uint32_t)0xB40BBE37, t[252]);
EXPECT_EQ((uint32_t)0xC30C8EA1, t[253]);
EXPECT_EQ((uint32_t)0x5A05DF1B, t[254]);
EXPECT_EQ((uint32_t)0x2D02EF8D, t[255]);
// IEEE, @see https://github.com/ossrs/srs/blob/608c88b8f2b352cdbce3b89b9042026ea907e2d3/trunk/src/kernel/srs_kernel_utility.cpp#L770
__crc32_make_table(t, 0x4c11db7, true);
EXPECT_EQ((uint32_t)0x00000000, t[0]);
EXPECT_EQ((uint32_t)0x77073096, t[1]);
EXPECT_EQ((uint32_t)0xEE0E612C, t[2]);
EXPECT_EQ((uint32_t)0x990951BA, t[3]);
EXPECT_EQ((uint32_t)0x076DC419, t[4]);
EXPECT_EQ((uint32_t)0x706AF48F, t[5]);
EXPECT_EQ((uint32_t)0xE963A535, t[6]);
EXPECT_EQ((uint32_t)0x9E6495A3, t[7]);
EXPECT_EQ((uint32_t)0xB3667A2E, t[248]);
EXPECT_EQ((uint32_t)0xC4614AB8, t[249]);
EXPECT_EQ((uint32_t)0x5D681B02, t[250]);
EXPECT_EQ((uint32_t)0x2A6F2B94, t[251]);
EXPECT_EQ((uint32_t)0xB40BBE37, t[252]);
EXPECT_EQ((uint32_t)0xC30C8EA1, t[253]);
EXPECT_EQ((uint32_t)0x5A05DF1B, t[254]);
EXPECT_EQ((uint32_t)0x2D02EF8D, t[255]);
// MPEG, @see https://github.com/ossrs/srs/blob/608c88b8f2b352cdbce3b89b9042026ea907e2d3/trunk/src/kernel/srs_kernel_utility.cpp#L691
__crc32_make_table(t, 0x4c11db7, false);
EXPECT_EQ((uint32_t)0x00000000, t[0]);
EXPECT_EQ((uint32_t)0x04c11db7, t[1]);
EXPECT_EQ((uint32_t)0x09823b6e, t[2]);
EXPECT_EQ((uint32_t)0x0d4326d9, t[3]);
EXPECT_EQ((uint32_t)0x130476dc, t[4]);
EXPECT_EQ((uint32_t)0x17c56b6b, t[5]);
EXPECT_EQ((uint32_t)0x1a864db2, t[6]);
EXPECT_EQ((uint32_t)0x1e475005, t[7]);
EXPECT_EQ((uint32_t)0xafb010b1, t[248]);
EXPECT_EQ((uint32_t)0xab710d06, t[249]);
EXPECT_EQ((uint32_t)0xa6322bdf, t[250]);
EXPECT_EQ((uint32_t)0xa2f33668, t[251]);
EXPECT_EQ((uint32_t)0xbcb4666d, t[252]);
EXPECT_EQ((uint32_t)0xb8757bda, t[253]);
EXPECT_EQ((uint32_t)0xb5365d03, t[254]);
EXPECT_EQ((uint32_t)0xb1f740b4, t[255]);
// MPEG, @see https://github.com/ossrs/srs/blob/608c88b8f2b352cdbce3b89b9042026ea907e2d3/trunk/src/kernel/srs_kernel_utility.cpp#L691
__crc32_make_table(t, 0x4c11db7, false);
EXPECT_EQ((uint32_t)0x00000000, t[0]);
EXPECT_EQ((uint32_t)0x04c11db7, t[1]);
EXPECT_EQ((uint32_t)0x09823b6e, t[2]);
EXPECT_EQ((uint32_t)0x0d4326d9, t[3]);
EXPECT_EQ((uint32_t)0x130476dc, t[4]);
EXPECT_EQ((uint32_t)0x17c56b6b, t[5]);
EXPECT_EQ((uint32_t)0x1a864db2, t[6]);
EXPECT_EQ((uint32_t)0x1e475005, t[7]);
EXPECT_EQ((uint32_t)0xafb010b1, t[248]);
EXPECT_EQ((uint32_t)0xab710d06, t[249]);
EXPECT_EQ((uint32_t)0xa6322bdf, t[250]);
EXPECT_EQ((uint32_t)0xa2f33668, t[251]);
EXPECT_EQ((uint32_t)0xbcb4666d, t[252]);
EXPECT_EQ((uint32_t)0xb8757bda, t[253]);
EXPECT_EQ((uint32_t)0xb5365d03, t[254]);
EXPECT_EQ((uint32_t)0xb1f740b4, t[255]);
}
VOID TEST(KernelUtility, CRC32IEEE)
{
if (true) {
string datas[] = {
"123456789", "srs", "ossrs.net",
"SRS's a simplest, conceptual integrated, industrial-strength live streaming origin cluster."
};
uint32_t checksums[] = {
0xcbf43926, 0x7df334e9, 0x2f52242b,
0x7e8677bd,
};
for (int i = 0; i < (int)(sizeof(datas)/sizeof(string)); i++) {
string data = datas[i];
uint32_t checksum = checksums[i];
EXPECT_EQ(checksum, srs_crc32_ieee(data.data(), data.length(), 0));
}
uint32_t previous = 0;
for (int i = 0; i < (int)(sizeof(datas)/sizeof(string)); i++) {
string data = datas[i];
previous = srs_crc32_ieee(data.data(), data.length(), previous);
}
EXPECT_EQ((uint32_t)0x431b8785, previous);
}
if (true) {
string data = "123456789srs";
EXPECT_EQ((uint32_t)0xf567b5cf, srs_crc32_ieee(data.data(), data.length(), 0));
}
if (true) {
string data = "123456789";
EXPECT_EQ((uint32_t)0xcbf43926, srs_crc32_ieee(data.data(), data.length(), 0));
data = "srs";
EXPECT_EQ((uint32_t)0xf567b5cf, srs_crc32_ieee(data.data(), data.length(), 0xcbf43926));
}
}
VOID TEST(KernelUtility, CRC32MPEGTS)
{
string datas[] = {
"123456789", "srs", "ossrs.net",
"SRS's a simplest, conceptual integrated, industrial-strength live streaming origin cluster."
};
uint32_t checksums[] = {
0x0376e6e7, 0xd9089591, 0xbd17933f,
0x9f389f7d
};
for (int i = 0; i < (int)(sizeof(datas)/sizeof(string)); i++) {
string data = datas[i];
uint32_t checksum = checksums[i];
EXPECT_EQ(checksum, (uint32_t)srs_crc32_mpegts(data.data(), data.length()));
}
}
VOID TEST(KernelUtility, Base64Decode)
{
string cipher = "dXNlcjpwYXNzd29yZA==";
string expect = "user:password";
string plaintext;
EXPECT_TRUE(srs_success == srs_av_base64_decode(cipher, plaintext));
EXPECT_TRUE(expect == plaintext);
}
VOID TEST(KernelUtility, StringToHex)
{
if (true) {
uint8_t h[16];
EXPECT_EQ(-1, srs_hex_to_data(h, NULL, 0));
EXPECT_EQ(-1, srs_hex_to_data(h, "0", 1));
EXPECT_EQ(-1, srs_hex_to_data(h, "0g", 2));
}
if (true) {
string s = "139056E5A0";
uint8_t h[16];
int n = srs_hex_to_data(h, s.data(), s.length());
EXPECT_EQ(n, 5);
EXPECT_EQ(0x13, h[0]);
EXPECT_EQ(0x90, h[1]);
EXPECT_EQ(0x56, h[2]);
EXPECT_EQ(0xe5, h[3]);
EXPECT_EQ(0xa0, h[4]);
}
}
#endif #endif

@ -33,6 +33,7 @@ using namespace std;
#include <srs_app_st.hpp> #include <srs_app_st.hpp>
#include <srs_protocol_amf0.hpp> #include <srs_protocol_amf0.hpp>
#include <srs_rtmp_stack.hpp> #include <srs_rtmp_stack.hpp>
#include <srs_service_http_conn.hpp>
MockEmptyIO::MockEmptyIO() MockEmptyIO::MockEmptyIO()
{ {
@ -105,6 +106,12 @@ MockBufferIO::~MockBufferIO()
{ {
} }
MockBufferIO* MockBufferIO::append(string data)
{
in_buffer.append(data.data(), data.length());
return this;
}
bool MockBufferIO::is_never_timeout(int64_t tm) bool MockBufferIO::is_never_timeout(int64_t tm)
{ {
return tm == SRS_CONSTS_NO_TMMS; return tm == SRS_CONSTS_NO_TMMS;
@ -559,15 +566,15 @@ VOID TEST(ProtocolUtilityTest, GenerateTcUrl)
string ip; string vhost; string app; int port; string tcUrl; string param; string ip; string vhost; string app; int port; string tcUrl; string param;
ip = "127.0.0.1"; vhost = "__defaultVhost__"; app = "live"; port = 1935; ip = "127.0.0.1"; vhost = "__defaultVhost__"; app = "live"; port = 1935;
tcUrl = srs_generate_tc_url(ip, vhost, app, port, param); tcUrl = srs_generate_tc_url(ip, vhost, app, port);
EXPECT_STREQ("rtmp://127.0.0.1/live", tcUrl.c_str()); EXPECT_STREQ("rtmp://127.0.0.1/live", tcUrl.c_str());
ip = "127.0.0.1"; vhost = "demo"; app = "live"; port = 1935; ip = "127.0.0.1"; vhost = "demo"; app = "live"; port = 1935;
tcUrl = srs_generate_tc_url(ip, vhost, app, port, param); tcUrl = srs_generate_tc_url(ip, vhost, app, port);
EXPECT_STREQ("rtmp://demo/live", tcUrl.c_str()); EXPECT_STREQ("rtmp://demo/live", tcUrl.c_str());
ip = "127.0.0.1"; vhost = "demo"; app = "live"; port = 19351; ip = "127.0.0.1"; vhost = "demo"; app = "live"; port = 19351;
tcUrl = srs_generate_tc_url(ip, vhost, app, port, param); tcUrl = srs_generate_tc_url(ip, vhost, app, port);
EXPECT_STREQ("rtmp://demo:19351/live", tcUrl.c_str()); EXPECT_STREQ("rtmp://demo:19351/live", tcUrl.c_str());
} }
@ -5523,5 +5530,91 @@ VOID TEST(ProtocolRTMPTest, RTMPHandshakeBytes)
EXPECT_TRUE(bytes.s0s1s2 != NULL); EXPECT_TRUE(bytes.s0s1s2 != NULL);
} }
VOID TEST(ProtocolHTTPTest, ParseHTTPMessage)
{
if (true) {
MockBufferIO bio;
SrsHttpParser hp;
bio.append("GET /gslb/v1/versions HTTP/1.1\r\nContent-Length: 5\r\n\r\nHello");
EXPECT_TRUE(0 == hp.initialize(HTTP_REQUEST, false));
if (true) {
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
ASSERT_TRUE(0 == hp.parse_message(&bio, &req));
// We should read body, or next parsing message will fail.
// @see https://github.com/ossrs/srs/issues/1181
EXPECT_FALSE(req->body_reader()->eof());
}
if (true) {
bio.append("GET /gslb/v1/versions HTTP/1.1\r\nContent-Length: 5\r\n\r\nHello");
// Should fail because there is body which not read.
// @see https://github.com/ossrs/srs/issues/1181
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
ASSERT_FALSE(0 == hp.parse_message(&bio, &req));
}
}
if (true) {
MockBufferIO bio;
SrsHttpParser hp;
bio.append("GET /gslb/v1/versions HTTP/1.1\r\nContent-Length: 5\r\n\r\nHello");
ASSERT_TRUE(0 == hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
ASSERT_TRUE(0 == hp.parse_message(&bio, &req));
char v[64] = {0};
EXPECT_TRUE(0 == req->body_reader()->read(v, sizeof(v), NULL));
EXPECT_TRUE(string("Hello") == string(v));
EXPECT_TRUE(req->body_reader()->eof());
}
if (true) {
MockBufferIO bio;
SrsHttpParser hp;
bio.append("GET /gslb/v1/versions HTTP/1.1\r\nContent-Length: 0\r\n\r\n");
ASSERT_TRUE(0 == hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
EXPECT_TRUE(0 == hp.parse_message(&bio, &req));
}
if (true) {
MockBufferIO bio;
SrsHttpParser hp;
bio.append("GET /gslb/v1/versions HTTP/1.1\r\n\r\n");
ASSERT_TRUE(0 == hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
EXPECT_TRUE(0 == hp.parse_message(&bio, &req));
}
if (true) {
MockBufferIO bio;
SrsHttpParser hp;
bio.append("GET /gslb/v1/versions HTTP/1.1\r\n\r\n");
ASSERT_TRUE(0 == hp.initialize(HTTP_REQUEST, false));
ISrsHttpMessage* req = NULL;
SrsAutoFree(ISrsHttpMessage, req);
EXPECT_TRUE(0 == hp.parse_message(&bio, &req));
}
}
#endif #endif

@ -86,6 +86,8 @@ public:
public: public:
MockBufferIO(); MockBufferIO();
virtual ~MockBufferIO(); virtual ~MockBufferIO();
public:
virtual MockBufferIO* append(std::string data);
// for protocol // for protocol
public: public:
virtual bool is_never_timeout(int64_t tm); virtual bool is_never_timeout(int64_t tm);

Loading…
Cancel
Save