diff --git a/trunk/research/api-server/server.py b/trunk/research/api-server/server.py index a6b61acc0..435bdc0d3 100755 --- a/trunk/research/api-server/server.py +++ b/trunk/research/api-server/server.py @@ -49,14 +49,53 @@ if len(sys.argv) <= 1: print "See also: https://github.com/winlinvip/simple-rtmp-server" sys.exit(1) -import datetime, cherrypy +import json, datetime, cherrypy def trace(msg): date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print "[%s][trace] %s"%(date, msg) +def enable_crossdomain(): + cherrypy.response.headers["Access-Control-Allow-Origin"] = "*" + cherrypy.response.headers["Access-Control-Allow-Methods"] = "GET, POST, HEAD, PUT, DELETE" + # generate allow headers for crossdomain. + allow_headers = ["Cache-Control", "X-Proxy-Authorization", "X-Requested-With", "Content-Type"] + cherrypy.response.headers["Access-Control-Allow-Headers"] = ",".join(allow_headers) + +class Error: + # ok, success, completed. + success = 0 + # error when parse json + system_parse_json = 100 + +''' +handle the clients requests: +POST: create new client, handle the SRS on_connect callback. +''' class RESTClients(object): - pass; + exposed = True + def GET(self): + enable_crossdomain(); + + clients = {}; + return json.dumps(clients); + + def POST(self): + enable_crossdomain(); + + req = cherrypy.request.body.read(); + trace("post to clients, req=%s"%(req)); + try: + json_req = json.loads(req) + except Exception, ex: + trace("parse the request to json failed, req=%s, ex=%s"%(req, ex)) + return str(Error.system_parse_json); + + trace("valid clients post request success.") + return str(Error.success); + + def OPTIONS(self): + enable_crossdomain() class Root(object): def __init__(self): diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 411816f39..a5d5348f0 100644 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -144,5 +144,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HTTP_PARSE_URI 800 #define ERROR_HTTP_DATA_INVLIAD 801 +#define ERROR_HTTP_PARSE_HEADER 802 #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_http.cpp b/trunk/src/core/srs_core_http.cpp index 2963dad3d..7bb8ac006 100644 --- a/trunk/src/core/srs_core_http.cpp +++ b/trunk/src/core/srs_core_http.cpp @@ -23,13 +23,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#ifdef SRS_HTTP + +#include + #include +#include +#include +#include #include #include #include - -#ifdef SRS_HTTP +#include #define SRS_DEFAULT_HTTP_PORT 80 #define SRS_HTTP_RESPONSE_OK "0" @@ -97,6 +103,11 @@ int SrsHttpUri::get_port() return port; } +const char* SrsHttpUri::get_path() +{ + return path.c_str(); +} + std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, http_parser_url_fields field) { if((hp_u->field_set & (1 << field)) == 0){ @@ -117,18 +128,270 @@ std::string SrsHttpUri::get_uri_field(std::string uri, http_parser_url* hp_u, ht SrsHttpClient::SrsHttpClient() { + connected = false; + stfd = NULL; } SrsHttpClient::~SrsHttpClient() { + disconnect(); } int SrsHttpClient::post(SrsHttpUri* uri, std::string req, std::string& res) { int ret = ERROR_SUCCESS; + + if ((ret = connect(uri)) != ERROR_SUCCESS) { + srs_error("http connect server failed. ret=%d", ret); + return ret; + } + + // send POST request to uri + // POST %s HTTP/1.1\r\nHost: %s\r\nContent-Length: %d\r\n\r\n%s + std::stringstream ss; + ss << "POST " << uri->get_path() << " " + << "HTTP/1.1\r\n" + << "Host: " << uri->get_host() << "\r\n" + << "Connection: Keep-Alive" << "\r\n" + << "Content-Length: " << std::dec << req.length() << "\r\n" + << "User-Agent: " << RTMP_SIG_SRS_NAME << RTMP_SIG_SRS_VERSION << "\r\n" + << "Content-Type: text/html" << "\r\n" + << "\r\n" + << req; + + SrsSocket skt(stfd); + + std::string data = ss.str(); + ssize_t nwrite; + if ((ret = skt.write(data.c_str(), data.length(), &nwrite)) != ERROR_SUCCESS) { + // disconnect when error. + disconnect(); + + srs_error("write http post failed. ret=%d", ret); + return ret; + } + + if ((ret = parse_response(uri, &skt, &res)) != ERROR_SUCCESS) { + srs_error("parse http post response failed. ret=%d", ret); + return ret; + } + srs_info("parse http post response success."); + return ret; } +void SrsHttpClient::disconnect() +{ + connected = false; + + if (stfd) { + int fd = st_netfd_fileno(stfd); + st_netfd_close(stfd); + stfd = NULL; + + // st does not close it sometimes, + // close it manually. + ::close(fd); + } +} + +int SrsHttpClient::connect(SrsHttpUri* uri) +{ + int ret = ERROR_SUCCESS; + + if (connected) { + return ret; + } + + disconnect(); + + std::string ip = srs_dns_resolve(uri->get_host()); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(uri->get_port()); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. " + "ip=%s, port=%d, ret=%d", ip.c_str(), uri->get_port(), ret); + return ret; + } + srs_info("connect to server success. " + "http url=%s, server=%s, ip=%s, port=%d", + uri->get_url(), uri->get_host(), ip.c_str(), uri->get_port()); + + connected = true; + + return ret; +} + +int SrsHttpClient::parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response) +{ + int ret = ERROR_SUCCESS; + + int body_received = 0; + if ((ret = parse_response_header(skt, response, body_received)) != ERROR_SUCCESS) { + srs_error("parse response header failed. ret=%d", ret); + return ret; + } + + if ((ret = parse_response_body(uri, skt, response, body_received)) != ERROR_SUCCESS) { + srs_error("parse response body failed. ret=%d", ret); + return ret; + } + + srs_info("url %s download, body size=%"PRId64, uri->get_url(), http_header.content_length); + + return ret; +} + +int SrsHttpClient::parse_response_header(SrsSocket* skt, std::string* response, int& body_received) +{ + int ret = ERROR_SUCCESS; + + http_parser_settings settings; + + memset(&settings, 0, sizeof(settings)); + settings.on_headers_complete = on_headers_complete; + + http_parser parser; + http_parser_init(&parser, HTTP_RESPONSE); + // callback object ptr. + parser.data = (void*)this; + + // reset response header. + memset(&http_header, 0, sizeof(http_header)); + + // parser header. + char buf[SRS_HTTP_HEADER_BUFFER]; + for (;;) { + ssize_t nread; + if ((ret = skt->read(buf, (size_t)sizeof(buf), &nread)) != ERROR_SUCCESS) { + srs_error("read body from server failed. ret=%d", ret); + return ret; + } + + ssize_t nparsed = http_parser_execute(&parser, &settings, buf, nread); + srs_info("read_size=%d, nparsed=%d", (int)nread, (int)nparsed); + + // check header size. + if (http_header.nread != 0) { + body_received = nread - nparsed; + + srs_info("http header parsed, size=%d, content-length=%"PRId64", body-received=%d", + http_header.nread, http_header.content_length, body_received); + + if(response != NULL && body_received > 0){ + response->append(buf + nparsed, body_received); + } + + return ret; + } + + if (nparsed != nread) { + ret = ERROR_HTTP_PARSE_HEADER; + srs_error("parse response error, parsed(%d)!=read(%d), ret=%d", (int)nparsed, (int)nread, ret); + return ret; + } + } + + return ret; +} + +int SrsHttpClient::parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received) +{ + int ret = ERROR_SUCCESS; + + srs_assert(uri != NULL); + + uint64_t body_left = http_header.content_length - body_received; + + if (body_left <= 0) { + return ret; + } + + if (response != NULL) { + char buf[SRS_HTTP_BODY_BUFFER]; + + return parse_response_body_data( + uri, skt, response, (size_t)body_left, + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER + ); + } else { + // if ignore response, use shared fast memory. + static char buf[SRS_HTTP_BODY_BUFFER]; + + return parse_response_body_data( + uri, skt, response, (size_t)body_left, + (const void*)buf, (size_t)SRS_HTTP_BODY_BUFFER + ); + } + + return ret; +} + +int SrsHttpClient::parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size) +{ + int ret = ERROR_SUCCESS; + + srs_assert(uri != NULL); + + while (body_left > 0) { + ssize_t nread; + int size_to_read = srs_min(size, body_left); + if ((ret = skt->read(buf, size_to_read, &nread)) != ERROR_SUCCESS) { + srs_error("read header from server failed. ret=%d", ret); + return ret; + } + + if (response != NULL && nread > 0) { + response->append((char*)buf, nread); + } + + body_left -= nread; + srs_info("read url(%s) content partial %"PRId64"/%"PRId64"", + uri->get_url(), http_header.content_length - body_left, http_header.content_length); + } + + return ret; +} + +int SrsHttpClient::on_headers_complete(http_parser* parser) +{ + SrsHttpClient* obj = (SrsHttpClient*)parser->data; + obj->comple_header(parser); + + // see http_parser.c:1570, return 1 to skip body. + return 1; +} + +void SrsHttpClient::comple_header(http_parser* parser) +{ + // save the parser status when header parse completed. + memcpy(&http_header, parser, sizeof(http_header)); +} + SrsHttpHooks::SrsHttpHooks() { } @@ -148,33 +411,33 @@ int SrsHttpHooks::on_connect(std::string url, std::string ip, SrsRequest* req) return ret; } - std::string res; - std::string data; /** { "ip": "192.168.1.10", "vhost": "video.test.com", "app": "live", "pageUrl": "http://www.test.com/live.html" } */ - data += "{"; - // ip - data += "\"ip\":"; - data += "\"" + ip + "\""; - data += ","; - // vhost - data += "\"vhost\":"; - data += "\"" + req->vhost + "\""; - data += ","; - data += ","; - // app - data += "\"vhost\":"; - data += "\"" + req->app + "\""; - data += ","; - // pageUrl - data += "\"vhost\":"; - data += "\"" + req->pageUrl + "\""; - //data += ","; - data += "}"; + std::stringstream ss; + ss << "{" + // ip + << '"' << "ip" << '"' << ':' + << '"' << ip << '"' + << ',' + // vhost + << '"' << "vhost" << '"' << ':' + << '"' << req->vhost << '"' + << ',' + // app + << '"' << "app" << '"' << ':' + << '"' << req->app << '"' + << ',' + // pageUrl + << '"' << "pageUrl" << '"' << ':' + << '"' << req->pageUrl << '"' + //<< ',' + << "}"; + std::string data = ss.str(); + std::string res; SrsHttpClient http; if ((ret = http.post(&uri, data, res)) != ERROR_SUCCESS) { diff --git a/trunk/src/core/srs_core_http.hpp b/trunk/src/core/srs_core_http.hpp index e9cf2e66e..426760c1f 100644 --- a/trunk/src/core/srs_core_http.hpp +++ b/trunk/src/core/srs_core_http.hpp @@ -29,14 +29,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include +#ifdef SRS_HTTP + class SrsRequest; +class SrsSocket; #include -#ifdef SRS_HTTP - +#include #include +#define SRS_HTTP_HEADER_BUFFER 1024 +#define SRS_HTTP_BODY_BUFFER 32 * 1024 + /** * used to resolve the http uri. */ @@ -61,6 +66,7 @@ public: virtual const char* get_schema(); virtual const char* get_host(); virtual int get_port(); + virtual const char* get_path(); private: /** * get the parsed url field. @@ -74,6 +80,11 @@ private: */ class SrsHttpClient { +private: + bool connected; + st_netfd_t stfd; +private: + http_parser http_header; public: SrsHttpClient(); virtual ~SrsHttpClient(); @@ -84,6 +95,17 @@ public: * @param res the response data from server. */ virtual int post(SrsHttpUri* uri, std::string req, std::string& res); +private: + virtual void disconnect(); + virtual int connect(SrsHttpUri* uri); +private: + virtual int parse_response(SrsHttpUri* uri, SrsSocket* skt, std::string* response); + virtual int parse_response_header(SrsSocket* skt, std::string* response, int& body_received); + virtual int parse_response_body(SrsHttpUri* uri, SrsSocket* skt, std::string* response, int body_received); + virtual int parse_response_body_data(SrsHttpUri* uri, SrsSocket* skt, std::string* response, size_t body_left, const void* buf, size_t size); +private: + static int on_headers_complete(http_parser* parser); + virtual void comple_header(http_parser* parser); }; /**