Improve file writer performance by fwrite with cache. v5.0.133 (#3308)

* SrsFileWriter leverages libc buffer to boost dvr write speed.

* Refactor SrsFileWriter to use libc file functions mockable

* Add utest and refine code.

Co-authored-by: winlin <winlin@vip.126.com>
pull/3370/head
stone 2 years ago committed by GitHub
parent fb1790230b
commit 25eb21efe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -8,6 +8,7 @@ The changelog for SRS.
## SRS 5.0 Changelog
* v5.0, 2023-01-08, Merge [#3308](https://github.com/ossrs/srs/pull/3308): DVR: Improve file write performance by fwrite with cache. v5.0.133
* v5.0, 2023-01-06, DVR: Support blackbox test based on hooks. v5.0.132
* v5.0, 2023-01-06, FFmpeg: Support build with FFmpeg native opus. v5.0.131 (#3140)
* v5.0, 2023-01-05, CORS: Refine HTTP CORS headers. v5.0.130

@ -26,6 +26,8 @@ using namespace std;
#include <srs_kernel_mp4.hpp>
#include <srs_app_fragment.hpp>
#define SRS_FWRITE_CACHE_SIZE 65536
SrsDvrSegmenter::SrsDvrSegmenter()
{
req = NULL;
@ -95,6 +97,11 @@ srs_error_t SrsDvrSegmenter::open()
return srs_error_wrap(err, "open file %s", path.c_str());
}
// Set libc file write cache buffer size
if ((err = fs->set_iobuf_size(SRS_FWRITE_CACHE_SIZE)) != srs_success) {
return srs_error_wrap(err, "set iobuf size for file %s", path.c_str());
}
// initialize the encoder.
if ((err = open_encoder()) != srs_success) {
return srs_error_wrap(err, "open encoder");

@ -9,6 +9,6 @@
#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 132
#define VERSION_REVISION 133
#endif

@ -105,6 +105,8 @@
XX(ERROR_BACKTRACE_PARSE_NOT_SUPPORT , 1092, "BacktraceParseNotSupport", "Backtrace parse not supported") \
XX(ERROR_BACKTRACE_PARSE_OFFSET , 1093, "BacktraceParseOffset", "Parse backtrace offset failed") \
XX(ERROR_BACKTRACE_ADDR2LINE , 1094, "BacktraceAddr2Line", "Backtrace addr2line failed") \
XX(ERROR_SYSTEM_FILE_NOT_OPEN , 1095, "FileNotOpen", "File is not opened") \
XX(ERROR_SYSTEM_FILE_SETVBUF , 1096, "FileSetVBuf", "Failed to set file vbuf") \
/**************************************************/
/* RTMP protocol error. */

@ -26,32 +26,57 @@ srs_read_t _srs_read_fn = ::read;
srs_lseek_t _srs_lseek_fn = ::lseek;
srs_close_t _srs_close_fn = ::close;
srs_fopen_t _srs_fopen_fn = ::fopen;
srs_fwrite_t _srs_fwrite_fn = ::fwrite;
srs_fread_t _srs_fread_fn = ::fread;
srs_fseek_t _srs_fseek_fn = ::fseek;
srs_fclose_t _srs_fclose_fn = ::fclose;
srs_ftell_t _srs_ftell_fn = ::ftell;
srs_setvbuf_t _srs_setvbuf_fn = ::setvbuf;
SrsFileWriter::SrsFileWriter()
{
fd = -1;
fp_ = NULL;
buf_ = NULL;
}
SrsFileWriter::~SrsFileWriter()
{
close();
srs_freepa(buf_);
}
srs_error_t SrsFileWriter::set_iobuf_size(int size)
{
srs_error_t err = srs_success;
if (fp_ == NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_NOT_OPEN, "file %s is not opened", path_.c_str());
}
srs_freepa(buf_);
buf_ = size > 0 ? new char[size] : NULL;
int r0 = _srs_setvbuf_fn(fp_, buf_, _IOFBF, size);
if (r0) {
return srs_error_new(ERROR_SYSTEM_FILE_SETVBUF, "setvbuf err, file=%s, r0=%d", path_.c_str(), r0);
}
return err;
}
srs_error_t SrsFileWriter::open(string p)
{
srs_error_t err = srs_success;
if (fd > 0) {
if (fp_ != NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_ALREADY_OPENED, "file %s already opened", p.c_str());
}
int flags = O_CREAT|O_WRONLY|O_TRUNC;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((fd = _srs_open_fn(p.c_str(), flags, mode)) < 0) {
if ((fp_ = _srs_fopen_fn(p.c_str(), "wb")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_OPENE, "open file %s failed", p.c_str());
}
path = p;
path_ = p;
return err;
}
@ -60,70 +85,70 @@ srs_error_t SrsFileWriter::open_append(string p)
{
srs_error_t err = srs_success;
if (fd > 0) {
return srs_error_new(ERROR_SYSTEM_FILE_ALREADY_OPENED, "file %s already opened", path.c_str());
if (fp_ != NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_ALREADY_OPENED, "file %s already opened", p.c_str());
}
int flags = O_CREAT|O_APPEND|O_WRONLY;
mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
if ((fd = _srs_open_fn(p.c_str(), flags, mode)) < 0) {
if ((fp_ = _srs_fopen_fn(p.c_str(), "ab")) == NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_OPENE, "open file %s failed", p.c_str());
}
path = p;
path_ = p;
return err;
}
void SrsFileWriter::close()
{
if (fd < 0) {
if (fp_ == NULL) {
return;
}
if (_srs_close_fn(fd) < 0) {
srs_warn("close file %s failed", path.c_str());
if (_srs_fclose_fn(fp_) < 0) {
srs_warn("close file %s failed", path_.c_str());
}
fd = -1;
fp_ = NULL;
return;
}
bool SrsFileWriter::is_open()
{
return fd > 0;
return fp_ != NULL;
}
void SrsFileWriter::seek2(int64_t offset)
{
off_t r0 = _srs_lseek_fn(fd, (off_t)offset, SEEK_SET);
srs_assert(is_open());
int r0 = _srs_fseek_fn(fp_, (long)offset, SEEK_SET);
srs_assert(r0 != -1);
}
int64_t SrsFileWriter::tellg()
{
return (int64_t)_srs_lseek_fn(fd, 0, SEEK_CUR);
srs_assert(is_open());
return _srs_ftell_fn(fp_);
}
srs_error_t SrsFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
{
srs_error_t err = srs_success;
ssize_t nwrite;
// TODO: FIXME: use st_write.
#ifdef _WIN32
if ((nwrite = ::_write(fd, buf, (unsigned int)count)) < 0) {
#else
if ((nwrite = _srs_write_fn(fd, buf, count)) < 0) {
#endif
return srs_error_new(ERROR_SYSTEM_FILE_WRITE, "write to file %s failed", path.c_str());
if (fp_ == NULL) {
return srs_error_new(ERROR_SYSTEM_FILE_NOT_OPEN, "file %s is not opened", path_.c_str());
}
size_t n = _srs_fwrite_fn(buf, 1, count, fp_);
if (n != count) {
return srs_error_new(ERROR_SYSTEM_FILE_WRITE, "write to file %s failed", path_.c_str());
}
if (pnwrite != NULL) {
*pnwrite = nwrite;
*pnwrite = (ssize_t)n;
}
return err;
}
@ -136,29 +161,30 @@ srs_error_t SrsFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwrite
const iovec* piov = iov + i;
ssize_t this_nwrite = 0;
if ((err = write(piov->iov_base, piov->iov_len, &this_nwrite)) != srs_success) {
return srs_error_wrap(err, "write file");
return srs_error_wrap(err, "writev");
}
nwrite += this_nwrite;
}
if (pnwrite) {
*pnwrite = nwrite;
}
return err;
}
srs_error_t SrsFileWriter::lseek(off_t offset, int whence, off_t* seeked)
{
off_t sk = _srs_lseek_fn(fd, offset, whence);
if (sk < 0) {
srs_assert(is_open());
if (_srs_fseek_fn(fp_, (long)offset, whence) == -1) {
return srs_error_new(ERROR_SYSTEM_FILE_SEEK, "seek file");
}
if (seeked) {
*seeked = sk;
*seeked = _srs_ftell_fn(fp_);
}
return srs_success;
}

@ -26,12 +26,18 @@ class SrsFileReader;
class SrsFileWriter : public ISrsWriteSeeker
{
private:
std::string path;
int fd;
std::string path_;
FILE *fp_;
char *buf_;
public:
SrsFileWriter();
virtual ~SrsFileWriter();
public:
/**
* set io buf size
*/
virtual srs_error_t set_iobuf_size(int size);
/**
* open file writer, in truncate mode.
* @param p a string indicates the path of file to open.
@ -110,5 +116,16 @@ typedef ssize_t (*srs_read_t)(int fildes, void* buf, size_t nbyte);
typedef off_t (*srs_lseek_t)(int fildes, off_t offset, int whence);
typedef int (*srs_close_t)(int fildes);
typedef FILE* (*srs_fopen_t)(const char* path, const char* mode);
typedef size_t (*srs_fwrite_t)(const void* ptr, size_t size, size_t nitems,
FILE* stream);
typedef size_t (*srs_fread_t)(void* ptr, size_t size, size_t nitems,
FILE* stream);
typedef int (*srs_fseek_t)(FILE* stream, long offset, int whence);
typedef int (*srs_fclose_t)(FILE* stream);
typedef long (*srs_ftell_t)(FILE* stream);
typedef int (*srs_setvbuf_t)(FILE* stream, char* buf, int type, size_t size);
#endif

@ -4105,6 +4105,114 @@ public:
}
};
extern srs_fopen_t _srs_fopen_fn;
extern srs_fwrite_t _srs_fwrite_fn;
extern srs_fread_t _srs_fread_fn;
extern srs_fseek_t _srs_fseek_fn;
extern srs_fclose_t _srs_fclose_fn;
extern srs_ftell_t _srs_ftell_fn;
extern srs_setvbuf_t _srs_setvbuf_fn;
FILE* mock_fopen(const char* path, const char* mode) {
return NULL;
}
size_t mock_fwrite(const void* ptr, size_t size, size_t nitems, FILE* stream) {
return -1;
}
size_t mock_fread(void* ptr, size_t size, size_t nitems, FILE* stream) {
return -1;
}
int mock_fseek(FILE* stream, long offset, int whence) {
return -1;
}
int mock_fclose(FILE *stream) {
return -1;
}
long mock_ftell(FILE *stream) {
return -1;
}
int mock_setvbuf(FILE* stream, char* buf, int type, size_t size) {
return -1;
}
class MockLibcIO
{
private:
srs_fopen_t oo_;
srs_fwrite_t ow_;
srs_fread_t or_;
srs_fseek_t os_;
srs_fclose_t oc_;
srs_ftell_t ot_;
srs_setvbuf_t osb_;
public:
MockLibcIO(srs_fopen_t o = NULL, srs_fwrite_t w = NULL, srs_fread_t r = NULL,
srs_fseek_t s = NULL, srs_fclose_t c = NULL, srs_ftell_t t = NULL,
srs_setvbuf_t sb = NULL) {
oo_ = _srs_fopen_fn;
ow_ = _srs_fwrite_fn;
os_ = _srs_fseek_fn;
or_ = _srs_fread_fn;
oc_ = _srs_fclose_fn;
ot_ = _srs_ftell_fn;
osb_= _srs_setvbuf_fn;
if (o) {
_srs_fopen_fn = o;
}
if (w) {
_srs_fwrite_fn = w;
}
if (r) {
_srs_fread_fn = r;
}
if (s) {
_srs_fseek_fn = s;
}
if (c) {
_srs_fclose_fn = c;
}
if (t) {
_srs_ftell_fn = t;
}
if (sb){
_srs_setvbuf_fn = sb;
}
}
virtual ~MockLibcIO() {
if (oo_) {
_srs_fopen_fn = oo_;
}
if (ow_) {
_srs_fwrite_fn = ow_;
}
if (or_) {
_srs_fread_fn = or_;
}
if (os_) {
_srs_fseek_fn = os_;
}
if (oc_) {
_srs_fclose_fn = oc_;
}
if (ot_) {
_srs_ftell_fn = ot_;
}
if (osb_) {
_srs_setvbuf_fn = osb_;
}
}
};
VOID TEST(KernelFileWriterTest, WriteSpecialCase)
{
srs_error_t err;
@ -4123,15 +4231,22 @@ VOID TEST(KernelFileWriterTest, WriteSpecialCase)
HELPER_EXPECT_FAILED(f.open_append("/dev/null"));
}
if (true) {
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open_append("/dev/null"));
HELPER_EXPECT_SUCCESS(f.set_iobuf_size(65536));
}
// Always fail.
if (true) {
MockSystemIO _mockio(mock_open);
MockLibcIO _mockio(mock_fopen);
SrsFileWriter f;
HELPER_EXPECT_FAILED(f.open("/dev/null"));
HELPER_EXPECT_FAILED(f.open("/dev/null"));
}
if (true) {
MockSystemIO _mockio(mock_open);
MockLibcIO _mockio(mock_fopen);
SrsFileWriter f;
HELPER_EXPECT_FAILED(f.open_append("/dev/null"));
HELPER_EXPECT_FAILED(f.open_append("/dev/null"));
@ -4168,7 +4283,7 @@ VOID TEST(KernelFileWriterTest, WriteSpecialCase)
// Always fail.
if (true) {
MockSystemIO _mockio(NULL, mock_write);
MockLibcIO _mockio(NULL, mock_fwrite);
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open("/dev/null"));
@ -4185,18 +4300,34 @@ VOID TEST(KernelFileWriterTest, WriteSpecialCase)
HELPER_EXPECT_FAILED(f.writev(iovs, 3, NULL));
}
if (true) {
MockSystemIO _mockio(NULL, NULL, NULL, mock_lseek);
MockLibcIO _mockio(NULL, NULL, NULL, mock_fseek);
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open("/dev/null"));
HELPER_EXPECT_FAILED(f.lseek(0, 0, NULL));
}
if (true) {
MockSystemIO _mockio(NULL, NULL, NULL, NULL, mock_close);
MockLibcIO _mockio(NULL, NULL, NULL, NULL, mock_fclose);
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open("/dev/null"));
f.close();
}
if (true) {
MockLibcIO _mockio(NULL, NULL, NULL, NULL, NULL, NULL, mock_setvbuf);
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open("/dev/null"));
HELPER_EXPECT_FAILED(f.set_iobuf_size(100));
}
if (true) {
MockLibcIO _mockio(NULL, NULL, NULL, NULL, NULL, mock_ftell);
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open("/dev/null"));
EXPECT_EQ(f.tellg(), -1);
}
}
VOID TEST(KernelFileReaderTest, WriteSpecialCase)
@ -4253,26 +4384,24 @@ VOID TEST(KernelFileReaderTest, WriteSpecialCase)
}
}
class MockFileRemover
MockFileRemover::MockFileRemover(string p)
{
private:
string f;
public:
MockFileRemover(string p) {
f = p;
}
virtual ~MockFileRemover() {
if (f != "") {
::unlink(f.c_str());
}
}
};
path_ = p;
}
MockFileRemover::~MockFileRemover()
{
// Only remove {_srs_tmp_file_prefix}*.log file.
if (path_.find(_srs_tmp_file_prefix) != 0) return;
if (path_.find(".log") == string::npos) return;
::unlink(path_.c_str());
}
VOID TEST(KernelFileTest, ReadWriteCase)
{
srs_error_t err;
string filepath = _srs_tmp_file_prefix + "kernel-file-read-write-case";
string filepath = _srs_tmp_file_prefix + "kernel-file-read-write-case.log";
MockFileRemover _mfr(filepath);
SrsFileWriter w;
@ -4285,6 +4414,8 @@ VOID TEST(KernelFileTest, ReadWriteCase)
HELPER_EXPECT_SUCCESS(w.write((void*)"Hello", 5, &nn));
EXPECT_EQ(5, nn);
w.close();
char buf[16] = {0};
HELPER_EXPECT_SUCCESS(r.read(buf, sizeof(buf), &nn));
EXPECT_EQ(5, nn);
@ -4292,6 +4423,50 @@ VOID TEST(KernelFileTest, ReadWriteCase)
EXPECT_STREQ("Hello", buf);
}
VOID TEST(KernelFileTest, SeekCase)
{
srs_error_t err;
string filepath = _srs_tmp_file_prefix + "kernel-file-read-write-case.log";
MockFileRemover _mfr(filepath);
SrsFileWriter w;
HELPER_EXPECT_SUCCESS(w.open(filepath.c_str()));
HELPER_EXPECT_SUCCESS(w.set_iobuf_size(65536));
SrsFileReader r;
HELPER_EXPECT_SUCCESS(r.open(filepath.c_str()));
ssize_t nn = 0;
HELPER_EXPECT_SUCCESS(w.write((void*)"Hello", 5, &nn));
EXPECT_EQ(5, nn);
// over 4g file test
long seek_pos = 0x100000002l;
off_t pos;
HELPER_EXPECT_SUCCESS(w.lseek(seek_pos, SEEK_SET, &pos));
EXPECT_EQ(seek_pos, pos);
HELPER_EXPECT_SUCCESS(w.write((void*)"World", 5, &nn));
EXPECT_EQ(5, nn);
w.close();
char buf[16] = {0};
HELPER_EXPECT_SUCCESS(r.read(buf, 5, &nn));
EXPECT_EQ(5, nn);
EXPECT_STREQ("Hello", buf);
HELPER_EXPECT_SUCCESS(r.lseek(seek_pos, SEEK_SET, NULL));
HELPER_EXPECT_SUCCESS(r.read(buf, 5, &nn));
EXPECT_EQ(5, nn);
EXPECT_STREQ("World", buf);
}
VOID TEST(KernelFLVTest, CoverAll)
{
srs_error_t err;

@ -39,6 +39,15 @@ public:
virtual srs_error_t lseek(off_t offset, int whence, off_t* seeked);
};
class MockFileRemover
{
private:
std::string path_;
public:
MockFileRemover(std::string p);
virtual ~MockFileRemover();
};
class MockSrsFileWriter : public SrsFileWriter
{
public:

@ -352,3 +352,62 @@ VOID TEST(KernelLogTest, LogLevelStringV2)
EXPECT_EQ(srs_get_log_level_v2("off"), SrsLogLevelDisabled);
}
VOID TEST(KernelFileWriterTest, RealfileTest)
{
srs_error_t err;
string filename = "./test-realfile.log";
MockFileRemover disposer(filename);
if (true) {
SrsFileWriter f;
HELPER_EXPECT_SUCCESS(f.open(filename));
EXPECT_TRUE(f.is_open());
EXPECT_EQ(0, f.tellg());
HELPER_EXPECT_SUCCESS(f.write((void*) "HelloWorld", 10, NULL));
EXPECT_EQ(10, f.tellg());
f.seek2(5);
EXPECT_EQ(5, f.tellg());
HELPER_EXPECT_SUCCESS(f.write((void*) "HelloWorld", 10, NULL));
EXPECT_EQ(15, f.tellg());
off_t v = 0;
HELPER_EXPECT_SUCCESS(f.lseek(0, SEEK_CUR, &v));
EXPECT_EQ(15, v);
HELPER_EXPECT_SUCCESS(f.lseek(0, SEEK_SET, &v));
EXPECT_EQ(0, v);
HELPER_EXPECT_SUCCESS(f.lseek(10, SEEK_SET, &v));
EXPECT_EQ(10, v);
HELPER_EXPECT_SUCCESS(f.lseek(0, SEEK_END, &v));
EXPECT_EQ(15, v);
// There are 5 bytes empty lagging in file.
HELPER_EXPECT_SUCCESS(f.lseek(5, SEEK_END, &v));
EXPECT_EQ(20, v);
HELPER_EXPECT_SUCCESS(f.write((void*) "HelloWorld", 10, NULL));
EXPECT_EQ(30, f.tellg());
HELPER_EXPECT_SUCCESS(f.lseek(0, SEEK_SET, &v));
EXPECT_EQ(0, v);
HELPER_EXPECT_SUCCESS(f.write((void*) "HelloWorld", 10, NULL));
EXPECT_EQ(10, f.tellg());
}
SrsFileReader fr;
HELPER_ASSERT_SUCCESS(fr.open(filename));
// "HelloWorldWorld\0\0\0\0\0HelloWorld"
string str;
HELPER_ASSERT_SUCCESS(srs_ioutil_read_all(&fr, str));
EXPECT_STREQ("HelloWorldWorld", str.c_str());
EXPECT_STREQ("HelloWorld", str.substr(20).c_str());
}

Loading…
Cancel
Save