diff --git a/trunk/src/app/srs_app_edge.cpp b/trunk/src/app/srs_app_edge.cpp index 490a09681..b56ab2ba0 100644 --- a/trunk/src/app/srs_app_edge.cpp +++ b/trunk/src/app/srs_app_edge.cpp @@ -55,6 +55,9 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // when edge timeout, retry next. #define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(3*1000*1000LL) +// when edge error, wait for quit +#define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL) + SrsEdgeIngester::SrsEdgeIngester() { io = NULL; @@ -165,8 +168,10 @@ int SrsEdgeIngester::ingest() // pithy print if (pithy_print.can_print()) { - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + srs_trace("<- "SRS_LOG_ID_EDGE_PLAY + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), + client->get_send_kbps(), client->get_recv_kbps()); } // read from client. @@ -323,6 +328,8 @@ SrsEdgeForwarder::SrsEdgeForwarder() stream_id = 0; stfd = NULL; pthread = new SrsThread(this, SRS_EDGE_FORWARDER_SLEEP_US); + queue = new SrsMessageQueue(); + send_error_code = ERROR_SUCCESS; } SrsEdgeForwarder::~SrsEdgeForwarder() @@ -330,6 +337,11 @@ SrsEdgeForwarder::~SrsEdgeForwarder() stop(); } +void SrsEdgeForwarder::set_queue_size(double queue_size) +{ + return queue->set_queue_size(queue_size); +} + int SrsEdgeForwarder::initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -345,6 +357,8 @@ int SrsEdgeForwarder::start() { int ret = ERROR_SUCCESS; + send_error_code = ERROR_SUCCESS; + if ((ret = connect_server()) != ERROR_SUCCESS) { return ret; } @@ -391,7 +405,7 @@ int SrsEdgeForwarder::cycle() { int ret = ERROR_SUCCESS; - client->set_recv_timeout(SRS_EDGE_FORWARDER_TIMEOUT_US); + client->set_recv_timeout(SRS_PULSE_TIMEOUT_US); SrsPithyPrint pithy_print(SRS_STAGE_EDGE); @@ -399,24 +413,63 @@ int SrsEdgeForwarder::cycle() // switch to other st-threads. st_usleep(0); + if (send_error_code != ERROR_SUCCESS) { + st_usleep(SRS_EDGE_FORWARDER_ERROR_US); + continue; + } + + // read from client. + if (true) { + SrsCommonMessage* msg = NULL; + ret = client->recv_message(&msg); + + srs_verbose("edge loop recv message. ret=%d", ret); + if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) { + srs_error("edge forwarder recv server control message failed. ret=%d", ret); + send_error_code = ret; + continue; + } + + srs_freep(msg); + } + + // forward all messages. + int count = 0; + SrsSharedPtrMessage** msgs = NULL; + if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) { + srs_error("get message to forward to origin failed. ret=%d", ret); + return ret; + } + pithy_print.elapse(); // pithy print if (pithy_print.can_print()) { - srs_trace("-> time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + srs_trace("-> "SRS_LOG_ID_EDGE_PUBLISH + " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), + client->get_send_kbps(), client->get_recv_kbps()); } - - // read from client. - SrsCommonMessage* msg = NULL; - if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { - srs_info("ignore forwarder recv origin server message failed. ret=%d", ret); + + // ignore when no messages. + if (count <= 0) { + srs_verbose("no packets to forward."); continue; } - srs_verbose("edge loop recv message. ret=%d", ret); - - srs_assert(msg); - SrsAutoFree(SrsCommonMessage, msg, false); + SrsAutoFree(SrsSharedPtrMessage*, msgs, true); + + // all msgs to forward. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs[i]; + + srs_assert(msg); + msgs[i] = NULL; + + if ((ret = client->send_message(msg)) != ERROR_SUCCESS) { + srs_error("edge publish forwarder send message to server failed. ret=%d", ret); + return ret; + } + } } return ret; @@ -426,6 +479,11 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; + if ((ret = send_error_code) != ERROR_SUCCESS) { + srs_error("publish edge proxy thread send error, ret=%d", ret); + return ret; + } + // the msg is auto free by source, // so we just ignore, or copy then send it. if (msg->size <= 0 @@ -445,9 +503,8 @@ int SrsEdgeForwarder::proxy(SrsCommonMessage* msg) srs_verbose("initialize shared ptr msg success."); copy->header.stream_id = stream_id; - if ((ret = client->send_message(copy->copy())) != ERROR_SUCCESS) { - srs_error("send client message to origin failed. ret=%d", ret); - return ret; + if ((ret = queue->enqueue(copy->copy())) != ERROR_SUCCESS) { + srs_error("enqueue edge publish msg failed. ret=%d", ret); } return ret; @@ -620,6 +677,11 @@ SrsPublishEdge::~SrsPublishEdge() srs_freep(forwarder); } +void SrsPublishEdge::set_queue_size(double queue_size) +{ + return forwarder->set_queue_size(queue_size); +} + int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req) { int ret = ERROR_SUCCESS; @@ -651,11 +713,15 @@ int SrsPublishEdge::on_client_publish() return ret; } + if ((ret = forwarder->start()) != ERROR_SUCCESS) { + return ret; + } + SrsEdgeState pstate = state; state = SrsEdgeStatePublish; srs_trace("edge change from %d to state %d (forward publish).", pstate, state); - return forwarder->start(); + return ret; } int SrsPublishEdge::on_proxy_publish(SrsCommonMessage* msg) diff --git a/trunk/src/app/srs_app_edge.hpp b/trunk/src/app/srs_app_edge.hpp index 21da18932..941cbbafd 100644 --- a/trunk/src/app/srs_app_edge.hpp +++ b/trunk/src/app/srs_app_edge.hpp @@ -41,6 +41,7 @@ class SrsPlayEdge; class SrsPublishEdge; class SrsRtmpClient; class SrsCommonMessage; +class SrsMessageQueue; class ISrsProtocolReaderWriter; /** @@ -117,9 +118,22 @@ private: ISrsProtocolReaderWriter* io; SrsRtmpClient* client; int origin_index; + /** + * we must ensure one thread one fd principle, + * that is, a fd must be write/read by the one thread. + * the publish service thread will proxy(msg), and the edge forward thread + * will cycle(), so we use queue for cycle to send the msg of proxy. + */ + SrsMessageQueue* queue; + /** + * error code of send, for edge proxy thread to query. + */ + int send_error_code; public: SrsEdgeForwarder(); virtual ~SrsEdgeForwarder(); +public: + virtual void set_queue_size(double queue_size); public: virtual int initialize(SrsSource* source, SrsPublishEdge* edge, SrsRequest* req); virtual int start(); @@ -179,6 +193,8 @@ private: public: SrsPublishEdge(); virtual ~SrsPublishEdge(); +public: + virtual void set_queue_size(double queue_size); public: virtual int initialize(SrsSource* source, SrsRequest* req); /** diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index 8e20bc6fe..5ce651315 100644 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -323,6 +323,8 @@ int SrsForwarder::forward() srs_error("recv server control message failed. ret=%d", ret); return ret; } + + srs_freep(msg); } // forward all messages. @@ -335,8 +337,10 @@ int SrsForwarder::forward() // pithy print if (pithy_print.can_print()) { - srs_trace("-> time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), client->get_send_kbps(), client->get_recv_kbps()); + srs_trace("-> "SRS_LOG_ID_FOWARDER + " time=%"PRId64", msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), count, client->get_send_bytes(), client->get_recv_bytes(), + client->get_send_kbps(), client->get_recv_kbps()); } // ignore when no messages. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index b1b4fb1e9..427a44ac0 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -500,8 +500,10 @@ int SrsRtmpConn::playing(SrsSource* source) // reportable if (pithy_print.can_print()) { - srs_trace("-> time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + srs_trace("-> "SRS_LOG_ID_PLAY + " time=%"PRId64", duration=%"PRId64", cmr=%d, msgs=%d, obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), duration, ctl_msg_ret, count, rtmp->get_send_bytes(), rtmp->get_recv_bytes(), + rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } if (count <= 0) { @@ -581,8 +583,10 @@ int SrsRtmpConn::fmle_publish(SrsSource* source) // reportable if (pithy_print.can_print()) { - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + srs_trace("<- "SRS_LOG_ID_CLIENT_PUBLISH + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), + rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } // process UnPublish event. @@ -654,8 +658,10 @@ int SrsRtmpConn::flash_publish(SrsSource* source) // reportable if (pithy_print.can_print()) { - srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", - pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + srs_trace("<- "SRS_LOG_ID_WEB_PUBLISH + " time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), + rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } // process UnPublish event. @@ -687,7 +693,11 @@ int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* ms // for edge, directly proxy message to origin. if (vhost_is_edge) { - return source->on_edge_proxy_publish(msg); + if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) { + srs_error("edge publish proxy msg failed. ret=%d", ret); + return ret; + } + return ret; } // process audio packet diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 50e6f1d22..8eb20ec79 100644 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -524,6 +524,9 @@ int SrsSource::initialize() return ret; } + double queue_size = _srs_config->get_queue_length(_req->vhost); + publish_edge->set_queue_size(queue_size); + return ret; } @@ -597,6 +600,11 @@ int SrsSource::on_reload_vhost_queue_length(string vhost) srs_trace("forwarders reload queue size success."); } + if (true) { + publish_edge->set_queue_size(queue_size); + srs_trace("publish_edge reload queue size success."); + } + return ret; } diff --git a/trunk/src/kernel/srs_kernel_log.hpp b/trunk/src/kernel/srs_kernel_log.hpp index b6c9b6e5a..9cd495d4f 100644 --- a/trunk/src/kernel/srs_kernel_log.hpp +++ b/trunk/src/kernel/srs_kernel_log.hpp @@ -144,4 +144,17 @@ extern ISrsThreadContext* _srs_context; #define srs_trace(msg, ...) (void)0 #endif +// downloading speed-up, play to edge, ingest from origin +#define SRS_LOG_ID_EDGE_PLAY "EIG" +// uploading speed-up, publish to edge, foward to origin +#define SRS_LOG_ID_EDGE_PUBLISH "EFW" +// edge/origin forwarder. +#define SRS_LOG_ID_FOWARDER "FWR" +// play stream on edge/origin. +#define SRS_LOG_ID_PLAY "PLA" +// client publish to edge/origin +#define SRS_LOG_ID_CLIENT_PUBLISH "CPB" +// web/flash publish to edge/origin +#define SRS_LOG_ID_WEB_PUBLISH "WPB" + #endif