merge from 2.0

pull/499/head
winlin 10 years ago
commit d25eb0aaa2

@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_rtmp_buffer.hpp> #include <srs_rtmp_buffer.hpp>
#include <stdlib.h>
#include <srs_kernel_error.hpp> #include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp> #include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp> #include <srs_kernel_utility.hpp>
@ -58,10 +60,16 @@ SrsFastBuffer::SrsFastBuffer()
#endif #endif
nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE; nb_buffer = SRS_DEFAULT_RECV_BUFFER_SIZE;
buffer = new char[nb_buffer]; buffer = (char*)malloc(nb_buffer);
p = end = buffer; p = end = buffer;
} }
SrsFastBuffer::~SrsFastBuffer()
{
free(buffer);
buffer = NULL;
}
int SrsFastBuffer::size() int SrsFastBuffer::size()
{ {
return end - p; return end - p;
@ -75,33 +83,23 @@ char* SrsFastBuffer::bytes()
void SrsFastBuffer::set_buffer(int buffer_size) void SrsFastBuffer::set_buffer(int buffer_size)
{ {
// the user-space buffer size limit to a max value. // the user-space buffer size limit to a max value.
int nb_max_buf = srs_min(buffer_size, SRS_MAX_SOCKET_BUFFER); int nb_resize_buf = srs_min(buffer_size, SRS_MAX_SOCKET_BUFFER);
if (nb_max_buf < buffer_size) { if (buffer_size > SRS_MAX_SOCKET_BUFFER) {
srs_warn("limit the user-space buffer from %d to %d", buffer_size, nb_max_buf); srs_warn("limit the user-space buffer from %d to %d", buffer_size, SRS_MAX_SOCKET_BUFFER);
} }
// only realloc when buffer changed bigger // only realloc when buffer changed bigger
if (nb_max_buf <= nb_buffer) { if (nb_resize_buf <= nb_buffer) {
return; return;
} }
int start = p - buffer; int start = p - buffer;
int cap = end - p; int nb_bytes = end - p;
char* buf = new char[nb_max_buf];
if (cap > 0) {
memcpy(buf, buffer, nb_buffer);
}
srs_freep(buffer);
buffer = buf; buffer = (char*)realloc(buffer, nb_resize_buf);
nb_buffer = nb_resize_buf;
p = buffer + start; p = buffer + start;
end = p + cap; end = p + nb_bytes;
}
SrsFastBuffer::~SrsFastBuffer()
{
srs_freep(buffer);
} }
char SrsFastBuffer::read_1byte() char SrsFastBuffer::read_1byte()
@ -117,12 +115,6 @@ char* SrsFastBuffer::read_slice(int size)
char* ptr = p; char* ptr = p;
p += size; p += size;
// reset when consumed all.
if (p == end) {
p = end = buffer;
srs_verbose("all consumed, reset fast buffer");
}
return ptr; return ptr;
} }
@ -138,7 +130,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
// generally the required size is ok. // already got required size of bytes.
if (end - p >= required_size) { if (end - p >= required_size) {
return ret; return ret;
} }
@ -146,31 +138,42 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
// must be positive. // must be positive.
srs_assert(required_size > 0); srs_assert(required_size > 0);
// when read payload or there is no space to read, // the free space of buffer,
// reset the buffer with exists bytes. // buffer = consumed_bytes + exists_bytes + free_space.
int max_to_read = buffer + nb_buffer - end; int nb_free_space = buffer + nb_buffer - end;
if (required_size > SRS_RTMP_MAX_MESSAGE_HEADER || max_to_read < required_size) { // resize the space when no left space.
int nb_cap = end - p; if (nb_free_space < required_size) {
srs_verbose("move fast buffer %d bytes", nb_cap); // the bytes already in buffer
if (nb_cap < nb_buffer) { int nb_exists_bytes = end - p;
buffer = (char*)memmove(buffer, p, nb_cap); srs_assert(nb_exists_bytes >= 0);
srs_verbose("move fast buffer %d bytes", nb_exists_bytes);
if (!nb_exists_bytes) {
// reset when buffer is empty.
p = end = buffer;
srs_verbose("all consumed, reset fast buffer");
} else {
// move the left bytes to start of buffer.
srs_assert(nb_exists_bytes < nb_buffer);
buffer = (char*)memmove(buffer, p, nb_exists_bytes);
p = buffer; p = buffer;
end = p + nb_cap; end = p + nb_exists_bytes;
} }
} }
// directly check the available bytes to read in buffer. // check whether enough free space in buffer.
max_to_read = buffer + nb_buffer - end; nb_free_space = buffer + nb_buffer - end;
if (max_to_read < required_size) { if (nb_free_space < required_size) {
ret = ERROR_READER_BUFFER_OVERFLOW; ret = ERROR_READER_BUFFER_OVERFLOW;
srs_error("buffer overflow, required=%d, max=%d, ret=%d", required_size, nb_buffer, ret); srs_error("buffer overflow, required=%d, max=%d, left=%d, ret=%d",
required_size, nb_buffer, nb_free_space, ret);
return ret; return ret;
} }
// buffer is ok, read required size of bytes. // buffer is ok, read required size of bytes.
while (end - p < required_size) { while (end - p < required_size) {
ssize_t nread; ssize_t nread;
if ((ret = reader->read(end, max_to_read, &nread)) != ERROR_SUCCESS) { if ((ret = reader->read(end, nb_free_space, &nread)) != ERROR_SUCCESS) {
return ret; return ret;
} }
@ -189,7 +192,7 @@ int SrsFastBuffer::grow(ISrsBufferReader* reader, int required_size)
// we just move the ptr to next. // we just move the ptr to next.
srs_assert((int)nread > 0); srs_assert((int)nread > 0);
end += nread; end += nread;
max_to_read -= nread; nb_free_space -= nread;
} }
return ret; return ret;

