merge from bravo code, fix some warnings.

pull/444/head
winlin 10 years ago
parent 44bc7976ac
commit 5d7b0edccc

@ -77,22 +77,22 @@ void SrsKbpsSlice::sample()
} }
if (now - sample_30s.time > 30 * 1000) { if (now - sample_30s.time > 30 * 1000) {
sample_30s.kbps = (total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time); sample_30s.kbps = (int)((total_bytes - sample_30s.bytes) * 8 / (now - sample_30s.time));
sample_30s.time = now; sample_30s.time = now;
sample_30s.bytes = total_bytes; sample_30s.bytes = total_bytes;
} }
if (now - sample_1m.time > 60 * 1000) { if (now - sample_1m.time > 60 * 1000) {
sample_1m.kbps = (total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time); sample_1m.kbps = (int)((total_bytes - sample_1m.bytes) * 8 / (now - sample_1m.time));
sample_1m.time = now; sample_1m.time = now;
sample_1m.bytes = total_bytes; sample_1m.bytes = total_bytes;
} }
if (now - sample_5m.time > 300 * 1000) { if (now - sample_5m.time > 300 * 1000) {
sample_5m.kbps = (total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time); sample_5m.kbps = (int)((total_bytes - sample_5m.bytes) * 8 / (now - sample_5m.time));
sample_5m.time = now; sample_5m.time = now;
sample_5m.bytes = total_bytes; sample_5m.bytes = total_bytes;
} }
if (now - sample_60m.time > 3600 * 1000) { if (now - sample_60m.time > 3600 * 1000) {
sample_60m.kbps = (total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time); sample_60m.kbps = (int)((total_bytes - sample_60m.bytes) * 8 / (now - sample_60m.time));
sample_60m.time = now; sample_60m.time = now;
sample_60m.bytes = total_bytes; sample_60m.bytes = total_bytes;
} }
@ -160,7 +160,7 @@ int SrsKbps::get_send_kbps()
return 0; return 0;
} }
int64_t bytes = get_send_bytes(); int64_t bytes = get_send_bytes();
return bytes * 8 / duration; return (int)(bytes * 8 / duration);
} }
int SrsKbps::get_recv_kbps() int SrsKbps::get_recv_kbps()
@ -170,7 +170,7 @@ int SrsKbps::get_recv_kbps()
return 0; return 0;
} }
int64_t bytes = get_recv_bytes(); int64_t bytes = get_recv_bytes();
return bytes * 8 / duration; return (int)(bytes * 8 / duration);
} }
int SrsKbps::get_send_kbps_30s() int SrsKbps::get_send_kbps_30s()

