Merge branch '2.0release' into develop

pull/499/head
winlin 10 years ago
commit 9759b78da8

@ -13,6 +13,7 @@ RMB 500-999
* [2015-xx-xx xx:xx] xxx
RMB 100-499
* [2015-04-10 19:52] 阳成飞
* [2015-03-30 13:34] 扶凯
* [2015-03-29 11-07] 姚伟斌
* [2015-03-14 20:21] 万伟

@ -613,6 +613,11 @@ vhost with-hls.srs.com {
# whether cleanup the old ts files.
# default: on
hls_cleanup on;
# the max size to notify hls,
# to read max bytes from ts of specified cdn network,
# @remark only used when on_hls_notify is config.
# default: 64
hls_nb_notify 64;
# on_hls, never config in here, should config in http_hooks.
# for the hls http callback, @see http_hooks.on_hls of vhost hooks.callback.srs.com

@ -36,7 +36,7 @@ reload(sys)
exec("sys.setdefaultencoding('utf-8')")
assert sys.getdefaultencoding().lower() == "utf-8"
import os, json, time, datetime, cherrypy, threading
import os, json, time, datetime, cherrypy, threading, urllib2
# simple log functions.
def trace(msg):
@ -308,6 +308,41 @@ class RESTDvrs(object):
return code
'''
handle the hls proxy requests: hls stream.
'''
class RESTProxy(object):
exposed = True
'''
for SRS hook: on_hls_notify
on_hls_notify:
when srs reap a ts file of hls, call this hook,
used to push file to cdn network, by get the ts file from cdn network.
so we use HTTP GET and use the variable following:
[app], replace with the app.
[stream], replace with the stream.
[ts_url], replace with the ts url.
ignore any return data of server.
'''
def GET(self, *args, **kwargs):
enable_crossdomain()
url = "http://" + "/".join(args);
print "start to proxy url: %s"%url
f = None
try:
f = urllib2.urlopen(url)
f.read()
except:
print "error proxy url: %s"%url
finally:
if f: f.close()
print "completed proxy url: %s"%url
return url
'''
handle the hls requests: hls stream.
'''
@ -1195,6 +1230,7 @@ class V1(object):
self.sessions = RESTSessions()
self.dvrs = RESTDvrs()
self.hls = RESTHls()
self.proxy = RESTProxy()
self.chats = RESTChats()
self.servers = RESTServers()
self.nodes = RESTNodes()

@ -87,7 +87,7 @@ int SrsDvrAsyncCallThread::cycle()
for (it = copies.begin(); it != copies.end(); ++it) {
ISrsDvrAsyncCall* call = *it;
if ((ret = call->call()) != ERROR_SUCCESS) {
srs_warn("dvr: ignore callback %s, ret=%d", call->to_string().c_str(), ret);
srs_warn("ignore async callback %s, ret=%d", call->to_string().c_str(), ret);
}
srs_freep(call);
}

@ -1487,7 +1487,7 @@ int SrsConfig::check_config()
string m = conf->at(j)->name.c_str();
if (m != "enabled" && m != "hls_entry_prefix" && m != "hls_path" && m != "hls_fragment" && m != "hls_window" && m != "hls_on_error"
&& m != "hls_storage" && m != "hls_mount" && m != "hls_td_ratio" && m != "hls_aof_ratio" && m != "hls_acodec" && m != "hls_vcodec"
&& m != "hls_m3u8_file" && m != "hls_ts_file" && m != "hls_ts_floor" && m != "hls_cleanup"
&& m != "hls_m3u8_file" && m != "hls_ts_file" && m != "hls_ts_floor" && m != "hls_cleanup" && m != "hls_nb_notify"
) {
ret = ERROR_SYSTEM_CONFIG_INVALID;
srs_error("unsupported vhost hls directive %s, ret=%d", m.c_str(), ret);
@ -3418,6 +3418,22 @@ string SrsConfig::get_hls_vcodec(string vhost)
return conf->arg0();
}
int SrsConfig::get_vhost_hls_nb_notify(string vhost)
{
SrsConfDirective* conf = get_hls(vhost);
if (!conf) {
return SRS_CONF_DEFAULT_HLS_NB_NOTIFY;
}
conf = conf->get("hls_nb_notify");
if (!conf || conf->arg0().empty()) {
return SRS_CONF_DEFAULT_HLS_NB_NOTIFY;
}
return ::atoi(conf->arg0().c_str());
}
bool SrsConfig::get_hls_cleanup(string vhost)
{
SrsConfDirective* hls = get_hls(vhost);

@ -63,6 +63,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_HLS_ACODEC "aac"
#define SRS_CONF_DEFAULT_HLS_VCODEC "h264"
#define SRS_CONF_DEFAULT_HLS_CLEANUP true
#define SRS_CONF_DEFAULT_HLS_NB_NOTIFY 64
#define SRS_CONF_DEFAULT_DVR_PATH "./objs/nginx/html/[app]/[stream].[timestamp].flv"
#define SRS_CONF_DEFAULT_DVR_PLAN_SESSION "session"
#define SRS_CONF_DEFAULT_DVR_PLAN_SEGMENT "segment"
@ -959,8 +960,12 @@ public:
* whether cleanup the old ts files.
*/
virtual bool get_hls_cleanup(std::string vhost);
// hds section
/**
* get the size of bytes to read from cdn network, for the on_hls_notify callback,
* that is, to read max bytes of the bytes from the callback, or timeout or error.
*/
virtual int get_vhost_hls_nb_notify(std::string vhost);
// hds section
private:
/**
* get the hds directive of vhost.

@ -240,10 +240,11 @@ int SrsDvrAsyncCallOnHlsNotify::call()
return ret;
}
int nb_notify = _srs_config->get_vhost_hls_nb_notify(req->vhost);
for (int i = 0; i < (int)on_hls->args.size(); i++) {
std::string url = on_hls->args.at(i);
if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url)) != ERROR_SUCCESS) {
srs_error("hook client on_hls_notify failed. url=%s, ret=%d", url.c_str(), ret);
if ((ret = SrsHttpHooks::on_hls_notify(url, req, ts_url, nb_notify)) != ERROR_SUCCESS) {
srs_error("hook client on_hls_notify failed. url=%s, ts=%s, ret=%d", url.c_str(), ts_url.c_str(), ret);
return ret;
}
}

@ -53,6 +53,9 @@ using namespace std;
#define SRS_CONSTS_HTTP_PUT HTTP_PUT
#define SRS_CONSTS_HTTP_DELETE HTTP_DELETE
// for ead all of http body, read each time.
#define SRS_HTTP_READ_CACHE_BYTES 4096
#define SRS_HTTP_DEFAULT_PAGE "index.html"
int srs_go_http_response_json(ISrsHttpResponseWriter* w, string data)
@ -889,7 +892,8 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i
skt = io;
owner = msg;
is_eof = false;
nb_read = 0;
nb_total_read = 0;
nb_left_chunk = 0;
buffer = NULL;
}
@ -901,6 +905,8 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body)
{
int ret = ERROR_SUCCESS;
nb_left_chunk = 0;
nb_total_read = 0;
buffer = body;
return ret;
@ -911,7 +917,7 @@ bool SrsHttpResponseReader::eof()
return is_eof;
}
int SrsHttpResponseReader::read(std::string& data)
int SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
@ -923,95 +929,115 @@ int SrsHttpResponseReader::read(std::string& data)
// chunked encoding.
if (owner->is_chunked()) {
return read_chunked(data);
return read_chunked(data, nb_data, nb_read);
}
// read by specified content-length
int max = (int)owner->content_length() - (int)nb_read;
int max = (int)owner->content_length() - (int)nb_total_read;
if (max <= 0) {
is_eof = true;
return ret;
}
return read_specified(max, data);
// change the max to read.
nb_data = srs_min(nb_data, max);
return read_specified(data, nb_data, nb_read);
}
int SrsHttpResponseReader::read_chunked(std::string& data)
int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
// when no bytes left in chunk,
// parse the chunk length first.
char* at = NULL;
int length = 0;
while (!at) {
// find the CRLF of chunk header end.
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p < end - 1; p++) {
if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) {
// invalid chunk, ignore.
if (p == start) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header start with CRLF. ret=%d", ret);
return ret;
if (nb_left_chunk <= 0) {
char* at = NULL;
int length = 0;
while (!at) {
// find the CRLF of chunk header end.
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p < end - 1; p++) {
if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) {
// invalid chunk, ignore.
if (p == start) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header start with CRLF. ret=%d", ret);
return ret;
}
length = (int)(p - start + 2);
at = buffer->read_slice(length);
break;
}
length = (int)(p - start + 2);
at = buffer->read_slice(length);
}
// got at, ok.
if (at) {
break;
}
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
}
srs_assert(length >= 3);
// got at, ok.
if (at) {
break;
}
// it's ok to set the pos and pos+1 to NULL.
at[length - 1] = 0;
at[length - 2] = 0;
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
// size is the bytes size, excludes the chunk header and end CRLF.
int ilength = (int)::strtol(at, NULL, 16);
if (ilength < 0) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header negative, length=%d. ret=%d", ilength, ret);
return ret;
}
// all bytes in chunk is left now.
nb_left_chunk = ilength;
}
srs_assert(length >= 3);
// it's ok to set the pos and pos+1 to NULL.
at[length - 1] = 0;
at[length - 2] = 0;
// left bytes in chunk, read some.
srs_assert(nb_left_chunk);
// size is the bytes size, excludes the chunk header and end CRLF.
int ilength = (int)::strtol(at, NULL, 16);
if (ilength < 0) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header negative, length=%d. ret=%d", ilength, ret);
return ret;
int nb_bytes = srs_min(nb_left_chunk, nb_data);
ret = read_specified(data, nb_bytes, &nb_bytes);
// the nb_bytes used for output already read size of bytes.
if (nb_read) {
*nb_read = nb_bytes;
}
nb_left_chunk -= nb_bytes;
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
// error or still left bytes in chunk, ignore and read in future.
if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) {
return ret;
}
srs_info("http: read %d chunk", ilength);
srs_info("http: read %d bytes of chunk", nb_bytes);
// read payload when length specifies some payload.
if (ilength <= 0) {
if (nb_left_chunk <= 0) {
is_eof = true;
} else {
srs_assert(ilength);
data.append(buffer->read_slice(ilength), ilength);
nb_read += ilength;
}
// the CRLF of chunk payload end.
if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read EOF of chunk from server failed. ret=%d", ret);
}
return ret;
}
buffer->read_slice(2);
return ret;
}
int SrsHttpResponseReader::read_specified(int max, std::string& data)
int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
@ -1025,14 +1051,21 @@ int SrsHttpResponseReader::read_specified(int max, std::string& data)
}
}
int nb_bytes = srs_min(max, buffer->size());
int nb_bytes = srs_min(nb_data, buffer->size());
// read data to buffer.
srs_assert(nb_bytes);
data.append(buffer->read_slice(nb_bytes), nb_bytes);
nb_read += nb_bytes;
char* p = buffer->read_slice(nb_bytes);
memcpy(data, p, nb_bytes);
if (nb_read) {
*nb_read = nb_bytes;
}
// increase the total read to determine whether EOF.
nb_total_read += nb_bytes;
// when read completed, eof.
if (nb_read >= (int)owner->content_length()) {
if (nb_total_read >= (int)owner->content_length()) {
is_eof = true;
}
@ -1223,11 +1256,20 @@ int SrsHttpMessage::body_read_all(string& body)
{
int ret = ERROR_SUCCESS;
// cache to read.
char* buf = new char[SRS_HTTP_READ_CACHE_BYTES];
SrsAutoFree(char, buf);
// whatever, read util EOF.
while (!_body->eof()) {
if ((ret = _body->read(body)) != ERROR_SUCCESS) {
int nb_read = 0;
if ((ret = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != ERROR_SUCCESS) {
return ret;
}
if (nb_read > 0) {
body.append(buf, nb_read);
}
}
return ret;

@ -196,10 +196,13 @@ public:
*/
virtual bool eof() = 0;
/**
* read from the response body.
* @remark when eof(), return error.
*/
virtual int read(std::string& data) = 0;
* read from the response body.
* @param data, the buffer to read data buffer to.
* @param nb_data, the max size of data buffer.
* @param nb_read, the actual read size of bytes. NULL to ignore.
* @remark when eof(), return error.
*/
virtual int read(char* data, int nb_data, int* nb_read) = 0;
};
// Objects implementing the Handler interface can be
@ -431,7 +434,10 @@ private:
SrsHttpMessage* owner;
SrsFastBuffer* buffer;
bool is_eof;
int64_t nb_read;
// the left bytes in chunk.
int nb_left_chunk;
// already read total bytes.
int64_t nb_total_read;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io);
virtual ~SrsHttpResponseReader();
@ -443,10 +449,10 @@ public:
// interface ISrsHttpResponseReader
public:
virtual bool eof();
virtual int read(std::string& data);
virtual int read(char* data, int nb_data, int* nb_read);
private:
virtual int read_chunked(std::string& data);
virtual int read_specified(int max, std::string& data);
virtual int read_chunked(char* data, int nb_data, int* nb_read);
virtual int read_specified(char* data, int nb_data, int* nb_read);
};
// for http header.