@ -59,6 +59,12 @@ public:
/** /**
* the buffer provices bytes cache for protocol. generally, * the buffer provices bytes cache for protocol. generally,
* protocol recv data from socket, put into buffer, decode to RTMP message. * protocol recv data from socket, put into buffer, decode to RTMP message.
* Usage:
* ISrsBufferReader* r = ......;
* SrsFastBuffer* fb = ......;
* fb->grow(r, 1024);
* char* header = fb->read_slice(100);
* char* payload = fb->read_payload(924);
*/ */
// TODO: FIXME: add utest for it. // TODO: FIXME: add utest for it.
class SrsFastBuffer class SrsFastBuffer
@ -79,7 +85,7 @@ private:
// ptr to the buffer. // ptr to the buffer.
// buffer <= p <= end <= buffer+nb_buffer // buffer <= p <= end <= buffer+nb_buffer
char* buffer; char* buffer;
// the max size of buffer. // the size of buffer.
int nb_buffer; int nb_buffer;
public: public:
SrsFastBuffer(); SrsFastBuffer();
@ -97,7 +103,7 @@ public:
virtual char* bytes(); virtual char* bytes();
/** /**
* create buffer with specifeid size. * create buffer with specifeid size.
* @param buffer the size of buffer. * @param buffer the size of buffer. ignore when smaller than SRS_MAX_SOCKET_BUFFER.
* @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K. * @remark when MR(SRS_PERF_MERGED_READ) disabled, always set to 8K.
* @remark when buffer changed, the previous ptr maybe invalid. * @remark when buffer changed, the previous ptr maybe invalid.
* @see https://github.com/winlinvip/simple-rtmp-server/issues/241 * @see https://github.com/winlinvip/simple-rtmp-server/issues/241
@ -112,8 +118,8 @@ public:
/** /**
* read a slice in size bytes, move to next bytes. * read a slice in size bytes, move to next bytes.
* user can use this char* ptr directly, and should never free it. * user can use this char* ptr directly, and should never free it.
* @remark assert buffer already grow(size). * @remark user can use the returned ptr util grow(size),
* @remark the ptr returned maybe invalid after grow(x). * for the ptr returned maybe invalid after grow(x).
*/ */
virtual char* read_slice(int size); virtual char* read_slice(int size);
/** /**

Loading…
Cancel
Save