From ade81bb2441b656c0f6fa37b2969ba9e01e80f21 Mon Sep 17 00:00:00 2001 From: winlin Date: Tue, 11 Nov 2014 16:41:39 +0800 Subject: [PATCH] for bug#194, open pipe for each connection. --- trunk/src/app/srs_app_pipe.cpp | 2 +- trunk/src/app/srs_app_rtmp_conn.cpp | 108 +++++++++++++++++----------- 2 files changed, 68 insertions(+), 42 deletions(-) diff --git a/trunk/src/app/srs_app_pipe.cpp b/trunk/src/app/srs_app_pipe.cpp index d8a4e0367..9aa1acee8 100644 --- a/trunk/src/app/srs_app_pipe.cpp +++ b/trunk/src/app/srs_app_pipe.cpp @@ -81,7 +81,7 @@ int SrsPipe::active() int ret = ERROR_SUCCESS; int v = 0; - if (st_write(read_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) { + if (st_write(write_stfd, &v, sizeof(int), ST_UTIME_NO_TIMEOUT) != sizeof(int)) { ret = ERROR_SYSTEM_WRITE_PIPE; srs_error("write pipe failed. ret=%d", ret); return ret; diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index df543c7a3..eab1b399c 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -516,7 +516,12 @@ int SrsRtmpConn::playing(SrsSource* source) SrsAutoFree(SrsConsumer, consumer); srs_verbose("consumer created success."); + // TODO: FIXME: remove it. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US); + // disable the timeout. + // TODO: FIXME: maybe can use larger timeout? + rtmp->set_recv_timeout(ST_UTIME_NO_TIMEOUT); + rtmp->set_send_timeout(ST_UTIME_NO_TIMEOUT); SrsPithyPrint pithy_print(SRS_CONSTS_STAGE_PLAY_USER); @@ -525,12 +530,30 @@ int SrsRtmpConn::playing(SrsSource* source) bool user_specified_duration_to_stop = (req->duration > 0); int64_t starttime = -1; + pollfd pds[2]; + // poll the client incoming fd. + pds[0].fd = st_netfd_fileno(stfd); + pds[0].events = POLLIN; + // poll the consumer queue pipe. + pds[1].fd = st_netfd_fileno(consumer->pipe_fd()); + pds[1].events = POLLIN; + while (true) { // collect elapse for pithy print. pithy_print.elapse(); + + pds[0].revents = 0; + pds[1].revents = 0; + // wait for packet incoming or data outgoing. + if (st_poll(pds, 2, ST_UTIME_NO_TIMEOUT) <= 0) { + srs_error("st_poll failed."); + break; + } + + // packet incoming, read from RTMP. // read from client. - if (true) { + if (pds[0].revents & POLLIN) { SrsMessage* msg = NULL; ret = rtmp->recv_message(&msg); srs_verbose("play loop recv message. ret=%d", ret); @@ -553,49 +576,52 @@ int SrsRtmpConn::playing(SrsSource* source) } } - // get messages from consumer. - int count = 0; - if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { - srs_error("get messages from consumer failed. ret=%d", ret); - return ret; - } - - // reportable - if (pithy_print.can_print()) { - kbps->sample(); - srs_trace("-> "SRS_CONSTS_LOG_PLAY - " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", - pithy_print.age(), count, - kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), - kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); - } - - // sendout messages - // @remark, becareful, all msgs must be free explicitly, - // free by send_and_free_message or srs_freep. - for (int i = 0; i < count; i++) { - SrsSharedPtrMessage* msg = msgs.msgs[i]; - - // the send_message will free the msg, - // so set the msgs[i] to NULL. - msgs.msgs[i] = NULL; + // data outgoing, sendout packets. + if (pds[1].revents & POLLIN) { + // get messages from consumer. + int count = 0; + if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) { + srs_error("get messages from consumer failed. ret=%d", ret); + return ret; + } + + // reportable + if (pithy_print.can_print()) { + kbps->sample(); + srs_trace("-> "SRS_CONSTS_LOG_PLAY + " time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d", + pithy_print.age(), count, + kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), + kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); + } - // only when user specifies the duration, - // we start to collect the durations for each message. - if (user_specified_duration_to_stop) { - // foreach msg, collect the duration. - // @remark: never use msg when sent it, for the protocol sdk will free it. - if (starttime < 0 || starttime > msg->header.timestamp) { + // sendout messages + // @remark, becareful, all msgs must be free explicitly, + // free by send_and_free_message or srs_freep. + for (int i = 0; i < count; i++) { + SrsSharedPtrMessage* msg = msgs.msgs[i]; + + // the send_message will free the msg, + // so set the msgs[i] to NULL. + msgs.msgs[i] = NULL; + + // only when user specifies the duration, + // we start to collect the durations for each message. + if (user_specified_duration_to_stop) { + // foreach msg, collect the duration. + // @remark: never use msg when sent it, for the protocol sdk will free it. + if (starttime < 0 || starttime > msg->header.timestamp) { + starttime = msg->header.timestamp; + } + duration += msg->header.timestamp - starttime; starttime = msg->header.timestamp; } - duration += msg->header.timestamp - starttime; - starttime = msg->header.timestamp; - } - - // no need to assert msg, for the rtmp will assert it. - if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) { - srs_error("send message to client failed. ret=%d", ret); - return ret; + + // no need to assert msg, for the rtmp will assert it. + if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) { + srs_error("send message to client failed. ret=%d", ret); + return ret; + } } }