@ -37,15 +37,13 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_core_autofree.hpp>
// when error, http client sleep for a while and retry.
#define SRS_HTTP_CLIENT_SLEEP_US (int64_t)(3*1000*1000LL)
SrsHttpClient::SrsHttpClient()
{
connected = false;
stfd = NULL;
skt = NULL;
parser = NULL;
timeout_us = 0;
}
SrsHttpClient::~SrsHttpClient()
@ -54,7 +52,7 @@ SrsHttpClient::~SrsHttpClient()
srs_freep(parser);
}
int SrsHttpClient::initialize(string h, int p)
int SrsHttpClient::initialize(string h, int p, int64_t t_us)
{
int ret = ERROR_SUCCESS;
@ -68,6 +66,7 @@ int SrsHttpClient::initialize(string h, int p)
host = h;
port = p;
timeout_us = t_us;
return ret;
}
@ -183,10 +182,9 @@ int SrsHttpClient::connect()
disconnect();
// open socket.
int64_t timeout = SRS_HTTP_CLIENT_SLEEP_US;
if ((ret = srs_socket_connect(host, port, timeout, &stfd)) != ERROR_SUCCESS) {
if ((ret = srs_socket_connect(host, port, timeout_us, &stfd)) != ERROR_SUCCESS) {
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d",
host.c_str(), port, timeout, ret);
host.c_str(), port, timeout_us, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host, port);
@ -195,6 +193,10 @@ int SrsHttpClient::connect()
skt = new SrsStSocket(stfd);
connected = true;
// set the recv/send timeout in us.
skt->set_recv_timeout(timeout_us);
skt->set_send_timeout(timeout_us);
return ret;
}

