merge from wenjie, support banwidth test.

pull/133/head
winlin 11 years ago
parent df96b237b8
commit 9455181790

2
trunk/configure vendored

@ -116,7 +116,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server"
"srs_core_handshake" "srs_core_pithy_print"
"srs_core_config" "srs_core_refer" "srs_core_reload"
"srs_core_hls" "srs_core_forward" "srs_core_encoder"
"srs_core_http" "srs_core_thread")
"srs_core_http" "srs_core_thread" "srs_core_bandwidth")
MODULE_DIR="src/core" . auto/modules.sh
CORE_OBJS="${MODULE_OBJS[@]}"

@ -107,4 +107,15 @@ extern void srs_vhost_resolve(std::string& vhost, std::string& app);
// close the netfd, and close the underlayer fd.
extern void srs_close_stfd(st_netfd_t& stfd);
/**
* disable copy constructor of class
*/
#define disable_default_copy(className)\
private:\
/** \
* disable the copy constructor and operator=, donot allow directly copy. \
*/ \
className(const className&); \
className& operator= (const className&)
#endif

@ -0,0 +1,444 @@
/*
The MIT License (MIT)
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <srs_core_bandwidth.hpp>
#include <arpa/inet.h>
using namespace std;
#include <srs_core_rtmp.hpp>
#include <srs_core_error.hpp>
#include <srs_core_amf0.hpp>
#include <srs_core_protocol.hpp>
#include <srs_core_config.hpp>
#include <srs_core_autofree.hpp>
SrsBandwidth::SrsBandwidth()
{
}
SrsBandwidth::~SrsBandwidth()
{
}
int SrsBandwidth::bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmp* _rtmp)
{
int ret = ERROR_SUCCESS;
rtmp = _rtmp;
req = _req;
if (!config->get_bw_check_enabled(req->vhost)) {
return ret;
}
// validate the bandwidth check key
std::string key = "key=" + config->get_bw_check_key(req->vhost);
if (req->tcUrl.find(key) == std::string::npos) {
ret = ERROR_SYSTEM_BANDWIDTH_KEY;
srs_error("check the vhost=%s %s failed, tcUrl=%s, ret=%d",
req->vhost.c_str(), key.c_str(), req->tcUrl.c_str(), ret);
return ret;
}
// shared global last check time,
// to avoid attach by bandwidth check,
// if client request check in the window(specifeid by interval),
// directly reject the request.
static int64_t last_check_time = 0;
int interval_ms = config->get_bw_check_interval_ms(req->vhost);
int64_t time_now = srs_get_system_time_ms();
// reject the connection in the interval window.
if (last_check_time > 0 && time_now - last_check_time < interval_ms) {
ret = ERROR_SYSTEM_BANDWIDTH_DENIED;
srs_trace("bandcheck denied, "
"last_check=%"PRId64", now=%"PRId64", interval=%d",
last_check_time, time_now, interval_ms);
rtmp->response_connect_reject(req, "bandcheck rejected");
return ret;
}
// accept and do bandwidth check.
last_check_time = time_now;
char* local_ip = 0;
if ((ret = get_local_ip(stfd, local_ip)) != ERROR_SUCCESS) {
srs_error("get local ip failed. ret = %d", ret);
return ret;
}
if ((ret = rtmp->response_connect_app(req, local_ip)) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret);
return ret;
}
return do_bandwidth_check();
}
int SrsBandwidth::get_local_ip(st_netfd_t stfd, char *&local_ip)
{
int ret = ERROR_SUCCESS;
int fd = st_netfd_fileno(stfd);
// discovery client information
sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
if (getsockname(fd, (sockaddr*)&addr, &addrlen) == -1) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("discovery local ip information failed. ret=%d", ret);
return ret;
}
srs_verbose("get local ip success.");
// ip v4 or v6
char buf[INET6_ADDRSTRLEN];
memset(buf, 0, sizeof(buf));
if ((inet_ntop(addr.sin_family, &addr.sin_addr, buf, sizeof(buf))) == NULL) {
ret = ERROR_SOCKET_GET_LOCAL_IP;
srs_error("convert local ip information failed. ret=%d", ret);
return ret;
}
local_ip = new char[strlen(buf) + 1];
strcpy(local_ip, buf);
srs_verbose("get local ip of client ip=%s, fd=%d", buf, fd);
return ret;
}
int SrsBandwidth::do_bandwidth_check()
{
int ret = ERROR_SUCCESS;
SrsProtocol* protocol = rtmp->get_protocol();
int play_duration_ms = 3000;
int play_interval_ms = 0;
int play_actual_duration_ms = 0;
int play_bytes = 0;
int publish_duration_ms = 3000;
int publish_interval_ms = 0;
int publish_actual_duration_ms = 0;
int publish_bytes = 0;
int limit_kbps = config->get_bw_check_limit_kbps(req->vhost);
int64_t start_time = srs_get_system_time_ms();
ret = check_play(play_duration_ms,
play_interval_ms, play_actual_duration_ms, play_bytes, limit_kbps);
if (ret != ERROR_SUCCESS) {
srs_error("band width play check failed. ret=%d", ret);
return ret;
}
ret = check_publish(publish_duration_ms,
publish_interval_ms, publish_actual_duration_ms, publish_bytes, limit_kbps);
if (ret != ERROR_SUCCESS) {
srs_error("band width publish check failed. ret=%d", ret);
return ret;
}
int64_t end_time = srs_get_system_time_ms();
int play_kbps = play_bytes * 8 / play_actual_duration_ms;
int publish_kbps = publish_bytes * 8 / publish_actual_duration_ms;
// send finished msg
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_finish();
pkt->data->set("code", new SrsAmf0Number(ERROR_SUCCESS));
pkt->data->set("start_time", new SrsAmf0Number(start_time));
pkt->data->set("end_time", new SrsAmf0Number(end_time));
pkt->data->set("play_kbps", new SrsAmf0Number(play_kbps));
pkt->data->set("publish_kbps", new SrsAmf0Number(publish_kbps));
pkt->data->set("play_bytes", new SrsAmf0Number(play_bytes));
pkt->data->set("play_time", new SrsAmf0Number(play_actual_duration_ms));
pkt->data->set("publish_bytes", new SrsAmf0Number(publish_bytes));
pkt->data->set("publish_time", new SrsAmf0Number(publish_actual_duration_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check finish message failed. ret=%d", ret);
return ret;
}
// if flash, we notice the result, and expect a final packet.
while (true) {
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect final message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get final message succes.");
if (pkt->is_flash_final()) {
srs_trace("BW check recv flash final response.");
break;
}
}
srs_trace("BW check finished.");
return ret;
}
int SrsBandwidth::check_play(
int duration_ms, int interval_ms, int& actual_duration_ms,
int& play_bytes, int max_play_kbps)
{
int ret = ERROR_SUCCESS;
SrsProtocol* protocol = rtmp->get_protocol();
if (true) {
// send start play command to client
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_start_play();
pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check begin.");
}
while (true) {
// recv client's starting play response
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandwidth message succes.");
if (pkt->is_starting_play()) {
srs_trace("BW check recv play begin response.");
break;
}
}
// send play data to client
int64_t current_time = srs_get_system_time_ms();
int size = 1024; // TODO: FIXME: magic number
char random_data[size];
memset(random_data, 0x01, size);
int interval = 0;
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(interval);
// TODO: FIXME: use shared ptr message.
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_playing();
// TODO: FIXME: magic number
for (int i = 0; i < 100; ++i) {
char buf[32]; // TODO: FIXME: magic number
sprintf(buf, "%d", i);
pkt->data->set(buf, new SrsAmf0String(random_data));
}
// TODO: FIXME: get length from the rtmp protocol stack.
play_bytes += pkt->get_payload_length();
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check play messages failed. ret=%d", ret);
return ret;
}
// sleep while current kbps <= max_play_kbps
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = play_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_play_kbps) {
st_usleep(500);
} else {
break;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check send play bytes over.");
if (true) {
// notify client to stop play
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_play();
pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
pkt->data->set("bytes_delta", new SrsAmf0Number(play_bytes));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop play message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop play bytes.");
}
while (true) {
// recv client's stop play response.
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandwidth message succes.");
if (pkt->is_stopped_play()) {
srs_trace("BW check recv stop play response.");
break;
}
}
return ret;
}
int SrsBandwidth::check_publish(
int duration_ms, int interval_ms, int& actual_duration_ms,
int& publish_bytes, int max_pub_kbps)
{
int ret = ERROR_SUCCESS;
SrsProtocol* protocol = rtmp->get_protocol();
if (true) {
// notify client to start publish
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_start_publish();
pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check start publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check publish begin.");
}
while (true) {
// read client's notification of starting publish
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandwidth message succes.");
if (pkt->is_starting_publish()) {
srs_trace("BW check recv publish begin response.");
break;
}
}
// recv publish msgs until @duration_ms ms
int64_t current_time = srs_get_system_time_ms();
while ( (srs_get_system_time_ms() - current_time) < duration_ms ) {
st_usleep(0);
SrsCommonMessage* msg = NULL;
if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) {
srs_error("recv message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
// TODO: FIXME.
publish_bytes += msg->header.payload_length;
int kbps = 0;
while (true) {
if(srs_get_system_time_ms() - current_time != 0)
kbps = publish_bytes * 8 / (srs_get_system_time_ms() - current_time);
if (kbps > max_pub_kbps) {
st_usleep(500);
} else {
break;
}
}
}
actual_duration_ms = srs_get_system_time_ms() - current_time;
srs_trace("BW check recv publish data over.");
if (true) {
// notify client to stop publish
SrsBandwidthPacket* pkt = SrsBandwidthPacket::create_stop_publish();
pkt->data->set("duration_ms", new SrsAmf0Number(duration_ms));
pkt->data->set("interval_ms", new SrsAmf0Number(interval_ms));
pkt->data->set("duration_delta", new SrsAmf0Number(actual_duration_ms));
pkt->data->set("bytes_delta", new SrsAmf0Number(publish_bytes));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send bandwidth check stop publish message failed. ret=%d", ret);
return ret;
}
srs_trace("BW check stop publish bytes.");
}
// expect client to stop publish
// if flash client, we never expect the client stop publish bytes,
// for the flash send call packet to test publish bandwidth,
// there are many many packets in the queue.
// we just ignore the packet and send the bandwidth test data.
// TODO: FIXME: check whether flash client.
while (false) {
// recv client's stop publish response.
SrsCommonMessage* msg = NULL;
SrsBandwidthPacket* pkt = NULL;
if ((ret = srs_rtmp_expect_message<SrsBandwidthPacket>(protocol, &msg, &pkt)) != ERROR_SUCCESS) {
srs_error("expect bandwidth message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsCommonMessage, msg, false);
srs_info("get bandwidth message succes.");
if (pkt->is_stopped_publish()) {
srs_trace("BW check recv stop publish response.");
break;
}
}
return ret;
}

@ -0,0 +1,90 @@
/*
The MIT License (MIT)
Copyright (c) 2013 wenjiegit
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SRS_CORE_BANDWIDTH_HPP
#define SRS_CORE_BANDWIDTH_HPP
/*
#include <srs_core_bandwidth.hpp>
*/
#include <srs_core.hpp>
class SrsRequest;
class SrsRtmp;
/**
* bandwidth test agent which provides the interfaces for bandwidth check.
* 1. if vhost disabled bandwidth check, ignore.
* 2. otherwise, check the key, error if verify failed.
* 3. check the interval limit, error if bandwidth in the interval window.
* 4. check the bandwidth under the max kbps.
* 5. send the bandwidth data to client.
* bandwidth workflow:
* +------------+ +----------+
* | Client | | Server |
* +-----+------+ +-----+----+
* | |
* | connect vhost------> | if vhost enable bandwidth,
* | <-----result(success) | do bandwidth check.
* | |
* | <----call(start play) | onSrsBandCheckStartPlayBytes
* | result(playing)-----> | onSrsBandCheckStartingPlayBytes
* | <-------data(playing) | onSrsBandCheckStartingPlayBytes
* | <-----call(stop play) | onSrsBandCheckStopPlayBytes
* | result(stopped)-----> | onSrsBandCheckStoppedPlayBytes
* | |
* | <-call(start publish) | onSrsBandCheckStartPublishBytes
* | result(publishing)--> | onSrsBandCheckStartingPublishBytes
* | data(publishing)----> | onSrsBandCheckStartingPublishBytes
* | <--call(stop publish) | onSrsBandCheckStopPublishBytes
* | result(stopped)-----> | onSrsBandCheckStoppedPublishBytes(1)
* | |
* | <--------------report |
* | <END> |
* 1. when flash client, server ignore the publish stopped result.
* and flash client should close connection when got the report.
*/
class SrsBandwidth
{
private:
SrsRequest* req;
SrsRtmp* rtmp;
public:
SrsBandwidth();
virtual ~SrsBandwidth();
public:
/**
* do the bandwidth test.
*/
virtual int bandwidth_test(SrsRequest* _req, st_netfd_t stfd, SrsRtmp* _rtmp);
private:
virtual int get_local_ip(st_netfd_t stfd, char *&local_ip);
/**
* used to process band width check from client.
*/
virtual int do_bandwidth_check();
virtual int check_play(int duration_ms, int interval_ms, int& actual_duration_ms, int& play_bytes, int max_play_kbps);
virtual int check_publish(int duration_ms, int interval_ms, int& actual_duration_ms, int& publish_bytes, int max_pub_kbps);
};
#endif

