From c75f05c88bad0654b98da99982e0fc123d906ce5 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 21 Aug 2015 15:22:40 +0800 Subject: [PATCH] fix the log cid display error, merge the publish recv thread log to publish connection. --- trunk/src/app/srs_app_log.cpp | 14 ++++++++++++++ trunk/src/app/srs_app_log.hpp | 1 + trunk/src/app/srs_app_recv_thread.cpp | 26 +++++++++++++++++++++++++- trunk/src/app/srs_app_recv_thread.hpp | 7 +++++++ trunk/src/app/srs_app_rtmp_conn.cpp | 13 ++++++++++--- trunk/src/app/srs_app_source.cpp | 2 +- trunk/src/kernel/srs_kernel_log.cpp | 5 +++++ trunk/src/kernel/srs_kernel_log.hpp | 18 ++++++++++++++++++ trunk/src/protocol/srs_rtmp_stack.cpp | 4 +++- 9 files changed, 84 insertions(+), 6 deletions(-) diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index ba8cc0bf1..5cc6ed14d 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -57,6 +57,20 @@ int SrsThreadContext::get_id() return cache[st_thread_self()]; } +int SrsThreadContext::set_id(int v) +{ + st_thread_t self = st_thread_self(); + + int ov = 0; + if (cache.find(self) != cache.end()) { + ov = cache[self]; + } + + cache[self] = v; + + return ov; +} + // the max size of a line of log. #define LOG_MAX_SIZE 4096 diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index c997df3e8..a9900b914 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -53,6 +53,7 @@ public: public: virtual int generate_id(); virtual int get_id(); + virtual int set_id(int v); }; /** diff --git a/trunk/src/app/srs_app_recv_thread.cpp b/trunk/src/app/srs_app_recv_thread.cpp index e94b622dd..56dc7fd2b 100644 --- a/trunk/src/app/srs_app_recv_thread.cpp +++ b/trunk/src/app/srs_app_recv_thread.cpp @@ -62,6 +62,11 @@ SrsRecvThread::~SrsRecvThread() srs_freep(trd); } +int SrsRecvThread::cid() +{ + return trd->cid(); +} + int SrsRecvThread::start() { return trd->start(); @@ -253,6 +258,7 @@ SrsPublishRecvThread::SrsPublishRecvThread( recv_error_code = ERROR_SUCCESS; _nb_msgs = 0; error = st_cond_new(); + ncid = cid = 0; req = _req; mr_fd = mr_sock_fd; @@ -297,9 +303,21 @@ int SrsPublishRecvThread::error_code() return recv_error_code; } +void SrsPublishRecvThread::set_cid(int v) +{ + ncid = v; +} + +int SrsPublishRecvThread::get_cid() +{ + return ncid; +} + int SrsPublishRecvThread::start() { - return trd.start(); + int ret = trd.start(); + ncid = cid = trd.cid(); + return ret; } void SrsPublishRecvThread::stop() @@ -351,6 +369,12 @@ bool SrsPublishRecvThread::can_handle() int SrsPublishRecvThread::handle(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; + + // when cid changed, change it. + if (ncid != cid) { + _srs_context->set_id(ncid); + cid = ncid; + } _nb_msgs++; diff --git a/trunk/src/app/srs_app_recv_thread.hpp b/trunk/src/app/srs_app_recv_thread.hpp index b986b8bd6..345bf7837 100644 --- a/trunk/src/app/srs_app_recv_thread.hpp +++ b/trunk/src/app/srs_app_recv_thread.hpp @@ -90,6 +90,8 @@ protected: public: SrsRecvThread(ISrsMessageHandler* msg_handler, SrsRtmpServer* rtmp_sdk, int timeout_ms); virtual ~SrsRecvThread(); +public: + virtual int cid(); public: virtual int start(); virtual void stop(); @@ -170,6 +172,9 @@ private: // the error timeout cond // @see https://github.com/simple-rtmp-server/srs/issues/244 st_cond_t error; + // merged context id. + int cid; + int ncid; public: SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req, int mr_sock_fd, int timeout_ms, @@ -182,6 +187,8 @@ public: virtual int wait(int timeout_ms); virtual int64_t nb_msgs(); virtual int error_code(); + virtual void set_cid(int v); + virtual int get_cid(); public: virtual int start(); virtual void stop(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index e4561b932..e4651eecd 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -851,6 +851,11 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) return ret; } + // change the isolate recv thread context id, + // merge its log to current thread. + int receive_thread_cid = trd->get_cid(); + trd->set_cid(_srs_context->get_id()); + // initialize the publish timeout. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); publish_normal_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost); @@ -861,8 +866,8 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) if (true) { bool mr = _srs_config->get_mr_enabled(req->vhost); int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost); - srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d", - mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay); + srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d, rtcid=%d", + mr, mr_sleep, publish_1stpkt_timeout, publish_normal_timeout, tcp_nodelay, receive_thread_cid); } int64_t nb_msgs = 0; @@ -1109,7 +1114,9 @@ int SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessag res->command_object = SrsAmf0Any::null(); res->response = SrsAmf0Any::null(); if ((ret = rtmp->send_and_free_packet(res, 0)) != ERROR_SUCCESS) { - srs_warn("response call failed. ret=%d", ret); + if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { + srs_warn("response call failed. ret=%d", ret); + } return ret; } } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 1584bdfbe..f60f28887 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1415,6 +1415,7 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) { ss << ", acodec=" << (int)prop->to_number(); } + srs_trace("got metadata%s", ss.str().c_str()); // add server info to metadata metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); @@ -1479,7 +1480,6 @@ int SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata return ret; } } - srs_trace("got metadata%s", ss.str().c_str()); } // copy to all forwarders diff --git a/trunk/src/kernel/srs_kernel_log.cpp b/trunk/src/kernel/srs_kernel_log.cpp index 8c6f872c8..46fcc3763 100644 --- a/trunk/src/kernel/srs_kernel_log.cpp +++ b/trunk/src/kernel/srs_kernel_log.cpp @@ -76,4 +76,9 @@ int ISrsThreadContext::get_id() return 0; } +int ISrsThreadContext::set_id(int /*v*/) +{ + return 0; +} + diff --git a/trunk/src/kernel/srs_kernel_log.hpp b/trunk/src/kernel/srs_kernel_log.hpp index 966e39fbe..8d648d315 100644 --- a/trunk/src/kernel/srs_kernel_log.hpp +++ b/trunk/src/kernel/srs_kernel_log.hpp @@ -95,6 +95,13 @@ public: virtual void error(const char* tag, int context_id, const char* fmt, ...); }; +/** + * the context id manager to identify context, for instance, the green-thread. + * usage: + * _srs_context->generate_id(); // when thread start. + * _srs_context->get_id(); // get current generated id. + * int old_id = _srs_context->set_id(1000); // set context id if need to merge thread context. + */ // the context for multiple clients. class ISrsThreadContext { @@ -102,8 +109,19 @@ public: ISrsThreadContext(); virtual ~ISrsThreadContext(); public: + /** + * generate the id for current context. + */ virtual int generate_id(); + /** + * get the generated id of current context. + */ virtual int get_id(); + /** + * set the id of current context. + * @return the previous id value; 0 if no context. + */ + virtual int set_id(int v); }; // user must provides a log object diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index 0a169d64a..e7d15876d 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -2633,7 +2633,9 @@ int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, string& res->command_object = SrsAmf0Any::null(); res->response = SrsAmf0Any::null(); if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) { - srs_warn("response call failed. ret=%d", ret); + if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) { + srs_warn("response call failed. ret=%d", ret); + } return ret; } continue;