diff --git a/trunk/src/app/srs_app_client.cpp b/trunk/src/app/srs_app_client.cpp index 01ef920e4..6958e726f 100644 --- a/trunk/src/app/srs_app_client.cpp +++ b/trunk/src/app/srs_app_client.cpp @@ -201,6 +201,16 @@ int SrsClient::service_cycle() return ret; } + // for republish, continue service + if (ret == ERROR_CONTROL_REPUBLISH) { + // set timeout to a larger value, wait for encoder to republish. + rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US); + rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US); + + srs_trace("control message(unpublish) accept, retry stream service."); + continue; + } + // for "some" system control error, // logical accept and retry stream service. if (ret == ERROR_CONTROL_RTMP_CLOSE) { @@ -292,7 +302,7 @@ int SrsClient::stream_service_cycle() return ret; } srs_info("start to publish stream %s success", req->stream.c_str()); - ret = publish(source, true); + ret = fmle_publish(source); source->on_unpublish(); on_unpublish(); return ret; @@ -309,7 +319,7 @@ int SrsClient::stream_service_cycle() return ret; } srs_info("flash start to publish stream %s success", req->stream.c_str()); - ret = publish(source, false); + ret = flash_publish(source); source->on_unpublish(); on_unpublish(); return ret; @@ -448,24 +458,24 @@ int SrsClient::playing(SrsSource* source) return ret; } -int SrsClient::publish(SrsSource* source, bool is_fmle) +int SrsClient::fmle_publish(SrsSource* source) { int ret = ERROR_SUCCESS; if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { - srs_error("check publish_refer failed. ret=%d", ret); + srs_error("fmle check publish_refer failed. ret=%d", ret); return ret; } - srs_verbose("check publish_refer success."); + srs_verbose("fmle check publish_refer success."); SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); // notify the hls to prepare when publish start. if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { - srs_error("hls on_publish failed. ret=%d", ret); + srs_error("fmle hls on_publish failed. ret=%d", ret); return ret; } - srs_verbose("hls on_publish success."); + srs_verbose("fmle hls on_publish success."); while (true) { // switch to other st-threads. @@ -473,7 +483,7 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) SrsCommonMessage* msg = NULL; if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { - srs_error("recv identify client message failed. ret=%d", ret); + srs_error("fmle recv identify client message failed. ret=%d", ret); return ret; } @@ -486,9 +496,30 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); } + + // process UnPublish event. + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + srs_error("fmle decode unpublish message failed. ret=%d", ret); + return ret; + } + + SrsPacket* pkt = msg->get_packet(); + if (dynamic_cast(pkt)) { + SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); + if ((ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id)) != ERROR_SUCCESS) { + return ret; + } + return ERROR_CONTROL_REPUBLISH; + } + + srs_trace("fmle ignore AMF0/AMF3 command message."); + continue; + } - if ((ret = process_publish_message(source, msg, is_fmle)) != ERROR_SUCCESS) { - srs_error("process publish message failed. ret=%d", ret); + // video, audio, data message + if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) { + srs_error("fmle process publish message failed. ret=%d", ret); return ret; } } @@ -496,7 +527,69 @@ int SrsClient::publish(SrsSource* source, bool is_fmle) return ret; } -int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle) +int SrsClient::flash_publish(SrsSource* source) +{ + int ret = ERROR_SUCCESS; + + if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != ERROR_SUCCESS) { + srs_error("flash check publish_refer failed. ret=%d", ret); + return ret; + } + srs_verbose("flash check publish_refer success."); + + SrsPithyPrint pithy_print(SRS_STAGE_PUBLISH_USER); + + // notify the hls to prepare when publish start. + if ((ret = source->on_publish(req)) != ERROR_SUCCESS) { + srs_error("flash hls on_publish failed. ret=%d", ret); + return ret; + } + srs_verbose("flash hls on_publish success."); + + while (true) { + // switch to other st-threads. + st_usleep(0); + + SrsCommonMessage* msg = NULL; + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("flash recv identify client message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsCommonMessage, msg, false); + + pithy_print.set_age(msg->header.timestamp); + + // reportable + if (pithy_print.can_print()) { + srs_trace("<- time=%"PRId64", obytes=%"PRId64", ibytes=%"PRId64", okbps=%d, ikbps=%d", + pithy_print.get_age(), rtmp->get_send_bytes(), rtmp->get_recv_bytes(), rtmp->get_send_kbps(), rtmp->get_recv_kbps()); + } + + // process UnPublish event. + if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { + srs_error("flash decode unpublish message failed. ret=%d", ret); + return ret; + } + + // flash unpublish. + // TODO: maybe need to support republish. + srs_trace("flash flash publish finished."); + return ERROR_CONTROL_REPUBLISH; + } + + // video, audio, data message + if ((ret = process_publish_message(source, msg)) != ERROR_SUCCESS) { + srs_error("flash process publish message failed. ret=%d", ret); + return ret; + } + } + + return ret; +} + +int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; @@ -537,29 +630,6 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, return ret; } - // process UnPublish event. - if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { - srs_error("decode unpublish message failed. ret=%d", ret); - return ret; - } - - // flash unpublish. - if (!is_fmle) { - srs_trace("flash publish finished."); - return ret; - } - - SrsPacket* pkt = msg->get_packet(); - if (dynamic_cast(pkt)) { - SrsFMLEStartPacket* unpublish = dynamic_cast(pkt); - return rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id); - } - - srs_trace("ignore AMF0/AMF3 command message."); - return ret; - } - return ret; } diff --git a/trunk/src/app/srs_app_client.hpp b/trunk/src/app/srs_app_client.hpp index e30f3f6d5..655a2e447 100644 --- a/trunk/src/app/srs_app_client.hpp +++ b/trunk/src/app/srs_app_client.hpp @@ -78,8 +78,9 @@ private: virtual int stream_service_cycle(); virtual int check_vhost(); virtual int playing(SrsSource* source); - virtual int publish(SrsSource* source, bool is_fmle); - virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle); + virtual int fmle_publish(SrsSource* source); + virtual int flash_publish(SrsSource* source); + virtual int process_publish_message(SrsSource* source, SrsCommonMessage* msg); virtual int get_peer_ip(); virtual int process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg); private: diff --git a/trunk/src/kernel/srs_kernel_error.cpp b/trunk/src/kernel/srs_kernel_error.cpp index db333e2ea..e2f58b134 100644 --- a/trunk/src/kernel/srs_kernel_error.cpp +++ b/trunk/src/kernel/srs_kernel_error.cpp @@ -25,6 +25,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. bool srs_is_system_control_error(int error_code) { - return error_code == ERROR_CONTROL_RTMP_CLOSE; + return error_code == ERROR_CONTROL_RTMP_CLOSE + || error_code == ERROR_CONTROL_REPUBLISH; } diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 27564c5dc..14f999fac 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -155,6 +155,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // not an error, but special control logic. // sys ctl: rtmp close stream, support replay. #define ERROR_CONTROL_RTMP_CLOSE 900 +// FMLE stop publish and republish. +#define ERROR_CONTROL_REPUBLISH 901 /** * whether the error code is an system control error. diff --git a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp index 2c2c4cb7c..e299304fb 100644 --- a/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp +++ b/trunk/src/rtmp/srs_protocol_rtmp_stack.hpp @@ -59,11 +59,15 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // the timeout to wait client data, when client paused // if timeout, close the connection. #define SRS_PAUSED_SEND_TIMEOUT_US (int64_t)(30*60*1000*1000LL) - -// the timeout to send data to client, when client paused // if timeout, close the connection. #define SRS_PAUSED_RECV_TIMEOUT_US (int64_t)(30*60*1000*1000LL) +// the timeout to wait encoder to republish +// if timeout, close the connection. +#define SRS_REPUBLISH_SEND_TIMEOUT_US (int64_t)(3*60*1000*1000LL) +// if timeout, close the connection. +#define SRS_REPUBLISH_RECV_TIMEOUT_US (int64_t)(3*60*1000*1000LL) + // when stream is busy, for example, streaming is already // publishing, when a new client to request to publish, // sleep a while and close the connection.