diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 2541c7033..ea070c9eb 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -267,7 +267,8 @@ srs_error_t SrsGoApiV1::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r urls->set("clients", SrsJsonAny::str("manage all clients or specified client, default query top 10 clients")); urls->set("raw", SrsJsonAny::str("raw api for srs, support CUID srs for instance the config")); urls->set("clusters", SrsJsonAny::str("origin cluster server API")); - + urls->set("perf", SrsJsonAny::str("System performance stat")); + SrsJsonObject* tests = SrsJsonAny::object(); obj->set("tests", tests); @@ -1565,6 +1566,34 @@ srs_error_t SrsGoApiClusters::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess return srs_api_response(w, r, obj->dumps()); } +SrsGoApiPerf::SrsGoApiPerf() +{ +} + +SrsGoApiPerf::~SrsGoApiPerf() +{ +} + +srs_error_t SrsGoApiPerf::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) +{ + srs_error_t err = srs_success; + + SrsJsonObject* obj = SrsJsonAny::object(); + SrsAutoFree(SrsJsonObject, obj); + + obj->set("code", SrsJsonAny::integer(ERROR_SUCCESS)); + SrsJsonObject* data = SrsJsonAny::object(); + obj->set("data", data); + + SrsStatistic* stat = SrsStatistic::instance(); + if ((err = stat->dumps_perf_mw(data)) != srs_success) { + int code = srs_error_code(err); srs_error_reset(err); + return srs_api_response_code(w, r, code); + } + + return srs_api_response(w, r, obj->dumps()); +} + SrsGoApiError::SrsGoApiError() { } diff --git a/trunk/src/app/srs_app_http_api.hpp b/trunk/src/app/srs_app_http_api.hpp index 758beaba1..80cde35fd 100644 --- a/trunk/src/app/srs_app_http_api.hpp +++ b/trunk/src/app/srs_app_http_api.hpp @@ -223,6 +223,15 @@ public: virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); }; +class SrsGoApiPerf : public ISrsHttpHandler +{ +public: + SrsGoApiPerf(); + virtual ~SrsGoApiPerf(); +public: + virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); +}; + class SrsGoApiError : public ISrsHttpHandler { public: diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index d68185ba7..214c42aa1 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include using namespace std; @@ -186,6 +187,9 @@ SrsQueueRecvThread::~SrsQueueRecvThread() srs_error_t SrsQueueRecvThread::start() { srs_error_t err = srs_success; + + SrsStatistic* stat = SrsStatistic::instance(); + rtmp->set_perf(stat); if ((err = trd.start()) != srs_success) { return srs_error_wrap(err, "queue recv thread"); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index bd70381e6..d312d4ab2 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -746,7 +746,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, Sr if ((err = consumer->dump_packets(&msgs, count)) != srs_success) { return srs_error_wrap(err, "rtmp: consumer dump packets"); } - + // reportable if (pprint->can_print()) { kbps->sample(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 20fbed740..13d513277 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -987,7 +987,10 @@ srs_error_t SrsServer::http_handle() return srs_error_wrap(err, "handle raw"); } if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) { - return srs_error_wrap(err, "handle raw"); + return srs_error_wrap(err, "handle clusters"); + } + if ((err = http_api_mux->handle("/api/v1/perf", new SrsGoApiPerf())) != srs_success) { + return srs_error_wrap(err, "handle perf"); } #ifdef SRS_AUTO_GB28181 if ((err = http_api_mux->handle("/api/v1/gb28181", new SrsGoApiGb28181())) != srs_success) { diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp index b180d20a1..f5bee3b51 100644 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -234,6 +234,25 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj) return err; } +SrsStatisticCategory::SrsStatisticCategory() +{ + a = 0; + b = 0; + c = 0; + d = 0; + e = 0; + + f = 0; + g = 0; + h = 0; + i = 0; + j = 0; +} + +SrsStatisticCategory::~SrsStatisticCategory() +{ +} + SrsStatistic* SrsStatistic::_instance = NULL; SrsStatistic::SrsStatistic() @@ -243,6 +262,10 @@ SrsStatistic::SrsStatistic() clk = new SrsWallClock(); kbps = new SrsKbps(clk); kbps->set_io(NULL, NULL); + + perf_iovs = new SrsStatisticCategory(); + perf_msgs = new SrsStatisticCategory(); + perf_sys = new SrsStatisticCategory(); } SrsStatistic::~SrsStatistic() @@ -276,6 +299,10 @@ SrsStatistic::~SrsStatistic() rvhosts.clear(); streams.clear(); rstreams.clear(); + + srs_freep(perf_iovs); + srs_freep(perf_msgs); + srs_freep(perf_sys); } SrsStatistic* SrsStatistic::instance() @@ -556,6 +583,159 @@ srs_error_t SrsStatistic::dumps_clients(SrsJsonArray* arr, int start, int count) return err; } +void SrsStatistic::perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) +{ + // For perf msgs, the nb_msgs stat. + // a: =1 + // b: <10 + // c: <100 + // d: <200 + // e: <300 + // f: <400 + // g: <500 + // h: <600 + // i: <1000 + // j: >=1000 + if (nb_msgs == 1) { + perf_msgs->a++; + } else if (nb_msgs < 10) { + perf_msgs->b++; + } else if (nb_msgs < 100) { + perf_msgs->c++; + } else if (nb_msgs < 200) { + perf_msgs->d++; + } else if (nb_msgs < 300) { + perf_msgs->e++; + } else if (nb_msgs < 400) { + perf_msgs->f++; + } else if (nb_msgs < 500) { + perf_msgs->g++; + } else if (nb_msgs < 600) { + perf_msgs->h++; + } else if (nb_msgs < 1000) { + perf_msgs->i++; + } else { + perf_msgs->j++; + } + + // For perf iovs, the nb_iovs stat. + // a: <=2 + // b: <10 + // c: <20 + // d: <200 + // e: <300 + // f: <500 + // g: <700 + // h: <900 + // i: <1024 + // j: >=1024 + if (nb_iovs <= 2) { + perf_iovs->a++; + } else if (nb_iovs < 10) { + perf_iovs->b++; + } else if (nb_iovs < 20) { + perf_iovs->c++; + } else if (nb_iovs < 200) { + perf_iovs->d++; + } else if (nb_iovs < 300) { + perf_iovs->e++; + } else if (nb_iovs < 500) { + perf_iovs->f++; + } else if (nb_iovs < 700) { + perf_iovs->g++; + } else if (nb_iovs < 900) { + perf_iovs->h++; + } else if (nb_iovs < 1024) { + perf_iovs->i++; + } else { + perf_iovs->j++; + } + + // Stat the syscalls. + // a: number of syscalls of msgs. + perf_sys->a++; +} + +void SrsStatistic::perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) +{ + // Stat the syscalls. + // a: number of syscalls of msgs. + // b: number of syscalls of pkts. + perf_sys->b++; +} + +srs_error_t SrsStatistic::dumps_perf_mw(SrsJsonObject* obj) +{ + srs_error_t err = srs_success; + + if (true) { + SrsJsonObject* p = SrsJsonAny::object(); + obj->set("msgs", p); + + // For perf msgs, the nb_msgs stat. + // a: =1 + // b: <10 + // c: <100 + // d: <200 + // e: <300 + // f: <400 + // g: <500 + // h: <600 + // i: <1000 + // j: >=1000 + p->set("lt_2", SrsJsonAny::integer(perf_msgs->a)); + p->set("lt_10", SrsJsonAny::integer(perf_msgs->b)); + p->set("lt_100", SrsJsonAny::integer(perf_msgs->c)); + p->set("lt_200", SrsJsonAny::integer(perf_msgs->d)); + p->set("lt_300", SrsJsonAny::integer(perf_msgs->e)); + p->set("lt_400", SrsJsonAny::integer(perf_msgs->f)); + p->set("lt_500", SrsJsonAny::integer(perf_msgs->g)); + p->set("lt_600", SrsJsonAny::integer(perf_msgs->h)); + p->set("lt_1000", SrsJsonAny::integer(perf_msgs->i)); + p->set("gt_1000", SrsJsonAny::integer(perf_msgs->j)); + } + + if (true) { + SrsJsonObject* p = SrsJsonAny::object(); + obj->set("iovs", p); + + // For perf iovs, the nb_iovs stat. + // a: <=2 + // b: <10 + // c: <20 + // d: <200 + // e: <300 + // f: <500 + // g: <700 + // h: <900 + // i: <1024 + // j: >=1024 + p->set("lt_3", SrsJsonAny::integer(perf_iovs->a)); + p->set("lt_10", SrsJsonAny::integer(perf_iovs->b)); + p->set("lt_20", SrsJsonAny::integer(perf_iovs->c)); + p->set("lt_200", SrsJsonAny::integer(perf_iovs->d)); + p->set("lt_300", SrsJsonAny::integer(perf_iovs->e)); + p->set("lt_500", SrsJsonAny::integer(perf_iovs->f)); + p->set("lt_700", SrsJsonAny::integer(perf_iovs->g)); + p->set("lt_900", SrsJsonAny::integer(perf_iovs->h)); + p->set("lt_1024", SrsJsonAny::integer(perf_iovs->i)); + p->set("gt_1024", SrsJsonAny::integer(perf_iovs->j)); + } + + if (true) { + SrsJsonObject* p = SrsJsonAny::object(); + obj->set("sys", p); + + // Stat the syscalls. + // a: number of syscalls of msgs. + // b: number of syscalls of pkts. + p->set("msgs", SrsJsonAny::integer(perf_sys->a)); + p->set("pkts", SrsJsonAny::integer(perf_sys->b)); + } + + return err; +} + SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req) { SrsStatisticVhost* vhost = NULL; diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp index fa8835849..ee905673a 100644 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -122,7 +122,26 @@ public: virtual srs_error_t dumps(SrsJsonObject* obj); }; -class SrsStatistic +class SrsStatisticCategory +{ +public: + uint64_t a; + uint64_t b; + uint64_t c; + uint64_t d; + uint64_t e; +public: + uint64_t f; + uint64_t g; + uint64_t h; + uint64_t i; + uint64_t j; +public: + SrsStatisticCategory(); + virtual ~SrsStatisticCategory(); +}; + +class SrsStatistic : public ISrsProtocolPerf { private: static SrsStatistic *_instance; @@ -146,6 +165,10 @@ private: // The server total kbps. SrsKbps* kbps; SrsWallClock* clk; + // The perf stat for mw(merged write). + SrsStatisticCategory* perf_iovs; + SrsStatisticCategory* perf_msgs; + SrsStatisticCategory* perf_sys; private: SrsStatistic(); virtual ~SrsStatistic(); @@ -203,6 +226,15 @@ public: // @param start the start index, from 0. // @param count the max count of clients to dump. virtual srs_error_t dumps_clients(SrsJsonArray* arr, int start, int count); +public: + // Stat for packets merged written, nb_msgs is the number of RTMP messages, + // bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec. + virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs); + // Stat for packets merged written, nb_pkts is the number of or chunk packets, + // bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec. + virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs); + // Dumps the perf statistic data, for performance analysis. + virtual srs_error_t dumps_perf_mw(SrsJsonObject* obj); private: virtual SrsStatisticVhost* create_vhost(SrsRequest* req); virtual SrsStatisticStream* create_stream(SrsStatisticVhost* vhost, SrsRequest* req); diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index a530316cd..8d3a2378e 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -217,6 +217,14 @@ srs_error_t SrsPacket::encode_packet(SrsBuffer* stream) return srs_error_new(ERROR_SYSTEM_PACKET_INVALID, "encode"); } +ISrsProtocolPerf::ISrsProtocolPerf() +{ +} + +ISrsProtocolPerf::~ISrsProtocolPerf() +{ +} + SrsProtocol::AckWindowSize::AckWindowSize() { window = 0; @@ -256,6 +264,7 @@ SrsProtocol::SrsProtocol(ISrsProtocolReadWriter* io) } out_c0c3_caches = new char[SRS_CONSTS_C0C3_HEADERS_MAX]; + perf = NULL; } SrsProtocol::~SrsProtocol() @@ -303,6 +312,11 @@ void SrsProtocol::set_auto_response(bool v) auto_response_when_recv = v; } +void SrsProtocol::set_perf(ISrsProtocolPerf* v) +{ + perf = v; +} + srs_error_t SrsProtocol::manual_response_flush() { srs_error_t err = srs_success; @@ -448,7 +462,12 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg int c0c3_cache_index = 0; char* c0c3_cache = out_c0c3_caches + c0c3_cache_index; - + + // How many messages are merged in written. + int nb_msgs_merged_written = 0; + // How many bytes of messages are merged in written. + int bytes_msgs_merged_written = 0; + // try to send use the c0c3 header cache, // if cache is consumed, try another loop. for (int i = 0; i < nb_msgs; i++) { @@ -462,6 +481,10 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg if (!msg->payload || msg->size <= 0) { continue; } + + // Increase the perf stat data. + nb_msgs_merged_written++; + bytes_msgs_merged_written += msg->size; // p set to current write position, // it's ok when payload is NULL and size is 0. @@ -522,6 +545,12 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) { return srs_error_wrap(err, "send iovs"); } + + // Notify about perf stat. + if (perf) { + perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index); + nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0; + } // reset caches, while these cache ensure // atleast we can sendout a chunk. @@ -539,8 +568,19 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg if (iov_index <= 0) { return err; } - - return do_iovs_send(out_iovs, iov_index); + + // Send out iovs at a time. + if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) { + return srs_error_wrap(err, "send iovs"); + } + + // Notify about perf stat. + if (perf) { + perf->perf_mw_on_msgs(nb_msgs_merged_written, bytes_msgs_merged_written, iov_index); + nb_msgs_merged_written = 0; bytes_msgs_merged_written = 0; + } + + return err; #else // try to send use the c0c3 header cache, // if cache is consumed, try another loop. @@ -587,6 +627,11 @@ srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msg if ((er = skt->writev(iovs, 2, NULL)) != srs_success) { return srs_error_wrap(err, "writev"); } + + // Notify about perf stat. + if (perf) { + perf->perf_mw_on_packets(1, payload_size, 2); + } } } @@ -2209,6 +2254,11 @@ void SrsRtmpServer::set_auto_response(bool v) protocol->set_auto_response(v); } +void SrsRtmpServer::set_perf(ISrsProtocolPerf* v) +{ + protocol->set_perf(v); +} + #ifdef SRS_PERF_MERGED_READ void SrsRtmpServer::set_merge_read(bool v, IMergeReadHandler* handler) { diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index c43c4ee17..470347cac 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -147,6 +147,21 @@ protected: virtual srs_error_t encode_packet(SrsBuffer* stream); }; +// The performance statistic data collect. +class ISrsProtocolPerf +{ +public: + ISrsProtocolPerf(); + virtual ~ISrsProtocolPerf(); +public: + // Stat for packets merged written, nb_msgs is the number of RTMP messages, + // bytes_msgs is the total bytes of RTMP messages, nb_iovs is the total number of iovec. + virtual void perf_mw_on_msgs(int nb_msgs, int bytes_msgs, int nb_iovs) = 0; + // Stat for packets merged written, nb_pkts is the number of or chunk packets, + // bytes_pkts is the total bytes of or chunk packets, nb_iovs is the total number of iovec. + virtual void perf_mw_on_packets(int nb_pkts, int bytes_pkts, int nb_iovs) = 0; +}; + // The protocol provides the rtmp-message-protocol services, // To recv RTMP message from RTMP chunk stream, // and to send out RTMP message over RTMP chunk stream. @@ -168,6 +183,8 @@ private: private: // The underlayer socket object, send/recv bytes. ISrsProtocolReadWriter* skt; + // The performance stat handler. + ISrsProtocolPerf* perf; // The requests sent out, used to build the response. // key: transactionId // value: the request command name @@ -227,6 +244,8 @@ public: // @param v, whether auto response message when recv message. // @see: https://github.com/ossrs/srs/issues/217 virtual void set_auto_response(bool v); + // Set the performance stat handler. + virtual void set_perf(ISrsProtocolPerf* v); // Flush for manual response when the auto response is disabled // by set_auto_response(false), we default use auto response, so donot // need to call this api(the protocol sdk will auto send message). @@ -631,6 +650,8 @@ public: // @param v, whether auto response message when recv message. // @see: https://github.com/ossrs/srs/issues/217 virtual void set_auto_response(bool v); + // Set the performance stat handler. + virtual void set_perf(ISrsProtocolPerf* v); #ifdef SRS_PERF_MERGED_READ // To improve read performance, merge some packets then read, // When it on and read small bytes, we sleep to wait more data.,