@ -40,6 +40,7 @@ using namespace std;
#include <srs_core_refer.hpp>
#include <srs_core_hls.hpp>
#include <srs_core_http.hpp>
#include <srs_core_bandwidth.hpp>
#define SRS_PULSE_TIMEOUT_MS 100
#define SRS_SEND_TIMEOUT_US 5000000L
@ -57,6 +58,7 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd)
#ifdef SRS_HTTP
http_hooks = new SrsHttpHooks();
#endif
bandwidth = new SrsBandwidth();
config->subscribe(this);
}
@ -73,6 +75,7 @@ SrsClient::~SrsClient()
#ifdef SRS_HTTP
srs_freep(http_hooks);
#endif
srs_freep(bandwidth);
}
// TODO: return detail message when error for client.
@ -153,6 +156,11 @@ int SrsClient::service_cycle()
}
srs_verbose("set peer bandwidth success");
// do bandwidth test if connect to the vhost which is for bandwidth check.
if (config->get_bw_check_enabled(req->vhost)) {
return bandwidth->bandwidth_test(req, stfd, rtmp);
}
if ((ret = rtmp->response_connect_app(req)) != ERROR_SUCCESS) {
srs_error("response connect app failed. ret=%d", ret);
return ret;

@ -43,6 +43,7 @@ class SrsCommonMessage;
#ifdef SRS_HTTP
class SrsHttpHooks;
#endif
class SrsBandwidth;
/**
* the client provides the main logic control for RTMP clients.
@ -58,6 +59,7 @@ private:
#ifdef SRS_HTTP
SrsHttpHooks* http_hooks;
#endif
SrsBandwidth* bandwidth;
public:
SrsClient(SrsServer* srs_server, st_netfd_t client_stfd);
virtual ~SrsClient();

@ -1510,6 +1510,90 @@ int SrsConfig::get_pithy_print_hls()
return ::atoi(pithy->arg0().c_str());
}
bool SrsConfig::get_bw_check_enabled(const string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return false;
}
conf = conf->get("bandcheck");
if (!conf) {
return false;
}
conf = conf->get("enabled");
if (!conf || conf->arg0() != "on") {
return false;
}
return true;
}
string SrsConfig::get_bw_check_key(const string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return "";
}
conf = conf->get("bandcheck");
if (!conf) {
return "";
}
conf = conf->get("key");
if (!conf) {
return "";
}
return conf->arg0();
}
int SrsConfig::get_bw_check_interval_ms(const string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL;
}
conf = conf->get("bandcheck");
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL;
}
conf = conf->get("interval_ms");
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL;
}
return ::atoi(conf->arg0().c_str()) * 1000;
}
int SrsConfig::get_bw_check_limit_kbps(const string &vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS;
}
conf = conf->get("bandcheck");
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS;
}
conf = conf->get("limit_kbps");
if (!conf) {
return SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS;
}
return ::atoi(conf->arg0().c_str());
}
int SrsConfig::get_pithy_print_encoder()
{
SrsConfDirective* pithy = root->get("encoder");

@ -52,6 +52,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define SRS_CONF_DEFAULT_QUEUE_LENGTH 30
// in seconds, the paused queue length.
#define SRS_CONF_DEFAULT_PAUSED_LENGTH 10
// the interval in seconds for bandwidth check
#define SRS_CONF_DEFAULT_BANDWIDTH_INTERVAL 30
// the interval in seconds for bandwidth check
#define SRS_CONF_DEFAULT_BANDWIDTH_LIMIT_KBPS 1000
#define SRS_CONF_DEFAULT_CHUNK_SIZE 4096
@ -168,6 +172,10 @@ public:
virtual int get_pithy_print_encoder();
virtual int get_pithy_print_hls();
virtual int get_pithy_print_play();
virtual bool get_bw_check_enabled(const std::string& vhost);
virtual std::string get_bw_check_key(const std::string& vhost);
virtual int get_bw_check_interval_ms(const std::string& vhost);
virtual int get_bw_check_limit_kbps(const std::string& vhost);
};
/**

@ -51,6 +51,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SOCKET_WRITE 209
#define ERROR_SOCKET_WAIT 210
#define ERROR_SOCKET_TIMEOUT 211
#define ERROR_SOCKET_GET_LOCAL_IP 212
#define ERROR_RTMP_PLAIN_REQUIRED 300
#define ERROR_RTMP_CHUNK_START 301
@ -85,6 +86,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#define ERROR_SYSTEM_IP_INVALID 411
#define ERROR_SYSTEM_FORWARD_LOOP 412
#define ERROR_SYSTEM_WAITPID 413
#define ERROR_SYSTEM_BANDWIDTH_KEY 414
#define ERROR_SYSTEM_BANDWIDTH_DENIED 415
// see librtmp.
// failed when open ssl create the dh

@ -31,6 +31,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core_stream.hpp>
#include <srs_core_autofree.hpp>
using namespace std;
/****************************************************************************
*****************************************************************************
****************************************************************************/
@ -208,6 +210,33 @@ messages.
#define RTMP_AMF0_DATA_SET_DATAFRAME "@setDataFrame"
#define RTMP_AMF0_DATA_ON_METADATA "onMetaData"
/**
* band width check method name, which will be invoked by client.
* band width check mothods use SrsBandwidthPacket as its internal packet type,
* so ensure you set command name when you use it.
*/
// server play control
#define SRS_BW_CHECK_START_PLAY "onSrsBandCheckStartPlayBytes"
#define SRS_BW_CHECK_STARTING_PLAY "onSrsBandCheckStartingPlayBytes"
#define SRS_BW_CHECK_STOP_PLAY "onSrsBandCheckStopPlayBytes"
#define SRS_BW_CHECK_STOPPED_PLAY "onSrsBandCheckStoppedPlayBytes"
// server publish control
#define SRS_BW_CHECK_START_PUBLISH "onSrsBandCheckStartPublishBytes"
#define SRS_BW_CHECK_STARTING_PUBLISH "onSrsBandCheckStartingPublishBytes"
#define SRS_BW_CHECK_STOP_PUBLISH "onSrsBandCheckStopPublishBytes"
#define SRS_BW_CHECK_STOPPED_PUBLISH "onSrsBandCheckStoppedPublishBytes"
// EOF control.
#define SRS_BW_CHECK_FINISHED "onSrsBandCheckFinished"
// for flash, it will sendout a final call,
// used to confirm got the report.
#define SRS_BW_CHECK_FLASH_FINAL "finalClientPacket"
// client only
#define SRS_BW_CHECK_PLAYING "onSrsBandCheckPlaying"
#define SRS_BW_CHECK_PUBLISHING "onSrsBandCheckPublishing"
/****************************************************************************
*****************************************************************************
****************************************************************************/
@ -283,7 +312,7 @@ SrsProtocol::~SrsProtocol()
srs_freep(skt);
}
std::string SrsProtocol::get_request_name(double transcationId)
string SrsProtocol::get_request_name(double transcationId)
{
if (requests.find(transcationId) == requests.end()) {
return "";
@ -1318,7 +1347,21 @@ int SrsCommonMessage::decode_packet(SrsProtocol* protocol)
srs_info("decode the AMF0/AMF3 data(onMetaData message).");
packet = new SrsOnMetaDataPacket();
return packet->decode(stream);
}
} else if(command == SRS_BW_CHECK_FINISHED
|| command == SRS_BW_CHECK_PLAYING
|| command == SRS_BW_CHECK_PUBLISHING
|| command == SRS_BW_CHECK_STARTING_PLAY
|| command == SRS_BW_CHECK_STARTING_PUBLISH
|| command == SRS_BW_CHECK_START_PLAY
|| command == SRS_BW_CHECK_START_PUBLISH
|| command == SRS_BW_CHECK_STOPPED_PLAY
|| command == SRS_BW_CHECK_STOP_PLAY
|| command == SRS_BW_CHECK_STOP_PUBLISH)
{
srs_info("decode the AMF0/AMF3 band width check message.");
packet = new SrsBandwidthPacket();
return packet->decode(stream);
}
// default packet to drop message.
srs_trace("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
@ -1370,7 +1413,7 @@ int SrsCommonMessage::get_perfer_cid()
return packet->get_perfer_cid();
}
void SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id)
SrsCommonMessage* SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id)
{
srs_freep(packet);
@ -1379,6 +1422,8 @@ void SrsCommonMessage::set_packet(SrsPacket* pkt, int stream_id)
header.message_type = packet->get_message_type();
header.payload_length = packet->get_payload_length();
header.stream_id = stream_id;
return this;
}
int SrsCommonMessage::encode_packet()
@ -1782,8 +1827,17 @@ int SrsConnectAppResPacket::get_message_type()
int SrsConnectAppResPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_object_size(props)+ srs_amf0_get_object_size(info);
int size = srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size();
if (props->size() > 0) {
size += srs_amf0_get_object_size(props);
}
if (info->size() > 0) {
size += srs_amf0_get_object_size(info);
}
return size;
}
int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
@ -1802,16 +1856,22 @@ int SrsConnectAppResPacket::encode_packet(SrsStream* stream)
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) {
srs_error("encode props failed. ret=%d", ret);
return ret;
}
if (props->size() > 0) {
if ((ret = srs_amf0_write_object(stream, props)) != ERROR_SUCCESS) {
srs_error("encode props failed. ret=%d", ret);
return ret;
}
}
srs_verbose("encode props success.");
if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) {
srs_error("encode info failed. ret=%d", ret);
return ret;
}
if (info->size() > 0) {
if ((ret = srs_amf0_write_object(stream, info)) != ERROR_SUCCESS) {
srs_error("encode info failed. ret=%d", ret);
return ret;
}
}
srs_verbose("encode info success.");
srs_info("encode connect app response packet success.");
@ -2596,6 +2656,163 @@ int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
return ret;
}
SrsBandwidthPacket::SrsBandwidthPacket()
{
command_name = RTMP_AMF0_COMMAND_ON_STATUS;
transaction_id = 0;
args = new SrsAmf0Null();
data = new SrsAmf0Object();
}
SrsBandwidthPacket::~SrsBandwidthPacket()
{
srs_freep(args);
srs_freep(data);
}
int SrsBandwidthPacket::get_perfer_cid()
{
return RTMP_CID_OverStream;
}
int SrsBandwidthPacket::get_message_type()
{
return RTMP_MSG_AMF0CommandMessage;
}
int SrsBandwidthPacket::get_size()
{
return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size()
+ srs_amf0_get_null_size() + srs_amf0_get_object_size(data);
}
int SrsBandwidthPacket::encode_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("encode command_name failed. ret=%d", ret);
return ret;
}
srs_verbose("encode command_name success.");
if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("encode transaction_id failed. ret=%d", ret);
return ret;
}
srs_verbose("encode transaction_id success.");
if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
srs_error("encode args failed. ret=%d", ret);
return ret;
}
srs_verbose("encode args success.");;
if ((ret = srs_amf0_write_object(stream, data)) != ERROR_SUCCESS) {
srs_error("encode data failed. ret=%d", ret);
return ret;
}
srs_verbose("encode data success.");
srs_info("encode onStatus(Call) packet success.");
return ret;
}
int SrsBandwidthPacket::decode(SrsStream *stream)
{
int ret = ERROR_SUCCESS;
if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_name failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
srs_error("amf0 decode play transaction_id failed. ret=%d", ret);
return ret;
}
if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
srs_error("amf0 decode play command_object failed. ret=%d", ret);
return ret;
}
// @remark, for bandwidth test, ignore the data field.
srs_info("decode SrsBandwidthPacket success.");
return ret;
}
bool SrsBandwidthPacket::is_starting_play()
{
return command_name == SRS_BW_CHECK_STARTING_PLAY;
}
bool SrsBandwidthPacket::is_stopped_play()
{
return command_name == SRS_BW_CHECK_STOPPED_PLAY;
}
bool SrsBandwidthPacket::is_starting_publish()
{
return command_name == SRS_BW_CHECK_STARTING_PUBLISH;
}
bool SrsBandwidthPacket::is_stopped_publish()
{
return command_name == SRS_BW_CHECK_STOPPED_PUBLISH;
}
bool SrsBandwidthPacket::is_flash_final()
{
return command_name == SRS_BW_CHECK_FLASH_FINAL;
}
SrsBandwidthPacket* SrsBandwidthPacket::create_finish()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_FINISHED);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_start_play()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_START_PLAY);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_playing()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_STARTING_PLAY);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_stop_play()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_STOP_PLAY);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_start_publish()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_START_PUBLISH);
}
SrsBandwidthPacket* SrsBandwidthPacket::create_stop_publish()
{
SrsBandwidthPacket* pkt = new SrsBandwidthPacket();
return pkt->set_command(SRS_BW_CHECK_STOP_PUBLISH);
}
SrsBandwidthPacket* SrsBandwidthPacket::set_command(string command)
{
command_name = command;
return this;
}
SrsOnStatusDataPacket::SrsOnStatusDataPacket()
{
command_name = RTMP_AMF0_COMMAND_ON_STATUS;

@ -310,6 +310,7 @@ class SrsCommonMessage : public ISrsMessage
{
private:
typedef ISrsMessage super;
disable_default_copy(SrsCommonMessage);
// decoded message payload.
private:
SrsStream* stream;
@ -345,9 +346,10 @@ public:
* set the encoded packet to encode_packet() to payload.
* @stream_id, the id of stream which is created by createStream.
* @remark, user never free the pkt, the message will auto free it.
* @return message itself.
*/
// TODO: refine the send methods.
virtual void set_packet(SrsPacket* pkt, int stream_id);
virtual SrsCommonMessage* set_packet(SrsPacket* pkt, int stream_id);
/**
* encode the packet to message payload bytes.
* @remark there exists empty packet, so maybe the payload is NULL.
@ -831,6 +833,55 @@ protected:
virtual int encode_packet(SrsStream* stream);
};
/**
* the special packet for the bandwidth test.
* actually, it's a SrsOnStatusCallPacket, but
* 1. encode with data field, to send data to client.
* 2. decode ignore the data field, donot care.
*/
class SrsBandwidthPacket : public SrsPacket
{
private:
typedef SrsPacket super;
disable_default_copy(SrsBandwidthPacket);
protected:
virtual const char* get_class_name()
{
return CLASS_NAME_STRING(SrsBandwidthPacket);
}
public:
std::string command_name;
double transaction_id;
SrsAmf0Null* args;
SrsAmf0Object* data;
public:
SrsBandwidthPacket();
virtual ~SrsBandwidthPacket();
public:
virtual int get_perfer_cid();
public:
virtual int get_message_type();
protected:
virtual int get_size();
virtual int encode_packet(SrsStream* stream);
public:
virtual int decode(SrsStream* stream);
public:
virtual bool is_starting_play();
virtual bool is_stopped_play();
virtual bool is_starting_publish();
virtual bool is_stopped_publish();
virtual bool is_flash_final();
static SrsBandwidthPacket* create_finish();
static SrsBandwidthPacket* create_start_play();
static SrsBandwidthPacket* create_playing();
static SrsBandwidthPacket* create_stop_play();
static SrsBandwidthPacket* create_start_publish();
static SrsBandwidthPacket* create_stop_publish();
private:
virtual SrsBandwidthPacket* set_command(std::string command);
};
/**
* onStatus data, AMF0 Data
* @remark, user must set the stream_id by SrsMessage.set_packet().

@ -51,8 +51,11 @@ using namespace std;
#define StatusClientId "clientid"
// status value
#define StatusLevelStatus "status"
// status error
#define StatusLevelError "error"
// code value
#define StatusCodeConnectSuccess "NetConnection.Connect.Success"
#define StatusCodeConnectRejected "NetConnection.Connect.Rejected"
#define StatusCodeStreamReset "NetStream.Play.Reset"
#define StatusCodeStreamStart "NetStream.Play.Start"
#define StatusCodeStreamPause "NetStream.Pause.Notify"
@ -577,7 +580,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type)
return ret;
}
int SrsRtmp::response_connect_app(SrsRequest* req)
int SrsRtmp::response_connect_app(SrsRequest *req, const char* server_ip)
{
int ret = ERROR_SUCCESS;
@ -606,6 +609,10 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
data->set("srs_copyright", new SrsAmf0String(RTMP_SIG_SRS_COPYRIGHT));
data->set("srs_contributor", new SrsAmf0String(RTMP_SIG_SRS_CONTRIBUTOR));
if (server_ip) {
data->set("srs_server_ip", new SrsAmf0String(server_ip));
}
msg->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
@ -614,7 +621,28 @@ int SrsRtmp::response_connect_app(SrsRequest* req)
}
srs_info("send connect app response message success.");
return ret;
return ret;
}
void SrsRtmp::response_connect_reject(SrsRequest *req, const char* desc)
{
int ret = ERROR_SUCCESS;
SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket();
pkt->command_name = "_error";
pkt->props->set(StatusLevel, new SrsAmf0String(StatusLevelError));
pkt->props->set(StatusCode, new SrsAmf0String(StatusCodeConnectRejected));
pkt->props->set(StatusDescription, new SrsAmf0String(desc));
//pkt->props->set("objectEncoding", new SrsAmf0Number(req->objectEncoding));
SrsCommonMessage* msg = (new SrsCommonMessage())->set_packet(pkt, 0);
if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) {
srs_error("send connect app response rejected message failed. ret=%d", ret);
return;
}
srs_info("send connect app response rejected message success.");
return;
}
int SrsRtmp::on_bw_done()

@ -167,7 +167,11 @@ public:
* using the Limit type field.
*/
virtual int set_peer_bandwidth(int bandwidth, int type);
virtual int response_connect_app(SrsRequest* req);
/**
* @param server_ip the ip of server.
*/
virtual int response_connect_app(SrsRequest* req, const char* server_ip = NULL);
virtual void response_connect_reject(SrsRequest* req, const char* desc);
virtual int on_bw_done();
/**
* recv some message to identify the client.

@ -10,6 +10,8 @@ file
..\core\srs_core_amf0.cpp,
..\core\srs_core_autofree.hpp,
..\core\srs_core_autofree.cpp,
..\core\srs_core_bandwidth.hpp,
..\core\srs_core_bandwidth.cpp,
..\core\srs_core_buffer.hpp,
..\core\srs_core_buffer.cpp,
..\core\srs_core_client.hpp,

Loading…
Cancel
Save