@ -40,6 +40,9 @@ class SrsHttpParser;
class SrsHttpMessage;
class SrsStSocket;
// the default timeout for http client.
#define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL)
/**
* http client to GET/POST/PUT/DELETE uri
*/
@ -51,6 +54,7 @@ private:
SrsStSocket* skt;
SrsHttpParser* parser;
private:
int64_t timeout_us;
// host name or ip.
std::string host;
int port;
@ -61,7 +65,7 @@ public:
/**
* initialize the client, connect to host and port.
*/
virtual int initialize(std::string h, int p);
virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US);
public:
/**
* to post data to the uri.

@ -42,8 +42,12 @@ using namespace std;
#define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS)
#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER 32 * 1024
// the timeout for hls notify, in us.
#define SRS_HLS_NOTIFY_TIMEOUT_US (int64_t)(10*1000*1000LL)
SrsHttpHooks::SrsHttpHooks()
{
}
@ -326,7 +330,7 @@ int SrsHttpHooks::on_hls(string url, SrsRequest* req, string file, int sn, doubl
return ret;
}
int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url)
int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts_url, int nb_notify)
{
int ret = ERROR_SUCCESS;
@ -341,6 +345,8 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts
url = srs_string_replace(url, "[stream]", req->stream);
url = srs_string_replace(url, "[ts_url]", ts_url);
int64_t starttime = srs_update_system_time_ms();
SrsHttpUri uri;
if ((ret = uri.initialize(url)) != ERROR_SUCCESS) {
srs_error("http: post failed. url=%s, ret=%d", url.c_str(), ret);
@ -348,7 +354,7 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts
}
SrsHttpClient http;
if ((ret = http.initialize(uri.get_host(), uri.get_port())) != ERROR_SUCCESS) {
if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT_US)) != ERROR_SUCCESS) {
return ret;
}
@ -358,16 +364,23 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts
}
SrsAutoFree(SrsHttpMessage, msg);
int nb_buf = srs_min(nb_notify, SRS_HTTP_READ_BUFFER);
char* buf = new char[nb_buf];
SrsAutoFree(char, buf);
int nb_read = 0;
ISrsHttpResponseReader* br = msg->body_reader();
while (!br->eof()) {
std::string data;
if ((ret = br->read(data)) != ERROR_SUCCESS) {
while (nb_read < nb_notify && !br->eof()) {
int nb_bytes = 0;
if ((ret = br->read(buf, nb_buf, &nb_bytes)) != ERROR_SUCCESS) {
break;
}
nb_read += nb_bytes;
}
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, ret=%d",
client_id, url.c_str(), msg->status_code(), ret);
int spenttime = (int)(srs_update_system_time_ms() - starttime);
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, read=%dB, ret=%d",
client_id, url.c_str(), msg->status_code(), spenttime, nb_read, ret);
// ignore any error for on_hls_notify.
ret = ERROR_SUCCESS;

@ -110,8 +110,9 @@ public:
* @param url the api server url, to process the event.
* ignore if empty.
* @param ts_url the ts uri, used to replace the variable [ts_url] in url.
* @param nb_notify the max bytes to read from notify server.
*/
static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url);
static int on_hls_notify(std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
private:
static int do_post(std::string url, std::string req, int& code, std::string& res);
};

@ -134,7 +134,7 @@ int64_t srs_update_system_time_ms()
if (_srs_system_time_us_cache <= 0) {
_srs_system_time_us_cache = now_us;
_srs_system_time_startup_time = now_us;
return _srs_system_time_us_cache;
return _srs_system_time_us_cache / 1000;
}
// use relative time.
@ -151,7 +151,7 @@ int64_t srs_update_system_time_ms()
srs_info("system time updated, startup=%"PRId64"us, now=%"PRId64"us",
_srs_system_time_startup_time, _srs_system_time_us_cache);
return _srs_system_time_us_cache;
return _srs_system_time_us_cache / 1000;
}
string srs_dns_resolve(string host)
@ -390,7 +390,7 @@ bool srs_aac_startswith_adts(SrsStream* stream)
char* bytes = stream->data() + stream->pos();
char* p = bytes;
if (!stream->require(p - bytes + 2)) {
if (!stream->require((int)(p - bytes) + 2)) {
return false;
}

Loading…
Cancel
Save