@ -54,6 +54,11 @@ ISrsUdpHandler::~ISrsUdpHandler()
{ {
} }
int ISrsUdpHandler::on_stfd_change(st_netfd_t /*fd*/)
{
return ERROR_SUCCESS;
}
ISrsTcpHandler::ISrsTcpHandler() ISrsTcpHandler::ISrsTcpHandler()
{ {
} }
@ -69,7 +74,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
port = p; port = p;
_fd = -1; _fd = -1;
stfd = NULL; _stfd = NULL;
nb_buf = SRS_UDP_MAX_PACKET_SIZE; nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf]; buf = new char[nb_buf];
@ -80,7 +85,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
SrsUdpListener::~SrsUdpListener() SrsUdpListener::~SrsUdpListener()
{ {
// close the stfd to trigger thread to interrupted. // close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd); srs_close_stfd(_stfd);
pthread->stop(); pthread->stop();
srs_freep(pthread); srs_freep(pthread);
@ -97,6 +102,11 @@ int SrsUdpListener::fd()
return _fd; return _fd;
} }
st_netfd_t SrsUdpListener::stfd()
{
return _stfd;
}
int SrsUdpListener::listen() int SrsUdpListener::listen()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
@ -127,7 +137,7 @@ int SrsUdpListener::listen()
} }
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((stfd = st_netfd_open_socket(_fd)) == NULL){ if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET; ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret; return ret;
@ -153,7 +163,7 @@ int SrsUdpListener::cycle()
int nb_from = sizeof(sockaddr_in); int nb_from = sizeof(sockaddr_in);
int nread = 0; int nread = 0;
if ((nread = st_recvfrom(stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) { if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
srs_warn("ignore recv udp packet failed, nread=%d", nread); srs_warn("ignore recv udp packet failed, nread=%d", nread);
continue; continue;
} }
@ -178,7 +188,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
port = p; port = p;
_fd = -1; _fd = -1;
stfd = NULL; _stfd = NULL;
pthread = new SrsThread("tcp", this, 0, true); pthread = new SrsThread("tcp", this, 0, true);
} }
@ -186,7 +196,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
SrsTcpListener::~SrsTcpListener() SrsTcpListener::~SrsTcpListener()
{ {
// close the stfd to trigger thread to interrupted. // close the stfd to trigger thread to interrupted.
srs_close_stfd(stfd); srs_close_stfd(_stfd);
pthread->stop(); pthread->stop();
srs_freep(pthread); srs_freep(pthread);
@ -238,7 +248,7 @@ int SrsTcpListener::listen()
} }
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((stfd = st_netfd_open_socket(_fd)) == NULL){ if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET; ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret); srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret; return ret;
@ -258,7 +268,7 @@ int SrsTcpListener::cycle()
{ {
int ret = ERROR_SUCCESS; int ret = ERROR_SUCCESS;
st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if(client_stfd == NULL){ if(client_stfd == NULL){
// ignore error. // ignore error.

@ -45,6 +45,12 @@ class ISrsUdpHandler
public: public:
ISrsUdpHandler(); ISrsUdpHandler();
virtual ~ISrsUdpHandler(); virtual ~ISrsUdpHandler();
public:
/**
* when fd changed, for instance, reload the listen port,
* notify the handler and user can do something.
*/
virtual int on_stfd_change(st_netfd_t fd);
public: public:
/** /**
* when udp listener got a udp packet, notice server to process it. * when udp listener got a udp packet, notice server to process it.
@ -80,7 +86,7 @@ class SrsUdpListener : public ISrsThreadHandler
{ {
private: private:
int _fd; int _fd;
st_netfd_t stfd; st_netfd_t _stfd;
SrsThread* pthread; SrsThread* pthread;
private: private:
char* buf; char* buf;
@ -94,6 +100,7 @@ public:
virtual ~SrsUdpListener(); virtual ~SrsUdpListener();
public: public:
virtual int fd(); virtual int fd();
virtual st_netfd_t stfd();
public: public:
virtual int listen(); virtual int listen();
// interface ISrsThreadHandler. // interface ISrsThreadHandler.
@ -108,7 +115,7 @@ class SrsTcpListener : public ISrsThreadHandler
{ {
private: private:
int _fd; int _fd;
st_netfd_t stfd; st_netfd_t _stfd;
SrsThread* pthread; SrsThread* pthread;
private: private:
ISrsTcpHandler* handler; ISrsTcpHandler* handler;

@ -331,6 +331,12 @@ int SrsUdpStreamListener::listen(string i, int p)
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d", "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port); pthread->cid(), _srs_context->get_id(), port, type, fd, ip.c_str(), port);
// notify the handler the fd changed.
if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) {
srs_error("notify handler fd changed. ret=%d", ret);
return ret;
}
srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd()); srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret; return ret;

@ -45,6 +45,7 @@ using namespace std;
#include <srs_app_hds.hpp> #include <srs_app_hds.hpp>
#include <srs_app_statistic.hpp> #include <srs_app_statistic.hpp>
#include <srs_core_autofree.hpp> #include <srs_core_autofree.hpp>
#include <srs_rtmp_utility.hpp>
#define CONST_MAX_JITTER_MS 500 #define CONST_MAX_JITTER_MS 500
#define DEFAULT_FRAME_TIME_MS 40 #define DEFAULT_FRAME_TIME_MS 40
@ -759,6 +760,20 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
return source; return source;
} }
SrsSource* SrsSource::fetch(std::string vhost, std::string app, std::string stream)
{
SrsSource* source = NULL;
string stream_url = srs_generate_stream_url(vhost, app, stream);
if (pool.find(stream_url) == pool.end()) {
return NULL;
}
source = pool[stream_url];
return source;
}
void SrsSource::destroy() void SrsSource::destroy()
{ {
std::map<std::string, SrsSource*>::iterator it; std::map<std::string, SrsSource*>::iterator it;

@ -407,6 +407,10 @@ public:
*/ */
static SrsSource* fetch(SrsRequest* r); static SrsSource* fetch(SrsRequest* r);
/** /**
* get the exists source by stream info(vhost, app, stream), NULL when not exists.
*/
static SrsSource* fetch(std::string vhost, std::string app, std::string stream);
/**
* when system exit, destroy the sources, * when system exit, destroy the sources,
* for gmc to analysis mem leaks. * for gmc to analysis mem leaks.
*/ */

@ -195,6 +195,7 @@ int SrsStatistic::on_client(int id, SrsRequest* req)
SrsStatisticClient* client = NULL; SrsStatisticClient* client = NULL;
if (clients.find(id) == clients.end()) { if (clients.find(id) == clients.end()) {
client = new SrsStatisticClient(); client = new SrsStatisticClient();
client->id = id;
client->stream = stream; client->stream = stream;
clients[id] = client; clients[id] = client;
} else { } else {

@ -99,15 +99,7 @@ void SrsRequest::update_auth(SrsRequest* req)
string SrsRequest::get_stream_url() string SrsRequest::get_stream_url()
{ {
std::string url = ""; return srs_generate_stream_url(vhost, app, stream);
url += vhost;
url += "/";
url += app;
url += "/";
url += stream;
return url;
} }
void SrsRequest::strip() void SrsRequest::strip()

@ -31,6 +31,7 @@ using namespace std;
#include <srs_kernel_stream.hpp> #include <srs_kernel_stream.hpp>
#include <srs_rtmp_stack.hpp> #include <srs_rtmp_stack.hpp>
#include <srs_kernel_codec.hpp> #include <srs_kernel_codec.hpp>
#include <srs_kernel_consts.hpp>
void srs_discovery_tc_url( void srs_discovery_tc_url(
string tcUrl, string tcUrl,
@ -346,3 +347,18 @@ int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, in
return ret; return ret;
} }
std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream)
{
std::string url = "";
if (SRS_CONSTS_RTMP_DEFAULT_VHOST != vhost){
url += vhost;
}
url += "/";
url += app;
url += "/";
url += stream;
return url;
}

@ -120,5 +120,8 @@ extern int srs_chunk_header_c3(
*/ */
extern int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, int stream_id, SrsSharedPtrMessage** ppmsg); extern int srs_rtmp_create_msg(char type, u_int32_t timestamp, char* data, int size, int stream_id, SrsSharedPtrMessage** ppmsg);
// get the stream identify, vhost/app/stream.
extern std::string srs_generate_stream_url(std::string vhost, std::string app, std::string stream);
#endif #endif

Loading…
Cancel
Save