From e8c0ca7af0355fa061da4fcf0df5c32f75e5fb39 Mon Sep 17 00:00:00 2001 From: lovacat Date: Tue, 11 Aug 2015 15:23:46 +0800 Subject: [PATCH] fix #442: HTTP API kickoff client. --- trunk/src/app/srs_app_http_api.cpp | 70 ++++++++++++++++++++++----- trunk/src/app/srs_app_http_conn.cpp | 10 ++-- trunk/src/app/srs_app_rtmp_conn.cpp | 7 +++ trunk/src/app/srs_app_server.cpp | 2 +- trunk/src/app/srs_app_source.cpp | 12 +++++ trunk/src/app/srs_app_source.hpp | 8 +++ trunk/src/app/srs_app_statistic.cpp | 14 ++++++ trunk/src/app/srs_app_statistic.hpp | 1 + trunk/src/kernel/srs_kernel_error.hpp | 3 ++ trunk/src/protocol/srs_http_stack.cpp | 12 ++++- 10 files changed, 121 insertions(+), 18 deletions(-) mode change 100644 => 100755 trunk/src/app/srs_app_http_api.cpp mode change 100644 => 100755 trunk/src/app/srs_app_http_conn.cpp mode change 100644 => 100755 trunk/src/app/srs_app_server.cpp mode change 100644 => 100755 trunk/src/app/srs_app_statistic.cpp mode change 100644 => 100755 trunk/src/app/srs_app_statistic.hpp mode change 100644 => 100755 trunk/src/kernel/srs_kernel_error.hpp mode change 100644 => 100755 trunk/src/protocol/srs_http_stack.cpp diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp old mode 100644 new mode 100755 index c95cb9289..772b58ed0 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -26,6 +26,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #ifdef SRS_AUTO_HTTP_API #include +#include using namespace std; #include @@ -39,6 +40,7 @@ using namespace std; #include #include #include +#include #include SrsGoApiRoot::SrsGoApiRoot() @@ -459,21 +461,67 @@ SrsGoApiStreams::~SrsGoApiStreams() int SrsGoApiStreams::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { - std::stringstream data; + int ret = ERROR_SUCCESS; SrsStatistic* stat = SrsStatistic::instance(); - int ret = stat->dumps_streams(data); - std::stringstream ss; - - ss << SRS_JOBJECT_START - << SRS_JFIELD_ERROR(ret) << SRS_JFIELD_CONT - << SRS_JFIELD_ORG("server", stat->server_id()) << SRS_JFIELD_CONT - << SRS_JFIELD_ORG("streams", data.str()) - << SRS_JOBJECT_END; - - return srs_http_response_json(w, ss.str()); + + if (r->is_http_delete()) { + // path: {pattern}{stream_id} + // e.g. /api/v1/streams/100 pattern= /api/v1/streams/, stream_id=100 + string sid = r->path().substr((int)entry->pattern.length()); + if (sid.empty()) { + ret = ERROR_REQUEST_DATA; + srs_error("invalid param, stream_id=%s. ret=%d", sid.c_str(), ret); + + ss << SRS_JOBJECT_START + << SRS_JFIELD_ERROR(ret) + << SRS_JOBJECT_END; + + return srs_http_response_json(w, ss.str()); + } + + int stream_id = ::atoi(sid.c_str()); + SrsStatisticStream* stream = stat->find_stream(stream_id); + if (stream == NULL) { + ret = ERROR_RTMP_STREAM_NOT_FOUND; + srs_error("stream stream_id=%s not found. ret=%d", sid.c_str(), ret); + + ss << SRS_JOBJECT_START + << SRS_JFIELD_ERROR(ret) + << SRS_JOBJECT_END; + + return srs_http_response_json(w, ss.str()); + } + + SrsSource* source = SrsSource::fetch(stream->vhost->vhost, stream->app, stream->stream); + if (source) { + source->set_expired(); + srs_warn("disconnent stream=%d successfully. vhost=%s, app=%s, stream=%s.", + stream_id, stream->vhost->vhost.c_str(), stream->app.c_str(), stream->stream.c_str()); + } else { + ret = ERROR_SOURCE_NOT_FOUND; + } + + ss << SRS_JOBJECT_START + << SRS_JFIELD_ERROR(ret) + << SRS_JOBJECT_END; + + return srs_http_response_json(w, ss.str()); + } else { + std::stringstream data; + int ret = stat->dumps_streams(data); + + ss << SRS_JOBJECT_START + << SRS_JFIELD_ERROR(ret) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("server", stat->server_id()) << SRS_JFIELD_CONT + << SRS_JFIELD_ORG("streams", data.str()) + << SRS_JOBJECT_END; + + return srs_http_response_json(w, ss.str()); + } } + SrsHttpApi::SrsHttpApi(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m) : SrsConnection(cm, fd) { diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp old mode 100644 new mode 100755 index 64e095be0..d6ded9ab5 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -104,11 +104,11 @@ int SrsHttpResponseWriter::write(char* data, int size) if (!header_wrote) { write_header(SRS_CONSTS_HTTP_OK); - - if ((ret = send_header(data, size)) != ERROR_SUCCESS) { - srs_error("http: send header failed. ret=%d", ret); - return ret; - } + } + + if ((ret = send_header(data, size)) != ERROR_SUCCESS) { + srs_error("http: send header failed. ret=%d", ret); + return ret; } // check the bytes send and content length. diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 681cb0510..ae2ad3cd8 100755 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -773,6 +773,13 @@ int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd) int64_t nb_msgs = 0; while (!disposed) { pprint->elapse(); + + // when source is set to expired, disconnect it. + if (source->expired()) { + ret = ERROR_USER_DISCONNECT; + srs_error("source is expired. ret=%d", ret); + return ret; + } // cond wait for timeout. if (nb_msgs == 0) { diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp old mode 100644 new mode 100755 index 147dfbd17..6be751f65 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -806,7 +806,7 @@ int SrsServer::http_handle() if ((ret = http_api_mux->handle("/api/v1/vhosts", new SrsGoApiVhosts())) != ERROR_SUCCESS) { return ret; } - if ((ret = http_api_mux->handle("/api/v1/streams", new SrsGoApiStreams())) != ERROR_SUCCESS) { + if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) { return ret; } #endif diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 1c38fbaa9..872cd5d4f 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -890,6 +890,7 @@ SrsSource::SrsSource() jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); + is_expired = false; #ifdef SRS_AUTO_HLS hls = new SrsHls(); @@ -2071,6 +2072,7 @@ void SrsSource::on_unpublish() _can_publish = true; _source_id = -1; + is_expired = false; // notify the handler. srs_assert(handler); @@ -2229,3 +2231,13 @@ void SrsSource::destroy_forwarders() forwarders.clear(); } +bool SrsSource::expired() +{ + return is_expired; +} + +void SrsSource::set_expired() +{ + is_expired = true; +} + diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp index 523115964..361901920 100755 --- a/trunk/src/app/srs_app_source.hpp +++ b/trunk/src/app/srs_app_source.hpp @@ -452,6 +452,8 @@ private: // whether use interlaced/mixed algorithm to correct timestamp. bool mix_correct; SrsMixQueue* mix_queue; + // the flag of source expired or not. + bool is_expired; // whether stream is monotonically increase. bool is_monotonically_increase; int64_t last_packet_time; @@ -583,6 +585,12 @@ public: private: virtual int create_forwarders(); virtual void destroy_forwarders(); +public: + virtual bool expired(); + /** + * set source expired. + */ + virtual void set_expired(); }; #endif diff --git a/trunk/src/app/srs_app_statistic.cpp b/trunk/src/app/srs_app_statistic.cpp old mode 100644 new mode 100755 index c33d6b8ca..5bf1d36d5 --- a/trunk/src/app/srs_app_statistic.cpp +++ b/trunk/src/app/srs_app_statistic.cpp @@ -134,6 +134,20 @@ SrsStatistic* SrsStatistic::instance() return _instance; } +SrsStatisticStream* SrsStatistic::find_stream(int stream_id) +{ + std::map::iterator it; + for (it = clients.begin(); it != clients.end(); it++) { + SrsStatisticClient* client = it->second; + SrsStatisticStream* stream = client->stream; + + if (stream_id == stream->id) { + return stream; + } + } + return NULL; +} + int SrsStatistic::on_video_info(SrsRequest* req, SrsCodecVideo vcodec, SrsAvcProfile avc_profile, SrsAvcLevel avc_level ) { diff --git a/trunk/src/app/srs_app_statistic.hpp b/trunk/src/app/srs_app_statistic.hpp old mode 100644 new mode 100755 index 949bd0633..fb56d04f8 --- a/trunk/src/app/srs_app_statistic.hpp +++ b/trunk/src/app/srs_app_statistic.hpp @@ -131,6 +131,7 @@ private: public: static SrsStatistic* instance(); public: + virtual SrsStatisticStream* find_stream(int stream_id); /** * when got video info for stream. */ diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp old mode 100644 new mode 100755 index ebcf23909..47c68c9b1 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -149,6 +149,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTP_TYPE96_CORRUPT 2045 #define ERROR_RTP_TYPE97_CORRUPT 2046 #define ERROR_RTSP_AUDIO_CONFIG 2047 +#define ERROR_RTMP_STREAM_NOT_FOUND 2048 // // system control message, // not an error, but special control logic. @@ -225,6 +226,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_HLS_NO_STREAM 3062 #define ERROR_JSON_LOADS 3063 #define ERROR_RESPONSE_CODE 3064 +#define ERROR_RESPONSE_DATA 3065 +#define ERROR_REQUEST_DATA 3066 /////////////////////////////////////////////////////// // HTTP/StreamCaster protocol error. diff --git a/trunk/src/protocol/srs_http_stack.cpp b/trunk/src/protocol/srs_http_stack.cpp old mode 100644 new mode 100755 index 0bd11854a..172dfbf45 --- a/trunk/src/protocol/srs_http_stack.cpp +++ b/trunk/src/protocol/srs_http_stack.cpp @@ -246,7 +246,17 @@ SrsHttpRedirectHandler::~SrsHttpRedirectHandler() int SrsHttpRedirectHandler::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { int ret = ERROR_SUCCESS; - // TODO: FIXME: implements it. + string msg = "Moved Permsanently"; + + w->header()->set_content_type("text/plain; charset=utf-8"); + w->header()->set_content_length(msg.length()); + w->header()->set("Location", url); + w->write_header(code); + + w->write((char*)msg.data(), (int)msg.length()); + w->final_request(); + + srs_info("redirect to %s.", url.c_str()); return ret; }