diff --git a/trunk/configure b/trunk/configure index 698ee8f73..ba8b1b4fa 100755 --- a/trunk/configure +++ b/trunk/configure @@ -116,7 +116,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server" "srs_core_handshake" "srs_core_pithy_print" "srs_core_config" "srs_core_refer" "srs_core_reload" "srs_core_hls" "srs_core_forward" "srs_core_encoder" - "srs_core_http" "srs_core_thread") + "srs_core_http" "srs_core_thread" "srs_core_bandwidth") MODULE_DIR="src/core" . auto/modules.sh CORE_OBJS="${MODULE_OBJS[@]}" diff --git a/trunk/src/core/srs_core.cpp b/trunk/src/core/srs_core.cpp old mode 100755 new mode 100644 diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp old mode 100755 new mode 100644 index bcb795a30..ce3bfadc5 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -107,4 +107,15 @@ extern void srs_vhost_resolve(std::string& vhost, std::string& app); // close the netfd, and close the underlayer fd. extern void srs_close_stfd(st_netfd_t& stfd); +/** +* disable copy constructor of class +*/ +#define disable_default_copy(className)\ + private:\ + /** \ + * disable the copy constructor and operator=, donot allow directly copy. \ + */ \ + className(const className&); \ + className& operator= (const className&) + #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_bandwidth.cpp b/trunk/src/core/srs_core_bandwidth.cpp new file mode 100644 index 000000000..caca441d0 --- /dev/null +++ b/trunk/src/core/srs_core_bandwidth.cpp @@ -0,0 +1,444 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 wenjiegit + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include + +SrsBandwidth::SrsBandwidth() +{ +} + +SrsBandwidth::~SrsBandwidth() +{ +} + +int SrsBandwidth::bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmp* _rtmp) +{ + int ret = ERROR_SUCCESS; + + rtmp = _rtmp; + req = _req; + + if (!config->get_bw_check_enabled(req->vhost)) { + return ret; + } + + // validate the bandwidth check key + std::string key = "key=" + config->get_bw_check_key(req->vhost); + if (req->tcUrl.find(key) == std::string::npos) { + ret = ERROR_SYSTEM_BANDWIDTH_KEY; + srs_error("check the vhost=%s %s failed, tcUrl=%s, ret=%d", + req->vhost.c_str(), key.c_str(), req->tcUrl.c_str(), ret); + return ret; + } + + // shared global last check time, + // to avoid attach by bandwidth check, + // if client request check in the window(specifeid by interval), + // directly reject the request. + static int64_t last_check_time = 0; + int interval_ms = config->get_bw_check_interval_ms(req->vhost); + + int64_t time_now = srs_get_system_time_ms(); + // reject the connection in the interval window. + if (last_check_time > 0 && time_now - last_check_time < interval_ms) { + ret = ERROR_SYSTEM_BANDWIDTH_DENIED; + srs_trace("bandcheck denied, " + "last_check=%"PRId64", now=%"PRId64", interval=%d", + last_check_time, time_now, interval_ms); + + rtmp->response_connect_reject(req, "bandcheck rejected"); + return ret; + } + + // accept and do bandwidth check. + last_check_time = time_now; + + char* local_ip = 0; + if ((ret = get_local_ip(stfd, local_ip)) != ERROR_SUCCESS) { + srs_error("get local ip failed. ret = %d", ret); + return ret; + } + + if ((ret = rtmp->response_connect_app(req, local_ip)) != ERROR_SUCCESS) { + srs_error("response connect app failed. ret=%d", ret); + return ret; + } + + return do_bandwidth_check(); +} + +int SrsBandwidth::get_local_ip(st_netfd_t stfd, char *&local_ip) +{ + int ret = ERROR_SUCCESS; + + int fd = st_netfd_fileno(stfd); + + // discovery client information + sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + if (getsockname(fd, (sockaddr*)&addr, &addrlen) == -1) { + ret = ERROR_SOCKET_GET_LOCAL_IP; + srs_error("discovery local ip information failed. ret=%d", ret); + return ret; + } + srs_verbose("get local ip success."); + + // ip v4 or v6 + char buf[INET6_ADDRSTRLEN]; + memset(buf, 0, sizeof(buf)); + + if ((inet_ntop(addr.sin_family, &addr.sin_addr, buf, sizeof(buf))) == NULL) { + ret = ERROR_SOCKET_GET_LOCAL_IP; + srs_error("convert local ip information failed. ret=%d", ret); + return ret; + } + + local_ip = new char[strlen(buf) + 1]; + strcpy(local_ip, buf); + + srs_verbose("get local ip of client ip=%s, fd=%d", buf, fd); + + return ret; +} + +int SrsBandwidth::do_bandwidth_check() +{ + int ret = ERROR_SUCCESS; + + SrsProtocol* protocol = rtmp->get_protocol(); + + int play_duration_ms = 3000; + int play_interval_ms = 0; + int play_actual_duration_ms = 0; + int play_bytes = 0; + + int publish_duration_ms = 3000; + int publish_interval_ms = 0; + int publish_actual_duration_ms = 0; + int publish_bytes = 0; + + int limit_kbps = config->get_bw_check_limit_kbps(req->vhost); + + int64_t start_time = srs_get_system_time_ms(); + + ret = check_play(play_duration_ms, + play_interval_ms, play_actual_duration_ms, play_bytes, limit_kbps); + if (ret != ERROR_SUCCESS) { + srs_error("band width play check failed. ret=%d", ret); + return ret; + } + + ret = check_publish(publish_duration_ms, + publish_interval_ms, publish_actual_duration_ms, publish_bytes, limit_kbps); + if (ret != ERROR_SUCCESS) { + srs_error("band width publish check failed. ret=%d", ret); + return ret; + } + + int64_t end_time = srs_get_system_time_ms(); + int play_kbps = play_bytes * 8 / play_actual_duration_ms; + int publish_kbps = publish_bytes * 8 / publish_actual_duration_ms; + + // send finished msg + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_finish(); + pkt->data->set("code", new SrsAmf0Number(ERROR_SUCCESS)); + pkt->data->set("start_time", new SrsAmf0Number(start_time)); + pkt->data->set("end_time", new SrsAmf0Number(end_time)); + pkt->data->set("play_kbps", new SrsAmf0Number(play_kbps)); + pkt->data->set("publish_kbps", new SrsAmf0Number(publish_kbps)); + pkt->data->set("play_bytes", new SrsAmf0Number(play_bytes)); + pkt->data->set("play_time", new SrsAmf0Number(play_actual_duration_ms)); + pkt->data->set("publish_bytes", new SrsAmf0Number(publish_bytes)); + pkt->data->set("publish_time", new SrsAmf0Number(publish_actual_duration_ms)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check finish message failed. ret=%d", ret); + return ret; + } + + // if flash, we notice the result, and expect a final packet. + while (true) { + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect final message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get final message succes."); + + if (pkt->is_flash_final()) { + srs_trace("BW check recv flash final response."); + break; + } + } + + srs_trace("BW check finished."); + + return ret; +} + +int SrsBandwidth::check_play( + int duration_ms, int interval_ms, int& actual_duration_ms, + int& play_bytes, int max_play_kbps) +{ + int ret = ERROR_SUCCESS; + + SrsProtocol* protocol = rtmp->get_protocol(); + + if (true) { + // send start play command to client + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_start_play(); + + pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check start play message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check begin."); + } + + while (true) { + // recv client's starting play response + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandwidth message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandwidth message succes."); + + if (pkt->is_starting_play()) { + srs_trace("BW check recv play begin response."); + break; + } + } + + // send play data to client + int64_t current_time = srs_get_system_time_ms(); + int size = 1024; // TODO: FIXME: magic number + char random_data[size]; + memset(random_data, 0x01, size); + + int interval = 0; + while ( (srs_get_system_time_ms() - current_time) < duration_ms ) { + st_usleep(interval); + + // TODO: FIXME: use shared ptr message. + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing(); + + // TODO: FIXME: magic number + for (int i = 0; i < 100; ++i) { + char buf[32]; // TODO: FIXME: magic number + sprintf(buf, "%d", i); + pkt->data->set(buf, new SrsAmf0String(random_data)); + } + + // TODO: FIXME: get length from the rtmp protocol stack. + play_bytes += pkt->get_payload_length(); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check play messages failed. ret=%d", ret); + return ret; + } + + // sleep while current kbps <= max_play_kbps + int kbps = 0; + while (true) { + if(srs_get_system_time_ms() - current_time != 0) + kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_time); + + if (kbps > max_play_kbps) { + st_usleep(500); + } else { + break; + } + } + } + actual_duration_ms = srs_get_system_time_ms() - current_time; + srs_trace("BW check send play bytes over."); + + if (true) { + // notify client to stop play + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_play(); + pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms)); + pkt->data->set("bytes_delta", new SrsAmf0Number(play_bytes)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check stop play message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check stop play bytes."); + } + + while (true) { + // recv client's stop play response. + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandwidth message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandwidth message succes."); + + if (pkt->is_stopped_play()) { + srs_trace("BW check recv stop play response."); + break; + } + } + + return ret; +} + +int SrsBandwidth::check_publish( + int duration_ms, int interval_ms, int& actual_duration_ms, + int& publish_bytes, int max_pub_kbps) +{ + int ret = ERROR_SUCCESS; + + SrsProtocol* protocol = rtmp->get_protocol(); + + if (true) { + // notify client to start publish + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_start_publish(); + + pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check start publish message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check publish begin."); + } + + while (true) { + // read client's notification of starting publish + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandwidth message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandwidth message succes."); + + if (pkt->is_starting_publish()) { + srs_trace("BW check recv publish begin response."); + break; + } + } + + // recv publish msgs until @duration_ms ms + int64_t current_time = srs_get_system_time_ms(); + while ( (srs_get_system_time_ms() - current_time) < duration_ms ) { + st_usleep(0); + + SrsCommonMessage* msg = NULL; + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + + // TODO: FIXME. + publish_bytes += msg->header.payload_length; + + int kbps = 0; + while (true) { + if(srs_get_system_time_ms() - current_time != 0) + kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time); + + if (kbps > max_pub_kbps) { + st_usleep(500); + } else { + break; + } + } + } + actual_duration_ms = srs_get_system_time_ms() - current_time; + srs_trace("BW check recv publish data over."); + + if (true) { + // notify client to stop publish + SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_publish(); + pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms)); + pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms)); + pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms)); + pkt->data->set("bytes_delta", new SrsAmf0Number(publish_bytes)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send bandwidth check stop publish message failed. ret=%d", ret); + return ret; + } + srs_trace("BW check stop publish bytes."); + } + + // expect client to stop publish + // if flash client, we never expect the client stop publish bytes, + // for the flash send call packet to test publish bandwidth, + // there are many many packets in the queue. + // we just ignore the packet and send the bandwidth test data. + // TODO: FIXME: check whether flash client. + while (false) { + // recv client's stop publish response. + SrsCommonMessage* msg = NULL; + SrsBandwidthPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect bandwidth message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get bandwidth message succes."); + + if (pkt->is_stopped_publish()) { + srs_trace("BW check recv stop publish response."); + break; + } + } + + return ret; +} diff --git a/trunk/src/core/srs_core_bandwidth.hpp b/trunk/src/core/srs_core_bandwidth.hpp new file mode 100644 index 000000000..a170855dd --- /dev/null +++ b/trunk/src/core/srs_core_bandwidth.hpp @@ -0,0 +1,90 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 wenjiegit + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_CORE_BANDWIDTH_HPP +#define SRS_CORE_BANDWIDTH_HPP + +/* +#include +*/ +#include + +class SrsRequest; +class SrsRtmp; + +/** +* bandwidth test agent which provides the interfaces for bandwidth check. +* 1. if vhost disabled bandwidth check, ignore. +* 2. otherwise, check the key, error if verify failed. +* 3. check the interval limit, error if bandwidth in the interval window. +* 4. check the bandwidth under the max kbps. +* 5. send the bandwidth data to client. +* bandwidth workflow: +* +------------+ +----------+ +* | Client | | Server | +* +-----+------+ +-----+----+ +* | | +* | connect vhost------> | if vhost enable bandwidth, +* | <-----result(success) | do bandwidth check. +* | | +* | <----call(start play) | onSrsBandCheckStartPlayBytes +* | result(playing)-----> | onSrsBandCheckStartingPlayBytes +* | <-------data(playing) | onSrsBandCheckStartingPlayBytes +* | <-----call(stop play) | onSrsBandCheckStopPlayBytes +* | result(stopped)-----> | onSrsBandCheckStoppedPlayBytes +* | | +* | <-call(start publish) | onSrsBandCheckStartPublishBytes +* | result(publishing)--> | onSrsBandCheckStartingPublishBytes +* | data(publishing)----> | onSrsBandCheckStartingPublishBytes +* | <--call(stop publish) | onSrsBandCheckStopPublishBytes +* | result(stopped)-----> | onSrsBandCheckStoppedPublishBytes(1) +* | | +* | <--------------report | +* | | +* 1. when flash client, server ignore the publish stopped result. +* and flash client should close connection when got the report. +*/ +class SrsBandwidth +{ +private: + SrsRequest* req; + SrsRtmp* rtmp; +public: + SrsBandwidth(); + virtual ~SrsBandwidth(); +public: + /** + * do the bandwidth test. + */ + virtual int bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmp* _rtmp); +private: + virtual int get_local_ip(st_netfd_t stfd, char *&local_ip); + /** + * used to process band width check from client. + */ + virtual int do_bandwidth_check(); + virtual int check_play(int duration_ms, int interval_ms, int& actual_duration_ms, int& play_bytes, int max_play_kbps); + virtual int check_publish(int duration_ms, int interval_ms, int& actual_duration_ms, int& publish_bytes, int max_pub_kbps); +}; + +#endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 9f778799b..e1901c248 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -40,6 +40,7 @@ using namespace std; #include #include #include +#include #define SRS_PULSE_TIMEOUT_MS 100 #define SRS_SEND_TIMEOUT_US 5000000L @@ -57,6 +58,7 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) #ifdef SRS_HTTP http_hooks = new SrsHttpHooks(); #endif + bandwidth = new SrsBandwidth(); config->subscribe(this); } @@ -73,6 +75,7 @@ SrsClient::~SrsClient() #ifdef SRS_HTTP srs_freep(http_hooks); #endif + srs_freep(bandwidth); } // TODO: return detail message when error for client. @@ -152,7 +155,12 @@ int SrsClient::service_cycle() return ret; } srs_verbose("set peer bandwidth success"); - + + // do bandwidth test if connect to the vhost which is for bandwidth check. + if (config->get_bw_check_enabled(req->vhost)) { + return bandwidth->bandwidth_test(req, stfd, rtmp); + } + if ((ret = rtmp->response_connect_app(req)) != ERROR_SUCCESS) { srs_error("response connect app failed. ret=%d", ret); return ret; diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 1c7c7f044..f8aaee87c 100644 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -43,6 +43,7 @@ class SrsCommonMessage; #ifdef SRS_HTTP class SrsHttpHooks; #endif +class SrsBandwidth; /** * the client provides the main logic control for RTMP clients. @@ -58,6 +59,7 @@ private: #ifdef SRS_HTTP SrsHttpHooks* http_hooks; #endif + SrsBandwidth* bandwidth; public: SrsClient(SrsServer* srs_server, st_netfd_t client_stfd); virtual ~SrsClient(); diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp old mode 100755 new mode 100644 index 897746b5b..ed83a46d6 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -1510,6 +1510,90 @@ int SrsConfig::get_pithy_print_hls() return ::atoi(pithy->arg0().c_str()); } +bool SrsConfig::get_bw_check_enabled(const string &vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return false; + } + + conf = conf->get("bandcheck"); + if (!conf) { + return false; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0() != "on") { + return false; + } + + return true; +} + +string SrsConfig::get_bw_check_key(const string &vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return ""; + } + + conf = conf->get("bandcheck"); + if (!conf) { + return ""; + } + + conf = conf->get("key"); + if (!conf) { + return ""; + } + + return conf->arg0(); +} + +int SrsConfig::get_bw_check_interval_ms(const string &vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL; + } + + conf = conf->get("bandcheck"); + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL; + } + + conf = conf->get("interval_ms"); + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL; + } + + return ::atoi(conf->arg0().c_str()) * 1000; +} + +int SrsConfig::get_bw_check_limit_kbps(const string &vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS; + } + + conf = conf->get("bandcheck"); + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS; + } + + conf = conf->get("limit_kbps"); + if (!conf) { + return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS; + } + + return ::atoi(conf->arg0().c_str()); +} + int SrsConfig::get_pithy_print_encoder() { SrsConfDirective* pithy = root->get("encoder"); diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp old mode 100755 new mode 100644 index e9f2e3977..3c7dd3b65 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -52,6 +52,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define SRS_CONF_DEFAULT_QUEUE_LENGTH 30 // in seconds, the paused queue length. #define SRS_CONF_DEFAULT_PAUSED_LENGTH 10 +// the interval in seconds for bandwidth check +#define SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL 30 +// the interval in seconds for bandwidth check +#define SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS 1000 #define SRS_CONF_DEFAULT_CHUNK_SIZE 4096 @@ -168,6 +172,10 @@ public: virtual int get_pithy_print_encoder(); virtual int get_pithy_print_hls(); virtual int get_pithy_print_play(); + virtual bool get_bw_check_enabled(const std::string& vhost); + virtual std::string get_bw_check_key(const std::string& vhost); + virtual int get_bw_check_interval_ms(const std::string& vhost); + virtual int get_bw_check_limit_kbps(const std::string& vhost); }; /** diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 55372e6d1..15af170a3 100644 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -51,6 +51,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SOCKET_WRITE 209 #define ERROR_SOCKET_WAIT 210 #define ERROR_SOCKET_TIMEOUT 211 +#define ERROR_SOCKET_GET_LOCAL_IP 212 #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 @@ -85,6 +86,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_IP_INVALID 411 #define ERROR_SYSTEM_FORWARD_LOOP 412 #define ERROR_SYSTEM_WAITPID 413 +#define ERROR_SYSTEM_BANDWIDTH_KEY 414 +#define ERROR_SYSTEM_BANDWIDTH_DENIED 415 // see librtmp. // failed when open ssl create the dh diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index a1e95547b..6b4475302 100644 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -31,6 +31,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +using namespace std; + /**************************************************************************** ***************************************************************************** ****************************************************************************/ @@ -208,6 +210,33 @@ messages. #define RTMP_AMF0_DATA_SET_DATAFRAME "@setDataFrame" #define RTMP_AMF0_DATA_ON_METADATA "onMetaData" +/** +* band width check method name, which will be invoked by client. +* band width check mothods use SrsBandwidthPacket as its internal packet type, +* so ensure you set command name when you use it. +*/ +// server play control +#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes" +#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes" +#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes" +#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes" + +// server publish control +#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes" +#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes" +#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes" +#define SRS_BW_CHECK_STOPPED_PUBLISH "onSrsBandCheckStoppedPublishBytes" + +// EOF control. +#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished" +// for flash, it will sendout a final call, +// used to confirm got the report. +#define SRS_BW_CHECK_FLASH_FINAL "finalClientPacket" + +// client only +#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying" +#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing" + /**************************************************************************** ***************************************************************************** ****************************************************************************/ @@ -283,7 +312,7 @@ SrsProtocol::~SrsProtocol() srs_freep(skt); } -std::string SrsProtocol::get_request_name(double transcationId) +string SrsProtocol::get_request_name(double transcationId) { if (requests.find(transcationId) == requests.end()) { return ""; @@ -1318,7 +1347,21 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol) srs_info("decode the AMF0/AMF3 data(onMetaData message)."); packet = new SrsOnMetaDataPacket(); return packet->decode(stream); - } + } else if(command == SRS_BW_CHECK_FINISHED + || command == SRS_BW_CHECK_PLAYING + || command == SRS_BW_CHECK_PUBLISHING + || command == SRS_BW_CHECK_STARTING_PLAY + || command == SRS_BW_CHECK_STARTING_PUBLISH + || command == SRS_BW_CHECK_START_PLAY + || command == SRS_BW_CHECK_START_PUBLISH + || command == SRS_BW_CHECK_STOPPED_PLAY + || command == SRS_BW_CHECK_STOP_PLAY + || command == SRS_BW_CHECK_STOP_PUBLISH) + { + srs_info("decode the AMF0/AMF3 band width check message."); + packet = new SrsBandwidthPacket(); + return packet->decode(stream); + } // default packet to drop message. srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str()); @@ -1370,7 +1413,7 @@ int SrsCommonMessage::get_perfer_cid() return packet->get_perfer_cid(); } -void SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id) +SrsCommonMessage* SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id) { srs_freep(packet); @@ -1379,6 +1422,8 @@ void SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id) header.message_type = packet->get_message_type(); header.payload_length = packet->get_payload_length(); header.stream_id = stream_id; + + return this; } int SrsCommonMessage::encode_packet() @@ -1782,8 +1827,17 @@ int SrsConnectAppResPacket::get_message_type() int SrsConnectAppResPacket::get_size() { - return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() - + srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info); + int size = srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size(); + + if (props->size() > 0) { + size += srs_amf0_get_object_size(props); + } + + if (info->size() > 0) { + size += srs_amf0_get_object_size(info); + } + + return size; } int SrsConnectAppResPacket::encode_packet(SrsStream* stream) @@ -1802,16 +1856,22 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream) } srs_verbose("encode transaction_id success."); - if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { - srs_error("encode props failed. ret=%d", ret); - return ret; - } + if (props->size() > 0) { + if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) { + srs_error("encode props failed. ret=%d", ret); + return ret; + } + } + srs_verbose("encode props success."); - if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { - srs_error("encode info failed. ret=%d", ret); - return ret; - } + if (info->size() > 0) { + if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) { + srs_error("encode info failed. ret=%d", ret); + return ret; + } + } + srs_verbose("encode info success."); srs_info("encode connect app response packet success."); @@ -2596,6 +2656,163 @@ int SrsOnStatusCallPacket::encode_packet(SrsStream* stream) return ret; } +SrsBandwidthPacket::SrsBandwidthPacket() +{ + command_name = RTMP_AMF0_COMMAND_ON_STATUS; + transaction_id = 0; + args = new SrsAmf0Null(); + data = new SrsAmf0Object(); +} + +SrsBandwidthPacket::~SrsBandwidthPacket() +{ + srs_freep(args); + srs_freep(data); +} + +int SrsBandwidthPacket::get_perfer_cid() +{ + return RTMP_CID_OverStream; +} + +int SrsBandwidthPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsBandwidthPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_null_size() + srs_amf0_get_object_size(data); +} + +int SrsBandwidthPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) { + srs_error("encode args failed. ret=%d", ret); + return ret; + } + srs_verbose("encode args success.");; + + if ((ret = srs_amf0_write_object(stream, data)) != ERROR_SUCCESS) { + srs_error("encode data failed. ret=%d", ret); + return ret; + } + srs_verbose("encode data success."); + + srs_info("encode onStatus(Call) packet success."); + + return ret; +} + +int SrsBandwidthPacket::decode(SrsStream *stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_name failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode play transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode play command_object failed. ret=%d", ret); + return ret; + } + + // @remark, for bandwidth test, ignore the data field. + + srs_info("decode SrsBandwidthPacket success."); + + return ret; +} + +bool SrsBandwidthPacket::is_starting_play() +{ + return command_name == SRS_BW_CHECK_STARTING_PLAY; +} + +bool SrsBandwidthPacket::is_stopped_play() +{ + return command_name == SRS_BW_CHECK_STOPPED_PLAY; +} + +bool SrsBandwidthPacket::is_starting_publish() +{ + return command_name == SRS_BW_CHECK_STARTING_PUBLISH; +} + +bool SrsBandwidthPacket::is_stopped_publish() +{ + return command_name == SRS_BW_CHECK_STOPPED_PUBLISH; +} + +bool SrsBandwidthPacket::is_flash_final() +{ + return command_name == SRS_BW_CHECK_FLASH_FINAL; +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_finish() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_FINISHED); +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_start_play() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_START_PLAY); +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_playing() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_STARTING_PLAY); +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_stop_play() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_STOP_PLAY); +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_start_publish() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_START_PUBLISH); +} + +SrsBandwidthPacket* SrsBandwidthPacket::create_stop_publish() +{ + SrsBandwidthPacket* pkt = new SrsBandwidthPacket(); + return pkt->set_command(SRS_BW_CHECK_STOP_PUBLISH); +} + +SrsBandwidthPacket* SrsBandwidthPacket::set_command(string command) +{ + command_name = command; + + return this; +} + SrsOnStatusDataPacket::SrsOnStatusDataPacket() { command_name = RTMP_AMF0_COMMAND_ON_STATUS; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 733b82b31..490d7cb94 100644 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -310,6 +310,7 @@ class SrsCommonMessage : public ISrsMessage { private: typedef ISrsMessage super; + disable_default_copy(SrsCommonMessage); // decoded message payload. private: SrsStream* stream; @@ -345,9 +346,10 @@ public: * set the encoded packet to encode_packet() to payload. * @stream_id, the id of stream which is created by createStream. * @remark, user never free the pkt, the message will auto free it. + * @return message itself. */ // TODO: refine the send methods. - virtual void set_packet(SrsPacket* pkt, int stream_id); + virtual SrsCommonMessage* set_packet(SrsPacket* pkt, int stream_id); /** * encode the packet to message payload bytes. * @remark there exists empty packet, so maybe the payload is NULL. @@ -831,6 +833,55 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* the special packet for the bandwidth test. +* actually, it's a SrsOnStatusCallPacket, but +* 1. encode with data field, to send data to client. +* 2. decode ignore the data field, donot care. +*/ +class SrsBandwidthPacket : public SrsPacket +{ +private: + typedef SrsPacket super; + disable_default_copy(SrsBandwidthPacket); +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsBandwidthPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* args; + SrsAmf0Object* data; +public: + SrsBandwidthPacket(); + virtual ~SrsBandwidthPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +public: + virtual int decode(SrsStream* stream); +public: + virtual bool is_starting_play(); + virtual bool is_stopped_play(); + virtual bool is_starting_publish(); + virtual bool is_stopped_publish(); + virtual bool is_flash_final(); + static SrsBandwidthPacket* create_finish(); + static SrsBandwidthPacket* create_start_play(); + static SrsBandwidthPacket* create_playing(); + static SrsBandwidthPacket* create_stop_play(); + static SrsBandwidthPacket* create_start_publish(); + static SrsBandwidthPacket* create_stop_publish(); +private: + virtual SrsBandwidthPacket* set_command(std::string command); +}; + /** * onStatus data, AMF0 Data * @remark, user must set the stream_id by SrsMessage.set_packet(). diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index e7de3cf0e..4fce14050 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -51,8 +51,11 @@ using namespace std; #define StatusClientId "clientid" // status value #define StatusLevelStatus "status" +// status error +#define StatusLevelError "error" // code value #define StatusCodeConnectSuccess "NetConnection.Connect.Success" +#define StatusCodeConnectRejected "NetConnection.Connect.Rejected" #define StatusCodeStreamReset "NetStream.Play.Reset" #define StatusCodeStreamStart "NetStream.Play.Start" #define StatusCodeStreamPause "NetStream.Pause.Notify" @@ -577,7 +580,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type) return ret; } -int SrsRtmp::response_connect_app(SrsRequest* req) +int SrsRtmp::response_connect_app(SrsRequest *req, const char* server_ip) { int ret = ERROR_SUCCESS; @@ -606,6 +609,10 @@ int SrsRtmp::response_connect_app(SrsRequest* req) data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT)); data->set("srs_contributor", new SrsAmf0String(RTMP_SIG_SRS_CONTRIBUTOR)); + if (server_ip) { + data->set("srs_server_ip", new SrsAmf0String(server_ip)); + } + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { @@ -614,7 +621,28 @@ int SrsRtmp::response_connect_app(SrsRequest* req) } srs_info("send connect app response message success."); - return ret; + return ret; +} + +void SrsRtmp::response_connect_reject(SrsRequest *req, const char* desc) +{ + int ret = ERROR_SUCCESS; + + SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); + pkt->command_name = "_error"; + pkt->props->set(StatusLevel, new SrsAmf0String(StatusLevelError)); + pkt->props->set(StatusCode, new SrsAmf0String(StatusCodeConnectRejected)); + pkt->props->set(StatusDescription, new SrsAmf0String(desc)); + //pkt->props->set("objectEncoding", new SrsAmf0Number(req->objectEncoding)); + + SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0); + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send connect app response rejected message failed. ret=%d", ret); + return; + } + srs_info("send connect app response rejected message success."); + + return; } int SrsRtmp::on_bw_done() diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 7e17079d6..459e33436 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -167,7 +167,11 @@ public: * using the Limit type field. */ virtual int set_peer_bandwidth(int bandwidth, int type); - virtual int response_connect_app(SrsRequest* req); + /** + * @param server_ip the ip of server. + */ + virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL); + virtual void response_connect_reject(SrsRequest* req, const char* desc); virtual int on_bw_done(); /** * recv some message to identify the client. diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 53f5277c1..06e6aca70 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -10,6 +10,8 @@ file ..\core\srs_core_amf0.cpp, ..\core\srs_core_autofree.hpp, ..\core\srs_core_autofree.cpp, + ..\core\srs_core_bandwidth.hpp, + ..\core\srs_core_bandwidth.cpp, ..\core\srs_core_buffer.hpp, ..\core\srs_core_buffer.cpp, ..\core\srs_core_client.hpp,