diff --git a/README.md b/README.md index ac891996a..3225c057e 100755 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ url: rtmp://127.0.0.1:1935/live/livestream * nginx v1.5.0: 139524 lines
### History +* v0.3, 2013-11-02, support listen multiple ports. * v0.3, 2013-11-02, support config file in nginx-conf style. * v0.3, 2013-10-29, support pithy print log message specified by stage. * v0.3, 2013-10-28, support librtmp without extended-timestamp in 0xCX chunk packet. diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index 783d05581..1af614102 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -1,5 +1,7 @@ -listen 1935; +listen 1935 19350; vhost __defaultVhost__ { application live { + no_delay on; + allow all; } } diff --git a/trunk/src/core/srs_core_config.cpp b/trunk/src/core/srs_core_config.cpp index dc11eed6e..0aa3ba6fe 100755 --- a/trunk/src/core/srs_core_config.cpp +++ b/trunk/src/core/srs_core_config.cpp @@ -83,6 +83,8 @@ int SrsFileBuffer::open(const char* filename) return ERROR_SYSTEM_CONFIG_INVALID; } + line = 1; + return ERROR_SUCCESS; } @@ -105,6 +107,19 @@ SrsConfDirective* SrsConfDirective::at(int index) return directives.at(index); } +SrsConfDirective* SrsConfDirective::get(std::string _name) +{ + std::vector::iterator it; + for (it = directives.begin(); it != directives.end(); ++it) { + SrsConfDirective* directive = *it; + if (directive->name == _name) { + return directive; + } + } + + return NULL; +} + int SrsConfDirective::parse(const char* filename) { int ret = ERROR_SUCCESS; @@ -161,6 +176,7 @@ int SrsConfDirective::parse_conf(SrsFileBuffer* buffer, SrsDirectiveType type) // build directive tree. SrsConfDirective* directive = new SrsConfDirective(); + directive->conf_line = buffer->line; directive->name = args[0]; args.erase(args.begin()); directive->args.swap(args); @@ -361,15 +377,22 @@ int SrsConfDirective::refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s return ret; } +Config* config = new Config(); + Config::Config() { show_help = false; show_version = false; config_file = NULL; + + root = new SrsConfDirective(); + root->conf_line = 0; + root->name = "root"; } Config::~Config() { + srs_freep(root); } // see: ngx_get_options @@ -400,16 +423,25 @@ int Config::parse_options(int argc, char** argv) return ERROR_SYSTEM_CONFIG_INVALID; } - SrsConfDirective root; - root.name = "root"; - - if ((ret = root.parse(config_file)) != ERROR_SUCCESS) { + if ((ret = root->parse(config_file)) != ERROR_SUCCESS) { return ret; } + SrsConfDirective* conf = NULL; + if ((conf = get_listen()) == NULL || conf->args.size() == 0) { + fprintf(stderr, "line %d: conf error, " + "directive \"listen\" is empty\n", conf? conf->conf_line:0); + return ERROR_SYSTEM_CONFIG_INVALID; + } + return ret; } +SrsConfDirective* Config::get_listen() +{ + return root->get("listen"); +} + int Config::parse_argv(int& i, char** argv) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_config.hpp b/trunk/src/core/srs_core_config.hpp index 5ea6595af..323690e4a 100755 --- a/trunk/src/core/srs_core_config.hpp +++ b/trunk/src/core/srs_core_config.hpp @@ -32,25 +32,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -/** -* the config parser. -*/ -class Config -{ -private: - bool show_help; - bool show_version; - char* config_file; -public: - Config(); - virtual ~Config(); -public: - virtual int parse_options(int argc, char** argv); -private: - virtual int parse_argv(int& i, char** argv); - virtual void print_help(char** argv); -}; - class SrsFileBuffer { public: @@ -73,6 +54,7 @@ public: class SrsConfDirective { public: + int conf_line; std::string name; std::vector args; std::vector directives; @@ -80,6 +62,7 @@ public: SrsConfDirective(); virtual ~SrsConfDirective(); SrsConfDirective* at(int index); + SrsConfDirective* get(std::string _name); public: virtual int parse(const char* filename); public: @@ -89,4 +72,28 @@ public: virtual int refill_buffer(SrsFileBuffer* buffer, bool d_quoted, bool s_quoted, int startline, char*& pstart); }; +/** +* the config parser. +*/ +class Config +{ +private: + bool show_help; + bool show_version; + char* config_file; + SrsConfDirective* root; +public: + Config(); + virtual ~Config(); +public: + virtual int parse_options(int argc, char** argv); + virtual SrsConfDirective* get_listen(); +private: + virtual int parse_argv(int& i, char** argv); + virtual void print_help(char** argv); +}; + +// global config +extern Config* config; + #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_server.cpp b/trunk/src/core/srs_core_server.cpp index d1ba62704..edf63dbbf 100755 --- a/trunk/src/core/srs_core_server.cpp +++ b/trunk/src/core/srs_core_server.cpp @@ -34,52 +34,34 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include #define SERVER_LISTEN_BACKLOG 10 #define SRS_TIME_RESOLUTION_MS 1000 -SrsServer::SrsServer() +SrsListener::SrsListener(SrsServer* _server, SrsListenerType _type) { + fd = -1; + stfd = NULL; + + port = 0; + server = _server; + type = _type; } -SrsServer::~SrsServer() +SrsListener::~SrsListener() { - for (std::vector::iterator it = conns.begin(); it != conns.end(); ++it) { - SrsConnection* conn = *it; - srs_freep(conn); + if (stfd) { + st_netfd_close(stfd); + stfd = NULL; } - conns.clear(); } -int SrsServer::initialize() +int SrsListener::listen(int _port) { int ret = ERROR_SUCCESS; - - // use linux epoll. - if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { - ret = ERROR_ST_SET_EPOLL; - srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret); - return ret; - } - srs_verbose("st_set_eventsys use linux epoll success"); - - if(st_init() != 0){ - ret = ERROR_ST_INITIALIZE; - srs_error("st_init failed. ret=%d", ret); - return ret; - } - srs_verbose("st_init success"); - // set current log id. - log_context->generate_id(); - srs_info("log set id success"); - - return ret; -} - -int SrsServer::listen(int port) -{ - int ret = ERROR_SUCCESS; + port = _port; if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { ret = ERROR_SOCKET_CREATE; @@ -133,6 +115,117 @@ int SrsServer::listen(int port) return ret; } +void SrsListener::listen_cycle() +{ + int ret = ERROR_SUCCESS; + + log_context->generate_id(); + srs_trace("listen cycle start, port=%d, type=%d, fd=%d", port, type, fd); + + while (true) { + st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); + + if(client_stfd == NULL){ + // ignore error. + srs_warn("ignore accept thread stoppped for accept client error"); + continue; + } + srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); + + if ((ret = server->accept_client(type, client_stfd)) != ERROR_SUCCESS) { + srs_warn("accept client error. ret=%d", ret); + continue; + } + + srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); + } +} + +void* SrsListener::listen_thread(void* arg) +{ + SrsListener* obj = (SrsListener*)arg; + srs_assert(obj != NULL); + + obj->listen_cycle(); + + return NULL; +} + +SrsServer::SrsServer() +{ +} + +SrsServer::~SrsServer() +{ + if (true) { + std::vector::iterator it; + for (it = conns.begin(); it != conns.end(); ++it) { + SrsConnection* conn = *it; + srs_freep(conn); + } + conns.clear(); + } + + if (true) { + std::vector::iterator it; + for (it = listeners.begin(); it != listeners.end(); ++it) { + SrsListener* listener = *it; + srs_freep(listener); + } + listeners.clear(); + } +} + +int SrsServer::initialize() +{ + int ret = ERROR_SUCCESS; + + // use linux epoll. + if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { + ret = ERROR_ST_SET_EPOLL; + srs_error("st_set_eventsys use linux epoll failed. ret=%d", ret); + return ret; + } + srs_verbose("st_set_eventsys use linux epoll success"); + + if(st_init() != 0){ + ret = ERROR_ST_INITIALIZE; + srs_error("st_init failed. ret=%d", ret); + return ret; + } + srs_verbose("st_init success"); + + // set current log id. + log_context->generate_id(); + srs_info("log set id success"); + + return ret; +} + +int SrsServer::listen() +{ + int ret = ERROR_SUCCESS; + + SrsConfDirective* conf = NULL; + + // stream service port. + conf = config->get_listen(); + srs_assert(conf); + + for (int i = 0; i < (int)conf->args.size(); i++) { + SrsListener* listener = new SrsListener(this, SrsListenerStream); + listeners.push_back(listener); + + int port = ::atoi(conf->args.at(i).c_str()); + if ((ret = listener->listen(port)) != ERROR_SUCCESS) { + srs_error("listen at port %d failed. ret=%d", port, ret); + return ret; + } + } + + return ret; +} + int SrsServer::cycle() { int ret = ERROR_SUCCESS; @@ -161,15 +254,21 @@ void SrsServer::remove(SrsConnection* conn) srs_freep(conn); } -int SrsServer::accept_client(st_netfd_t client_stfd) +int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) { int ret = ERROR_SUCCESS; - SrsConnection* conn = new SrsClient(this, client_stfd); + SrsConnection* conn = NULL; + if (type == SrsListenerStream) { + conn = new SrsClient(this, client_stfd); + } else { + // handler others + } + srs_assert(conn); // directly enqueue, the cycle thread will remove the client. conns.push_back(conn); - srs_verbose("add conn to vector. conns=%d", (int)conns.size()); + srs_verbose("add conn from port %d to vector. conns=%d", port, (int)conns.size()); // cycle will start process thread and when finished remove the client. if ((ret = conn->start()) != ERROR_SUCCESS) { @@ -180,39 +279,3 @@ int SrsServer::accept_client(st_netfd_t client_stfd) return ret; } -void SrsServer::listen_cycle() -{ - int ret = ERROR_SUCCESS; - - log_context->generate_id(); - srs_trace("listen cycle start."); - - while (true) { - st_netfd_t client_stfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT); - - if(client_stfd == NULL){ - // ignore error. - srs_warn("ignore accept thread stoppped for accept client error"); - continue; - } - srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd)); - - if ((ret = accept_client(client_stfd)) != ERROR_SUCCESS) { - srs_warn("accept client error. ret=%d", ret); - continue; - } - - srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); - } -} - -void* SrsServer::listen_thread(void* arg) -{ - SrsServer* server = (SrsServer*)arg; - srs_assert(server != NULL); - - server->listen_cycle(); - - return NULL; -} - diff --git a/trunk/src/core/srs_core_server.hpp b/trunk/src/core/srs_core_server.hpp index e5007b46f..33db0c9a8 100755 --- a/trunk/src/core/srs_core_server.hpp +++ b/trunk/src/core/srs_core_server.hpp @@ -34,25 +34,50 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +class SrsServer; class SrsConnection; -class SrsServer + +enum SrsListenerType +{ + SrsListenerStream = 0, + SrsListenerApi +}; + +class SrsListener { +public: + SrsListenerType type; private: int fd; st_netfd_t stfd; + int port; + SrsServer* server; +public: + SrsListener(SrsServer* _server, SrsListenerType _type); + virtual ~SrsListener(); +public: + virtual int listen(int port); +private: + virtual void listen_cycle(); + static void* listen_thread(void* arg); +}; + +class SrsServer +{ + friend class SrsListener; +private: std::vector conns; + std::vector listeners; public: SrsServer(); virtual ~SrsServer(); public: virtual int initialize(); - virtual int listen(int port); + virtual int listen(); virtual int cycle(); virtual void remove(SrsConnection* conn); private: - virtual int accept_client(st_netfd_t client_stfd); - virtual void listen_cycle(); - static void* listen_thread(void* arg); + virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd); }; #endif \ No newline at end of file diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 557016e9a..57ba66417 100755 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -31,9 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. int main(int argc, char** argv){ int ret = ERROR_SUCCESS; - Config config; - - if ((ret = config.parse_options(argc, argv)) != ERROR_SUCCESS) { + if ((ret = config->parse_options(argc, argv)) != ERROR_SUCCESS) { return ret; } @@ -43,7 +41,7 @@ int main(int argc, char** argv){ return ret; } - if ((ret = server.listen(1935)) != ERROR_SUCCESS) { + if ((ret = server.listen()) != ERROR_SUCCESS) { return ret; }