refine hls on_hls_notify, read a little of ts.

pull/133/head
winlin 10 years ago
parent 1f93fb3399
commit 5c00ce8a96

@ -53,6 +53,9 @@ using namespace std;
#define SRS_CONSTS_HTTP_PUT HTTP_PUT
#define SRS_CONSTS_HTTP_DELETE HTTP_DELETE
// for ead all of http body, read each time.
#define SRS_HTTP_READ_CACHE_BYTES 4096
#define SRS_HTTP_DEFAULT_PAGE "index.html"
int srs_go_http_response_json(ISrsHttpResponseWriter* w, string data)
@ -889,7 +892,8 @@ SrsHttpResponseReader::SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* i
skt = io;
owner = msg;
is_eof = false;
nb_read = 0;
nb_total_read = 0;
nb_left_chunk = 0;
buffer = NULL;
}
@ -901,6 +905,8 @@ int SrsHttpResponseReader::initialize(SrsFastBuffer* body)
{
int ret = ERROR_SUCCESS;
nb_left_chunk = 0;
nb_total_read = 0;
buffer = body;
return ret;
@ -911,7 +917,7 @@ bool SrsHttpResponseReader::eof()
return is_eof;
}
int SrsHttpResponseReader::read(std::string& data)
int SrsHttpResponseReader::read(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
@ -923,95 +929,115 @@ int SrsHttpResponseReader::read(std::string& data)
// chunked encoding.
if (owner->is_chunked()) {
return read_chunked(data);
return read_chunked(data, nb_data, nb_read);
}
// read by specified content-length
int max = (int)owner->content_length() - (int)nb_read;
int max = (int)owner->content_length() - (int)nb_total_read;
if (max <= 0) {
is_eof = true;
return ret;
}
return read_specified(max, data);
// change the max to read.
nb_data = srs_min(nb_data, max);
return read_specified(data, nb_data, nb_read);
}
int SrsHttpResponseReader::read_chunked(std::string& data)
int SrsHttpResponseReader::read_chunked(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
// when no bytes left in chunk,
// parse the chunk length first.
char* at = NULL;
int length = 0;
while (!at) {
// find the CRLF of chunk header end.
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p < end - 1; p++) {
if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) {
// invalid chunk, ignore.
if (p == start) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header start with CRLF. ret=%d", ret);
return ret;
if (nb_left_chunk <= 0) {
char* at = NULL;
int length = 0;
while (!at) {
// find the CRLF of chunk header end.
char* start = buffer->bytes();
char* end = start + buffer->size();
for (char* p = start; p < end - 1; p++) {
if (p[0] == SRS_HTTP_CR && p[1] == SRS_HTTP_LF) {
// invalid chunk, ignore.
if (p == start) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header start with CRLF. ret=%d", ret);
return ret;
}
length = (int)(p - start + 2);
at = buffer->read_slice(length);
break;
}
length = (int)(p - start + 2);
at = buffer->read_slice(length);
}
// got at, ok.
if (at) {
break;
}
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
return ret;
}
}
srs_assert(length >= 3);
// got at, ok.
if (at) {
break;
}
// it's ok to set the pos and pos+1 to NULL.
at[length - 1] = 0;
at[length - 2] = 0;
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, buffer->size() + 1)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
// size is the bytes size, excludes the chunk header and end CRLF.
int ilength = (int)::strtol(at, NULL, 16);
if (ilength < 0) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header negative, length=%d. ret=%d", ilength, ret);
return ret;
}
// all bytes in chunk is left now.
nb_left_chunk = ilength;
}
srs_assert(length >= 3);
// it's ok to set the pos and pos+1 to NULL.
at[length - 1] = 0;
at[length - 2] = 0;
// left bytes in chunk, read some.
srs_assert(nb_left_chunk);
// size is the bytes size, excludes the chunk header and end CRLF.
int ilength = (int)::strtol(at, NULL, 16);
if (ilength < 0) {
ret = ERROR_HTTP_INVALID_CHUNK_HEADER;
srs_error("chunk header negative, length=%d. ret=%d", ilength, ret);
return ret;
int nb_bytes = srs_min(nb_left_chunk, nb_data);
ret = read_specified(data, nb_bytes, &nb_bytes);
// the nb_bytes used for output already read size of bytes.
if (nb_read) {
*nb_read = nb_bytes;
}
nb_left_chunk -= nb_bytes;
// when empty, only grow 1bytes, but the buffer will cache more.
if ((ret = buffer->grow(skt, ilength + 2)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read body from server failed. ret=%d", ret);
}
// error or still left bytes in chunk, ignore and read in future.
if (nb_left_chunk > 0 || (ret != ERROR_SUCCESS)) {
return ret;
}
srs_info("http: read %d chunk", ilength);
srs_info("http: read %d bytes of chunk", nb_bytes);
// read payload when length specifies some payload.
if (ilength <= 0) {
if (nb_left_chunk <= 0) {
is_eof = true;
} else {
srs_assert(ilength);
data.append(buffer->read_slice(ilength), ilength);
nb_read += ilength;
}
// the CRLF of chunk payload end.
if ((ret = buffer->grow(skt, 2)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("read EOF of chunk from server failed. ret=%d", ret);
}
return ret;
}
buffer->read_slice(2);
return ret;
}
int SrsHttpResponseReader::read_specified(int max, std::string& data)
int SrsHttpResponseReader::read_specified(char* data, int nb_data, int* nb_read)
{
int ret = ERROR_SUCCESS;
@ -1025,14 +1051,21 @@ int SrsHttpResponseReader::read_specified(int max, std::string& data)
}
}
int nb_bytes = srs_min(max, buffer->size());
int nb_bytes = srs_min(nb_data, buffer->size());
// read data to buffer.
srs_assert(nb_bytes);
data.append(buffer->read_slice(nb_bytes), nb_bytes);
nb_read += nb_bytes;
char* p = buffer->read_slice(nb_bytes);
memcpy(data, p, nb_bytes);
if (nb_read) {
*nb_read = nb_bytes;
}
// increase the total read to determine whether EOF.
nb_total_read += nb_bytes;
// when read completed, eof.
if (nb_read >= (int)owner->content_length()) {
if (nb_total_read >= (int)owner->content_length()) {
is_eof = true;
}
@ -1223,11 +1256,19 @@ int SrsHttpMessage::body_read_all(string& body)
{
int ret = ERROR_SUCCESS;
// cache to read.
char* buf = new char[SRS_HTTP_READ_CACHE_BYTES];
SrsAutoFree(char, buf);
// whatever, read util EOF.
while (!_body->eof()) {
if ((ret = _body->read(body)) != ERROR_SUCCESS) {
int nb_read = 0;
if ((ret = _body->read(buf, SRS_HTTP_READ_CACHE_BYTES, &nb_read)) != ERROR_SUCCESS) {
return ret;
}
srs_assert (nb_read > 0);
body.append(buf, nb_read);
}
return ret;

@ -196,10 +196,13 @@ public:
*/
virtual bool eof() = 0;
/**
* read from the response body.
* @remark when eof(), return error.
*/
virtual int read(std::string& data) = 0;
* read from the response body.
* @param data, the buffer to read data buffer to.
* @param nb_data, the max size of data buffer.
* @param nb_read, the actual read size of bytes. NULL to ignore.
* @remark when eof(), return error.
*/
virtual int read(char* data, int nb_data, int* nb_read) = 0;
};
// Objects implementing the Handler interface can be
@ -431,7 +434,10 @@ private:
SrsHttpMessage* owner;
SrsFastBuffer* buffer;
bool is_eof;
int64_t nb_read;
// the left bytes in chunk.
int nb_left_chunk;
// already read total bytes.
int64_t nb_total_read;
public:
SrsHttpResponseReader(SrsHttpMessage* msg, SrsStSocket* io);
virtual ~SrsHttpResponseReader();
@ -443,10 +449,10 @@ public:
// interface ISrsHttpResponseReader
public:
virtual bool eof();
virtual int read(std::string& data);
virtual int read(char* data, int nb_data, int* nb_read);
private:
virtual int read_chunked(std::string& data);
virtual int read_specified(int max, std::string& data);
virtual int read_chunked(char* data, int nb_data, int* nb_read);
virtual int read_specified(char* data, int nb_data, int* nb_read);
};
// for http header.

@ -363,17 +363,16 @@ int SrsHttpHooks::on_hls_notify(std::string url, SrsRequest* req, std::string ts
}
SrsAutoFree(SrsHttpMessage, msg);
int nb_read = 0;
ISrsHttpResponseReader* br = msg->body_reader();
while (!br->eof()) {
std::string data;
// for notify, only read some data.
ret = br->read(data);
break;
if (!br->eof()) {
char buf[64]; // only read a little of bytes of ts.
ret = br->read(buf, 64, &nb_read);
}
int spenttime = (int)(srs_update_system_time_ms() - starttime);
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, ret=%d",
client_id, url.c_str(), msg->status_code(), spenttime, ret);
srs_trace("http hook on_hls_notify success. client_id=%d, url=%s, code=%d, spent=%dms, read=%dB, ret=%d",
client_id, url.c_str(), msg->status_code(), spenttime, nb_read, ret);
// ignore any error for on_hls_notify.
ret = ERROR_SUCCESS;

Loading…
Cancel
Save