Refine typo in app.

pull/1651/head
winlin 6 years ago
parent aac8a13f42
commit 45009785fb

@ -40,9 +40,7 @@ class SrsMpdWriter;
class SrsMp4M2tsInitEncoder; class SrsMp4M2tsInitEncoder;
class SrsMp4M2tsSegmentEncoder; class SrsMp4M2tsSegmentEncoder;
/** // The init mp4 for FMP4.
* The init mp4 for FMP4.
*/
class SrsInitMp4 : public SrsFragment class SrsInitMp4 : public SrsFragment
{ {
private: private:
@ -56,9 +54,7 @@ public:
virtual srs_error_t write(SrsFormat* format, bool video, int tid); virtual srs_error_t write(SrsFormat* format, bool video, int tid);
}; };
/** // The FMP4(Fragmented MP4) for DASH streaming.
* The FMP4(Fragmented MP4) for DASH streaming.
*/
class SrsFragmentedMp4 : public SrsFragment class SrsFragmentedMp4 : public SrsFragment
{ {
private: private:
@ -76,9 +72,7 @@ public:
virtual srs_error_t reap(uint64_t& dts); virtual srs_error_t reap(uint64_t& dts);
}; };
/** // The writer to write MPD for DASH.
* The writer to write MPD for DASH.
*/
class SrsMpdWriter class SrsMpdWriter
{ {
private: private:
@ -113,9 +107,7 @@ public:
virtual srs_error_t get_fragment(bool video, std::string& home, std::string& filename, int64_t& sn, srs_utime_t& basetime); virtual srs_error_t get_fragment(bool video, std::string& home, std::string& filename, int64_t& sn, srs_utime_t& basetime);
}; };
/** // The controller for DASH, control the MPD and FMP4 generating system.
* The controller for DASH, control the MPD and FMP4 generating system.
*/
class SrsDashController class SrsDashController
{ {
private: private:
@ -149,9 +141,7 @@ private:
virtual srs_error_t refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFormat* format); virtual srs_error_t refresh_init_mp4(SrsSharedPtrMessage* msg, SrsFormat* format);
}; };
/** // The MPEG-DASH encoder, transmux RTMP to DASH.
* The MPEG-DASH encoder, transmux RTMP to DASH.
*/
class SrsDash class SrsDash
{ {
private: private:

@ -49,9 +49,7 @@ class SrsFormat;
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
#include <srs_app_async_call.hpp> #include <srs_app_async_call.hpp>
/** // The segmenter for DVR, to write a segment file in flv/mp4.
* The segmenter for DVR, to write a segment file in flv/mp4.
*/
class SrsDvrSegmenter : public ISrsReloadHandler class SrsDvrSegmenter : public ISrsReloadHandler
{ {
protected: protected:
@ -110,9 +108,7 @@ public:
virtual srs_error_t on_reload_vhost_dvr(std::string vhost); virtual srs_error_t on_reload_vhost_dvr(std::string vhost);
}; };
/** // The FLV segmenter to use FLV encoder to write file.
* The FLV segmenter to use FLV encoder to write file.
*/
class SrsDvrFlvSegmenter : public SrsDvrSegmenter class SrsDvrFlvSegmenter : public SrsDvrSegmenter
{ {
private: private:
@ -140,9 +136,7 @@ protected:
virtual srs_error_t close_encoder(); virtual srs_error_t close_encoder();
}; };
/** // The MP4 segmenter to use MP4 encoder to write file.
* The MP4 segmenter to use MP4 encoder to write file.
*/
class SrsDvrMp4Segmenter : public SrsDvrSegmenter class SrsDvrMp4Segmenter : public SrsDvrSegmenter
{ {
private: private:
@ -161,9 +155,7 @@ protected:
virtual srs_error_t close_encoder(); virtual srs_error_t close_encoder();
}; };
/** // the dvr async call.
* the dvr async call.
*/
class SrsDvrAsyncCallOnDvr : public ISrsAsyncCallTask class SrsDvrAsyncCallOnDvr : public ISrsAsyncCallTask
{ {
private: private:
@ -178,9 +170,7 @@ public:
virtual std::string to_string(); virtual std::string to_string();
}; };
/** // The DVR plan, when and how to reap segment.
* The DVR plan, when and how to reap segment.
*/
class SrsDvrPlan : public ISrsReloadHandler class SrsDvrPlan : public ISrsReloadHandler
{ {
public: public:
@ -208,9 +198,7 @@ public:
static srs_error_t create_plan(std::string vhost, SrsDvrPlan** pplan); static srs_error_t create_plan(std::string vhost, SrsDvrPlan** pplan);
}; };
/** // The DVR session plan: reap flv when session complete(unpublish)
* The DVR session plan: reap flv when session complete(unpublish)
*/
class SrsDvrSessionPlan : public SrsDvrPlan class SrsDvrSessionPlan : public SrsDvrPlan
{ {
public: public:
@ -221,9 +209,7 @@ public:
virtual void on_unpublish(); virtual void on_unpublish();
}; };
/** // The DVR segment plan: reap flv when duration exceed.
* The DVR segment plan: reap flv when duration exceed.
*/
class SrsDvrSegmentPlan : public SrsDvrPlan class SrsDvrSegmentPlan : public SrsDvrPlan
{ {
private: private:
@ -246,9 +232,7 @@ public:
virtual srs_error_t on_reload_vhost_dvr(std::string vhost); virtual srs_error_t on_reload_vhost_dvr(std::string vhost);
}; };
/** // DVR(Digital Video Recorder) to record RTMP stream to flv/mp4 file.
* DVR(Digital Video Recorder) to record RTMP stream to flv/mp4 file.
*/
class SrsDvr : public ISrsReloadHandler class SrsDvr : public ISrsReloadHandler
{ {
private: private:
@ -264,36 +248,24 @@ public:
SrsDvr(); SrsDvr();
virtual ~SrsDvr(); virtual ~SrsDvr();
public: public:
/** // initialize dvr, create dvr plan.
* initialize dvr, create dvr plan. // when system initialize(encoder publish at first time, or reload),
* when system initialize(encoder publish at first time, or reload), // initialize the dvr will reinitialize the plan, the whole dvr framework.
* initialize the dvr will reinitialize the plan, the whole dvr framework.
*/
virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r); virtual srs_error_t initialize(SrsOriginHub* h, SrsRequest* r);
/** // publish stream event,
* publish stream event, // when encoder start to publish RTMP stream.
* when encoder start to publish RTMP stream. // @param fetch_sequence_header whether fetch sequence from source.
* @param fetch_sequence_header whether fetch sequence from source.
*/
virtual srs_error_t on_publish(); virtual srs_error_t on_publish();
/** // the unpublish event.,
* the unpublish event., // when encoder stop(unpublish) to publish RTMP stream.
* when encoder stop(unpublish) to publish RTMP stream.
*/
virtual void on_unpublish(); virtual void on_unpublish();
/** // get some information from metadata, it's optinal.
* get some information from metadata, it's optinal.
*/
virtual srs_error_t on_meta_data(SrsSharedPtrMessage* metadata); virtual srs_error_t on_meta_data(SrsSharedPtrMessage* metadata);
/** // mux the audio packets to dvr.
* mux the audio packets to dvr. // @param shared_audio, directly ptr, copy it if need to save it.
* @param shared_audio, directly ptr, copy it if need to save it.
*/
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* foramt); virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio, SrsFormat* foramt);
/** // mux the video packets to dvr.
* mux the video packets to dvr. // @param shared_video, directly ptr, copy it if need to save it.
* @param shared_video, directly ptr, copy it if need to save it.
*/
virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format); virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video, SrsFormat* format);
// interface ISrsReloadHandler // interface ISrsReloadHandler
public: public:

@ -47,34 +47,28 @@ class SrsTcpClient;
class SrsSimpleRtmpClient; class SrsSimpleRtmpClient;
class SrsPacket; class SrsPacket;
/** // The state of edge, auto machine
* the state of edge, auto machine
*/
enum SrsEdgeState enum SrsEdgeState
{ {
SrsEdgeStateInit = 0, SrsEdgeStateInit = 0,
// for play edge // For play edge
SrsEdgeStatePlay = 100, SrsEdgeStatePlay = 100,
// play stream from origin, ingest stream // play stream from origin, ingest stream
SrsEdgeStateIngestConnected = 101, SrsEdgeStateIngestConnected = 101,
// for publish edge // For publish edge
SrsEdgeStatePublish = 200, SrsEdgeStatePublish = 200,
}; };
/** // The state of edge from user, manual machine
* the state of edge from user, manual machine
*/
enum SrsEdgeUserState enum SrsEdgeUserState
{ {
SrsEdgeUserStateInit = 0, SrsEdgeUserStateInit = 0,
SrsEdgeUserStateReloading = 100, SrsEdgeUserStateReloading = 100,
}; };
/** // The upstream of edge, can be rtmp or http.
* the upstream of edge, can be rtmp or http.
*/
class SrsEdgeUpstream class SrsEdgeUpstream
{ {
public: public:
@ -93,7 +87,7 @@ public:
class SrsEdgeRtmpUpstream : public SrsEdgeUpstream class SrsEdgeRtmpUpstream : public SrsEdgeUpstream
{ {
private: private:
// for RTMP 302, if not empty, // For RTMP 302, if not empty,
// use this <ip[:port]> as upstream. // use this <ip[:port]> as upstream.
std::string redirect; std::string redirect;
SrsSimpleRtmpClient* sdk; SrsSimpleRtmpClient* sdk;
@ -111,9 +105,7 @@ public:
virtual void kbps_sample(const char* label, int64_t age); virtual void kbps_sample(const char* label, int64_t age);
}; };
/** // The edge used to ingest stream from origin.
* edge used to ingest stream from origin.
*/
class SrsEdgeIngester : public ISrsCoroutineHandler class SrsEdgeIngester : public ISrsCoroutineHandler
{ {
private: private:
@ -123,7 +115,7 @@ private:
SrsCoroutine* trd; SrsCoroutine* trd;
SrsLbRoundRobin* lb; SrsLbRoundRobin* lb;
SrsEdgeUpstream* upstream; SrsEdgeUpstream* upstream;
// for RTMP 302 redirect. // For RTMP 302 redirect.
std::string redirect; std::string redirect;
public: public:
SrsEdgeIngester(); SrsEdgeIngester();
@ -143,9 +135,7 @@ private:
virtual srs_error_t process_publish_message(SrsCommonMessage* msg); virtual srs_error_t process_publish_message(SrsCommonMessage* msg);
}; };
/** // The edge used to forward stream to origin.
* edge used to forward stream to origin.
*/
class SrsEdgeForwarder : public ISrsCoroutineHandler class SrsEdgeForwarder : public ISrsCoroutineHandler
{ {
private: private:
@ -155,16 +145,12 @@ private:
SrsCoroutine* trd; SrsCoroutine* trd;
SrsSimpleRtmpClient* sdk; SrsSimpleRtmpClient* sdk;
SrsLbRoundRobin* lb; SrsLbRoundRobin* lb;
/** // we must ensure one thread one fd principle,
* we must ensure one thread one fd principle, // that is, a fd must be write/read by the one thread.
* that is, a fd must be write/read by the one thread. // The publish service thread will proxy(msg), and the edge forward thread
* the publish service thread will proxy(msg), and the edge forward thread // will cycle(), so we use queue for cycle to send the msg of proxy.
* will cycle(), so we use queue for cycle to send the msg of proxy.
*/
SrsMessageQueue* queue; SrsMessageQueue* queue;
/** // error code of send, for edge proxy thread to query.
* error code of send, for edge proxy thread to query.
*/
int send_error_code; int send_error_code;
public: public:
SrsEdgeForwarder(); SrsEdgeForwarder();
@ -184,10 +170,7 @@ public:
virtual srs_error_t proxy(SrsCommonMessage* msg); virtual srs_error_t proxy(SrsCommonMessage* msg);
}; };
/** // The play edge control service.
* play edge control service.
* downloading edge speed-up.
*/
class SrsPlayEdge class SrsPlayEdge
{ {
private: private:
@ -197,32 +180,21 @@ public:
SrsPlayEdge(); SrsPlayEdge();
virtual ~SrsPlayEdge(); virtual ~SrsPlayEdge();
public: public:
/** // Always use the req of source,
* always use the req of source, // For we assume all client to edge is invalid,
* for we assume all client to edge is invalid, // if auth open, edge must valid it from origin, then service it.
* if auth open, edge must valid it from origin, then service it.
*/
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
/** // When client play stream on edge.
* when client play stream on edge.
*/
virtual srs_error_t on_client_play(); virtual srs_error_t on_client_play();
/** // When all client stopped play, disconnect to origin.
* when all client stopped play, disconnect to origin.
*/
virtual void on_all_client_stop(); virtual void on_all_client_stop();
virtual std::string get_curr_origin(); virtual std::string get_curr_origin();
public: public:
/** // When ingester start to play stream.
* when ingester start to play stream.
*/
virtual srs_error_t on_ingest_play(); virtual srs_error_t on_ingest_play();
}; };
/** // The publish edge control service.
* publish edge control service.
* uploading edge speed-up.
*/
class SrsPublishEdge class SrsPublishEdge
{ {
private: private:
@ -236,17 +208,11 @@ public:
public: public:
virtual srs_error_t initialize(SrsSource* source, SrsRequest* req); virtual srs_error_t initialize(SrsSource* source, SrsRequest* req);
virtual bool can_publish(); virtual bool can_publish();
/** // When client publish stream on edge.
* when client publish stream on edge.
*/
virtual srs_error_t on_client_publish(); virtual srs_error_t on_client_publish();
/** // Proxy publish stream to edge
* proxy publish stream to edge
*/
virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg); virtual srs_error_t on_proxy_publish(SrsCommonMessage* msg);
/** // Proxy unpublish stream to edge.
* proxy unpublish stream to edge.
*/
virtual void on_proxy_unpublish(); virtual void on_proxy_unpublish();
}; };

@ -36,10 +36,8 @@ class SrsRequest;
class SrsPithyPrint; class SrsPithyPrint;
class SrsFFMPEG; class SrsFFMPEG;
/** // The encoder for a stream, may use multiple
* the encoder for a stream, // ffmpegs to transcode the specified stream.
* may use multiple ffmpegs to transcode the specified stream.
*/
class SrsEncoder : public ISrsCoroutineHandler class SrsEncoder : public ISrsCoroutineHandler
{ {
private: private:

@ -35,10 +35,7 @@ class SrsConfDirective;
class SrsPithyPrint; class SrsPithyPrint;
class SrsProcess; class SrsProcess;
/** // A transcode engine: ffmepg, used to transcode a stream to another.
* a transcode engine: ffmepg,
* used to transcode a stream to another.
*/
class SrsFFMPEG class SrsFFMPEG
{ {
private: private:

@ -43,14 +43,11 @@ class SrsOriginHub;
class SrsKbps; class SrsKbps;
class SrsSimpleRtmpClient; class SrsSimpleRtmpClient;
/** // Forward the stream to other servers.
* forward the stream to other servers.
*/
// TODO: FIXME: refine the error log, comments it.
class SrsForwarder : public ISrsCoroutineHandler class SrsForwarder : public ISrsCoroutineHandler
{ {
private: private:
// the ep to forward, server[:port]. // The ep to forward, server[:port].
std::string ep_forward; std::string ep_forward;
SrsRequest* req; SrsRequest* req;
private: private:
@ -60,10 +57,8 @@ private:
SrsSimpleRtmpClient* sdk; SrsSimpleRtmpClient* sdk;
SrsRtmpJitter* jitter; SrsRtmpJitter* jitter;
SrsMessageQueue* queue; SrsMessageQueue* queue;
/** // Cache the sequence header for retry when slave is failed.
* cache the sequence header for retry when slave is failed. // @see https://github.com/ossrs/srs/issues/150
* @see https://github.com/ossrs/srs/issues/150
*/
SrsSharedPtrMessage* sh_audio; SrsSharedPtrMessage* sh_audio;
SrsSharedPtrMessage* sh_video; SrsSharedPtrMessage* sh_video;
public: public:
@ -75,20 +70,14 @@ public:
public: public:
virtual srs_error_t on_publish(); virtual srs_error_t on_publish();
virtual void on_unpublish(); virtual void on_unpublish();
/** // Forward the audio packet.
* forward the audio packet. // @param shared_metadata, directly ptr, copy it if need to save it.
* @param shared_metadata, directly ptr, copy it if need to save it.
*/
virtual srs_error_t on_meta_data(SrsSharedPtrMessage* shared_metadata); virtual srs_error_t on_meta_data(SrsSharedPtrMessage* shared_metadata);
/** // Forward the audio packet.
* forward the audio packet. // @param shared_audio, directly ptr, copy it if need to save it.
* @param shared_audio, directly ptr, copy it if need to save it.
*/
virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio); virtual srs_error_t on_audio(SrsSharedPtrMessage* shared_audio);
/** // Forward the video packet.
* forward the video packet. // @param shared_video, directly ptr, copy it if need to save it.
* @param shared_video, directly ptr, copy it if need to save it.
*/
virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video); virtual srs_error_t on_video(SrsSharedPtrMessage* shared_video);
// interface ISrsReusableThread2Handler. // interface ISrsReusableThread2Handler.
public: public:

@ -29,10 +29,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
/** // Represent a fragment, such as HLS segment, DVR segment or DASH segment.
* Represent a fragment, such as HLS segment, DVR segment or DASH segment. // It's a media file, for example FLV or MP4, with duration.
* It's a media file, for example FLV or MP4, with duration.
*/
class SrsFragment class SrsFragment
{ {
private: private:
@ -75,9 +73,7 @@ public:
virtual srs_error_t rename(); virtual srs_error_t rename();
}; };
/** // The fragment window manage a series of fragment.
* The fragment window manage a series of fragment.
*/
class SrsFragmentWindow class SrsFragmentWindow
{ {
private: private:

@ -35,6 +35,7 @@ class SrsSharedPtrMessage;
class SrsHdsFragment; class SrsHdsFragment;
class SrsSource; class SrsSource;
// Mux RTMP to Adobe HDS streaming.
class SrsHds class SrsHds
{ {
public: public:

@ -28,68 +28,63 @@
#include <map> #include <map>
/** // The handler for the tick.
* the handler for the tick.
*/
class ISrsHourGlass class ISrsHourGlass
{ {
public: public:
ISrsHourGlass(); ISrsHourGlass();
virtual ~ISrsHourGlass(); virtual ~ISrsHourGlass();
public: public:
/** // When time is ticked, this function is called.
* notify the handler, the type and tick.
*/
virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick) = 0; virtual srs_error_t notify(int type, srs_utime_t interval, srs_utime_t tick) = 0;
}; };
/** // he hourglass used to do some specieal task,
* the hourglass used to do some specieal task, // while these task is cycle when some interval, for example,
* while these task is cycle when some interval, for example, // there are N=3 tasks to do:
* there are N=3 tasks to do: // 1. heartbeat every 3s.
* 1. heartbeat every 3s. // 2. print message every 5s.
* 2. print message every 5s. // 3. notify backend every 7s.
* 3. notify backend every 7s. // The hourglass will call back when ticks:
* the hourglass will call back when ticks: // 1. notify(type=1, time=3)
* 1. notify(type=1, time=3) // 2. notify(type=2, time=5)
* 2. notify(type=2, time=5) // 3. notify(type=1, time=6)
* 3. notify(type=1, time=6) // 4. notify(type=3, time=7)
* 4. notify(type=3, time=7) // 5. notify(type=1, time=9)
* 5. notify(type=1, time=9) // 6. notify(type=2, time=10)
* 6. notify(type=2, time=10) // This is used for server and bocar server and other manager.
* this is used for server and bocar server and other manager. //
* // Usage:
* Usage: // SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS);
* SrsHourGlass* hg = new SrsHourGlass(handler, 1 * SRS_UTIME_MILLISECONDS); // hg->tick(1, 3 * SRS_UTIME_MILLISECONDS);
* hg->tick(1, 3 * SRS_UTIME_MILLISECONDS); // hg->tick(2, 5 * SRS_UTIME_MILLISECONDS);
* hg->tick(2, 5 * SRS_UTIME_MILLISECONDS); // hg->tick(3, 7 * SRS_UTIME_MILLISECONDS);
* hg->tick(3, 7 * SRS_UTIME_MILLISECONDS); // // create a thread to cycle, which will call handerl when ticked.
* // create a thread to cycle, which will call handerl when ticked. // while (true) {
* while (true) { // hg->cycle();
* hg->cycle(); // }
* }
*/
class SrsHourGlass class SrsHourGlass
{ {
private: private:
ISrsHourGlass* handler; ISrsHourGlass* handler;
srs_utime_t _resolution; srs_utime_t _resolution;
// key: the type of tick. // The ticks:
// value: the interval of tick. // key: the type of tick.
// value: the interval of tick.
std::map<int, srs_utime_t> ticks; std::map<int, srs_utime_t> ticks;
// the total elapsed time, // The total elapsed time,
// for each cycle, we increase it with a resolution. // for each cycle, we increase it with a resolution.
srs_utime_t total_elapse; srs_utime_t total_elapse;
public: public:
SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution); SrsHourGlass(ISrsHourGlass* h, srs_utime_t resolution);
virtual ~SrsHourGlass(); virtual ~SrsHourGlass();
public: public:
// add a pair of tick(type, interval). // Add a pair of tick(type, interval).
// @param type the type of tick. // @param type the type of tick.
// @param interval the interval in srs_utime_t of tick. // @param interval the interval in srs_utime_t of tick.
virtual srs_error_t tick(int type, srs_utime_t interval); virtual srs_error_t tick(int type, srs_utime_t interval);
public: public:
// cycle the hourglass, which will sleep resolution every time. // Cycle the hourglass, which will sleep resolution every time.
// and call handler when ticked. // and call handler when ticked.
virtual srs_error_t cycle(); virtual srs_error_t cycle();
}; };

@ -37,7 +37,7 @@ class SrsServer;
#include <srs_http_stack.hpp> #include <srs_http_stack.hpp>
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
// for http root. // For http root.
class SrsGoApiRoot : public ISrsHttpHandler class SrsGoApiRoot : public ISrsHttpHandler
{ {
public: public:

@ -55,9 +55,7 @@ class SrsHttpMessage;
class SrsHttpStreamServer; class SrsHttpStreamServer;
class SrsHttpStaticServer; class SrsHttpStaticServer;
/** // The http connection which request the static or stream content.
* The http connection which request the static or stream content.
*/
class SrsHttpConn : public SrsConnection class SrsHttpConn : public SrsConnection
{ {
protected: protected:
@ -73,26 +71,22 @@ public:
protected: protected:
virtual srs_error_t do_cycle(); virtual srs_error_t do_cycle();
protected: protected:
// when got http message, // When got http message,
// for the static service or api, discard any body. // for the static service or api, discard any body.
// for the stream caster, for instance, http flv streaming, may discard the flv header or not. // for the stream caster, for instance, http flv streaming, may discard the flv header or not.
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg) = 0; virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg) = 0;
private: private:
virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
/** // When the connection disconnect, call this method.
* when the connection disconnect, call this method. // e.g. log msg of connection and report to other system.
* e.g. log msg of connection and report to other system. // @param request: request which is converted by the last http message.
* @param request: request which is converted by the last http message.
*/
virtual srs_error_t on_disconnect(SrsRequest* req); virtual srs_error_t on_disconnect(SrsRequest* req);
// interface ISrsReloadHandler // interface ISrsReloadHandler
public: public:
virtual srs_error_t on_reload_http_stream_crossdomain(); virtual srs_error_t on_reload_http_stream_crossdomain();
}; };
/** // Drop body of request, only process the response.
* drop body of request, only process the response.
*/
class SrsResponseOnlyHttpConn : public SrsHttpConn class SrsResponseOnlyHttpConn : public SrsHttpConn
{ {
public: public:
@ -109,9 +103,7 @@ public:
virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg); virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg);
}; };
/** // The http server, use http stream or static server to serve requests.
* the http server, use http stream or static server to serve requests.
*/
class SrsHttpServer : public ISrsHttpServeMux class SrsHttpServer : public ISrsHttpServeMux
{ {
private: private:
@ -123,10 +115,9 @@ public:
virtual ~SrsHttpServer(); virtual ~SrsHttpServer();
public: public:
virtual srs_error_t initialize(); virtual srs_error_t initialize();
// ISrsHttpServeMux // interface ISrsHttpServeMux
public: public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
// http flv/ts/mp3/aac stream
public: public:
virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r); virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r);
virtual void http_unmount(SrsSource* s, SrsRequest* r); virtual void http_unmount(SrsSource* s, SrsRequest* r);

@ -34,11 +34,10 @@ class SrsRequest;
class SrsHttpParser; class SrsHttpParser;
class SrsHttpClient; class SrsHttpClient;
/** // the http hooks, http callback api,
* the http hooks, http callback api, // for some event, such as on_connect, call
* for some event, such as on_connect, call // a http api(hooks).
* a http api(hooks). // TODO: Refine to global variable.
*/
class SrsHttpHooks class SrsHttpHooks
{ {
private: private:
@ -46,76 +45,56 @@ private:
public: public:
virtual ~SrsHttpHooks(); virtual ~SrsHttpHooks();
public: public:
/** // The on_connect hook, when client connect to srs.
* on_connect hook, when client connect to srs. // @param url the api server url, to valid the client.
* @param url the api server url, to valid the client. // ignore if empty.
* ignore if empty.
*/
static srs_error_t on_connect(std::string url, SrsRequest* req); static srs_error_t on_connect(std::string url, SrsRequest* req);
/** // The on_close hook, when client disconnect to srs, where client is valid by on_connect.
* on_close hook, when client disconnect to srs, where client is valid by on_connect. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty.
*/
static void on_close(std::string url, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes); static void on_close(std::string url, SrsRequest* req, int64_t send_bytes, int64_t recv_bytes);
/** // The on_publish hook, when client(encoder) start to publish stream
* on_publish hook, when client(encoder) start to publish stream // @param url the api server url, to valid the client.
* @param url the api server url, to valid the client. // ignore if empty.
* ignore if empty.
*/
static srs_error_t on_publish(std::string url, SrsRequest* req); static srs_error_t on_publish(std::string url, SrsRequest* req);
/** // The on_unpublish hook, when client(encoder) stop publish stream.
* on_unpublish hook, when client(encoder) stop publish stream. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty.
*/
static void on_unpublish(std::string url, SrsRequest* req); static void on_unpublish(std::string url, SrsRequest* req);
/** // The on_play hook, when client start to play stream.
* on_play hook, when client start to play stream. // @param url the api server url, to valid the client.
* @param url the api server url, to valid the client. // ignore if empty.
* ignore if empty.
*/
static srs_error_t on_play(std::string url, SrsRequest* req); static srs_error_t on_play(std::string url, SrsRequest* req);
/** // The on_stop hook, when client stop to play the stream.
* on_stop hook, when client stop to play the stream. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty.
*/
static void on_stop(std::string url, SrsRequest* req); static void on_stop(std::string url, SrsRequest* req);
/** // The on_dvr hook, when reap a dvr file.
* on_dvr hook, when reap a dvr file. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty. // @param file the file path, can be relative or absolute path.
* @param file the file path, can be relative or absolute path. // @param cid the source connection cid, for the on_dvr is async call.
* @param cid the source connection cid, for the on_dvr is async call.
*/
static srs_error_t on_dvr(int cid, std::string url, SrsRequest* req, std::string file); static srs_error_t on_dvr(int cid, std::string url, SrsRequest* req, std::string file);
/** // When hls reap segment, callback.
* when hls reap segment, callback. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty. // @param file the ts file path, can be relative or absolute path.
* @param file the ts file path, can be relative or absolute path. // @param ts_url the ts url, which used for m3u8.
* @param ts_url the ts url, which used for m3u8. // @param m3u8 the m3u8 file path, can be relative or absolute path.
* @param m3u8 the m3u8 file path, can be relative or absolute path. // @param m3u8_url the m3u8 url, which is used for the http mount path.
* @param m3u8_url the m3u8 url, which is used for the http mount path. // @param sn the seq_no, the sequence number of ts in hls/m3u8.
* @param sn the seq_no, the sequence number of ts in hls/m3u8. // @param duration the segment duration in srs_utime_t.
* @param duration the segment duration in srs_utime_t. // @param cid the source connection cid, for the on_dvr is async call.
* @param cid the source connection cid, for the on_dvr is async call.
*/
static srs_error_t on_hls(int cid, std::string url, SrsRequest* req, std::string file, std::string ts_url, static srs_error_t on_hls(int cid, std::string url, SrsRequest* req, std::string file, std::string ts_url,
std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration); std::string m3u8, std::string m3u8_url, int sn, srs_utime_t duration);
/** // When hls reap segment, callback.
* when hls reap segment, callback. // @param url the api server url, to process the event.
* @param url the api server url, to process the event. // ignore if empty.
* ignore if empty. // @param ts_url the ts uri, used to replace the variable [ts_url] in url.
* @param ts_url the ts uri, used to replace the variable [ts_url] in url. // @param nb_notify the max bytes to read from notify server.
* @param nb_notify the max bytes to read from notify server. // @param cid the source connection cid, for the on_dvr is async call.
* @param cid the source connection cid, for the on_dvr is async call.
*/
static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify); static srs_error_t on_hls_notify(int cid, std::string url, SrsRequest* req, std::string ts_url, int nb_notify);
/** // Discover co-workers for origin cluster.
* Discover co-workers for origin cluster.
*/
static srs_error_t discover_co_workers(std::string url, std::string& host, int& port); static srs_error_t discover_co_workers(std::string url, std::string& host, int& port);
private: private:
static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res); static srs_error_t do_post(SrsHttpClient* hc, std::string url, std::string req, int& code, std::string& res);

@ -28,12 +28,10 @@
#include <srs_app_http_conn.hpp> #include <srs_app_http_conn.hpp>
/** // The flv vod stream supports flv?start=offset-bytes.
* the flv vod stream supports flv?start=offset-bytes. // For example, http://server/file.flv?start=10240
* for example, http://server/file.flv?start=10240 // server will write flv header and sequence header,
* server will write flv header and sequence header, // then seek(10240) and response flv tag data.
* then seek(10240) and response flv tag data.
*/
class SrsVodStream : public SrsHttpFileServer class SrsVodStream : public SrsHttpFileServer
{ {
public: public:
@ -44,10 +42,8 @@ protected:
virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end); virtual srs_error_t serve_mp4_stream(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string fullpath, int start, int end);
}; };
/** // The http static server instance,
* the http static server instance, // serve http static file and flv/mp4 vod stream.
* serve http static file and flv/mp4 vod stream.
*/
class SrsHttpStaticServer : virtual public ISrsReloadHandler class SrsHttpStaticServer : virtual public ISrsReloadHandler
{ {
private: private:

@ -33,9 +33,7 @@ class SrsMp3Transmuxer;
class SrsFlvTransmuxer; class SrsFlvTransmuxer;
class SrsTsTransmuxer; class SrsTsTransmuxer;
/** // A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
* A cache for HTTP Live Streaming encoder, to make android(weixin) happy.
*/
class SrsBufferCache : public ISrsCoroutineHandler class SrsBufferCache : public ISrsCoroutineHandler
{ {
private: private:
@ -57,43 +55,31 @@ public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
}; };
/** // The encoder to transmux RTMP stream.
* The encoder to transmux RTMP stream.
*/
class ISrsBufferEncoder class ISrsBufferEncoder
{ {
public: public:
ISrsBufferEncoder(); ISrsBufferEncoder();
virtual ~ISrsBufferEncoder(); virtual ~ISrsBufferEncoder();
public: public:
/** // Initialize the encoder with file writer(to http response) and stream cache.
* initialize the encoder with file writer(to http response) and stream cache. // @param w the writer to write to http response.
* @param w the writer to write to http response. // @param c the stream cache for audio stream fast startup.
* @param c the stream cache for audio stream fast startup.
*/
virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c) = 0; virtual srs_error_t initialize(SrsFileWriter* w, SrsBufferCache* c) = 0;
/** // Write rtmp video/audio/metadata.
* write rtmp video/audio/metadata.
*/
virtual srs_error_t write_audio(int64_t timestamp, char* data, int size) = 0; virtual srs_error_t write_audio(int64_t timestamp, char* data, int size) = 0;
virtual srs_error_t write_video(int64_t timestamp, char* data, int size) = 0; virtual srs_error_t write_video(int64_t timestamp, char* data, int size) = 0;
virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size) = 0; virtual srs_error_t write_metadata(int64_t timestamp, char* data, int size) = 0;
public: public:
/** // For some stream, for example, mp3 and aac, the audio stream,
* for some stream, for example, mp3 and aac, the audio stream, // we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio.
* we use large gop cache in encoder, for the gop cache of SrsSource is ignore audio. // @return true to use gop cache of encoder; otherwise, use SrsSource.
* @return true to use gop cache of encoder; otherwise, use SrsSource.
*/
virtual bool has_cache() = 0; virtual bool has_cache() = 0;
/** // Dumps the cache of encoder to consumer.
* dumps the cache of encoder to consumer.
*/
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0; virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter) = 0;
}; };
/** // Transmux RTMP to HTTP Live Streaming.
* Transmux RTMP to HTTP Live Streaming.
*/
class SrsFlvStreamEncoder : public ISrsBufferEncoder class SrsFlvStreamEncoder : public ISrsBufferEncoder
{ {
protected: protected:
@ -112,26 +98,20 @@ public:
}; };
#ifdef SRS_PERF_FAST_FLV_ENCODER #ifdef SRS_PERF_FAST_FLV_ENCODER
/** // A Fast HTTP FLV Live Streaming, to write multiple tags by writev.
* A Fast HTTP FLV Live Streaming, to write multiple tags by writev. // @see https://github.com/ossrs/srs/issues/405
* @see https://github.com/ossrs/srs/issues/405
*/
class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder class SrsFastFlvStreamEncoder : public SrsFlvStreamEncoder
{ {
public: public:
SrsFastFlvStreamEncoder(); SrsFastFlvStreamEncoder();
virtual ~SrsFastFlvStreamEncoder(); virtual ~SrsFastFlvStreamEncoder();
public: public:
/** // Write the tags in a time.
* write the tags in a time.
*/
virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count); virtual srs_error_t write_tags(SrsSharedPtrMessage** msgs, int count);
}; };
#endif #endif
/** // Transmux RTMP to HTTP TS Streaming.
* Transmux RTMP to HTTP TS Streaming.
*/
class SrsTsStreamEncoder : public ISrsBufferEncoder class SrsTsStreamEncoder : public ISrsBufferEncoder
{ {
private: private:
@ -149,9 +129,7 @@ public:
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
/** // Transmux RTMP with AAC stream to HTTP AAC Streaming.
* Transmux RTMP with AAC stream to HTTP AAC Streaming.
*/
class SrsAacStreamEncoder : public ISrsBufferEncoder class SrsAacStreamEncoder : public ISrsBufferEncoder
{ {
private: private:
@ -170,9 +148,7 @@ public:
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
/** // Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
* Transmux RTMP with MP3 stream to HTTP MP3 Streaming.
*/
class SrsMp3StreamEncoder : public ISrsBufferEncoder class SrsMp3StreamEncoder : public ISrsBufferEncoder
{ {
private: private:
@ -191,9 +167,7 @@ public:
virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter); virtual srs_error_t dump_cache(SrsConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
}; };
/** // Write stream to http response direclty.
* write stream to http response direclty.
*/
class SrsBufferWriter : public SrsFileWriter class SrsBufferWriter : public SrsFileWriter
{ {
private: private:
@ -212,9 +186,7 @@ public:
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite); virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
}; };
/** // HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format.
* HTTP Live Streaming, to transmux RTMP to HTTP FLV or other format.
*/
class SrsLiveStream : public ISrsHttpHandler class SrsLiveStream : public ISrsHttpHandler
{ {
private: private:
@ -234,9 +206,7 @@ private:
virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs); virtual srs_error_t streaming_send_messages(ISrsBufferEncoder* enc, SrsSharedPtrMessage** msgs, int nb_msgs);
}; };
/** // The Live Entry, to handle HTTP Live Streaming.
* The Live Entry, to handle HTTP Live Streaming.
*/
struct SrsLiveEntry struct SrsLiveEntry
{ {
private: private:
@ -248,8 +218,8 @@ public:
SrsRequest* req; SrsRequest* req;
SrsSource* source; SrsSource* source;
public: public:
// for template, the mount contains variables. // For template, the mount contains variables.
// for concrete stream, the mount is url to access. // For concrete stream, the mount is url to access.
std::string mount; std::string mount;
SrsLiveStream* stream; SrsLiveStream* stream;
@ -263,9 +233,7 @@ public:
bool is_aac(); bool is_aac();
}; };
/** // The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream.
* The HTTP Live Streaming Server, to serve FLV/TS/MP3/AAC stream.
*/
// TODO: Support multiple stream. // TODO: Support multiple stream.
class SrsHttpStreamServer : virtual public ISrsReloadHandler class SrsHttpStreamServer : virtual public ISrsReloadHandler
, virtual public ISrsHttpMatchHijacker , virtual public ISrsHttpMatchHijacker
@ -274,17 +242,17 @@ private:
SrsServer* server; SrsServer* server;
public: public:
SrsHttpServeMux mux; SrsHttpServeMux mux;
// the http live streaming template, to create streams. // The http live streaming template, to create streams.
std::map<std::string, SrsLiveEntry*> tflvs; std::map<std::string, SrsLiveEntry*> tflvs;
// the http live streaming streams, crote by template. // The http live streaming streams, crote by template.
std::map<std::string, SrsLiveEntry*> sflvs; std::map<std::string, SrsLiveEntry*> sflvs;
public: public:
SrsHttpStreamServer(SrsServer* svr); SrsHttpStreamServer(SrsServer* svr);
virtual ~SrsHttpStreamServer(); virtual ~SrsHttpStreamServer();
public: public:
virtual srs_error_t initialize(); virtual srs_error_t initialize();
// http flv/ts/mp3/aac stream
public: public:
// HTTP flv/ts/mp3/aac stream
virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r); virtual srs_error_t http_mount(SrsSource* s, SrsRequest* r);
virtual void http_unmount(SrsSource* s, SrsRequest* r); virtual void http_unmount(SrsSource* s, SrsRequest* r);
// interface ISrsReloadHandler. // interface ISrsReloadHandler.

@ -33,51 +33,39 @@
struct sockaddr; struct sockaddr;
/** // The udp packet handler.
* the udp packet handler.
*/
class ISrsUdpHandler class ISrsUdpHandler
{ {
public: public:
ISrsUdpHandler(); ISrsUdpHandler();
virtual ~ISrsUdpHandler(); virtual ~ISrsUdpHandler();
public: public:
/** // When fd changed, for instance, reload the listen port,
* when fd changed, for instance, reload the listen port, // notify the handler and user can do something.
* notify the handler and user can do something.
*/
virtual srs_error_t on_stfd_change(srs_netfd_t fd); virtual srs_error_t on_stfd_change(srs_netfd_t fd);
public: public:
/** // When udp listener got a udp packet, notice server to process it.
* when udp listener got a udp packet, notice server to process it. // @param type, the client type, used to create concrete connection,
* @param type, the client type, used to create concrete connection, // for instance RTMP connection to serve client.
* for instance RTMP connection to serve client. // @param from, the udp packet from address.
* @param from, the udp packet from address. // @param buf, the udp packet bytes, user should copy if need to use.
* @param buf, the udp packet bytes, user should copy if need to use. // @param nb_buf, the size of udp packet bytes.
* @param nb_buf, the size of udp packet bytes. // @remark user should never use the buf, for it's a shared memory bytes.
* @remark user should never use the buf, for it's a shared memory bytes.
*/
virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0; virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) = 0;
}; };
/** // The tcp connection handler.
* the tcp connection handler.
*/
class ISrsTcpHandler class ISrsTcpHandler
{ {
public: public:
ISrsTcpHandler(); ISrsTcpHandler();
virtual ~ISrsTcpHandler(); virtual ~ISrsTcpHandler();
public: public:
/** // When got tcp client.
* when got tcp client.
*/
virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0; virtual srs_error_t on_tcp_client(srs_netfd_t stfd) = 0;
}; };
/** // Bind udp port, start thread to recv packet and handler it.
* bind udp port, start thread to recv packet and handler it.
*/
class SrsUdpListener : public ISrsCoroutineHandler class SrsUdpListener : public ISrsCoroutineHandler
{ {
private: private:
@ -104,9 +92,7 @@ public:
virtual srs_error_t cycle(); virtual srs_error_t cycle();
}; };
/** // Bind and listen tcp port, use handler to process the client.
* bind and listen tcp port, use handler to process the client.
*/
class SrsTcpListener : public ISrsCoroutineHandler class SrsTcpListener : public ISrsCoroutineHandler
{ {
private: private:

@ -32,24 +32,21 @@
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
#include <srs_service_log.hpp> #include <srs_service_log.hpp>
/** // Use memory/disk cache and donot flush when write log.
* we use memory/disk cache and donot flush when write log. // it's ok to use it without config, which will log to console, and default trace level.
* it's ok to use it without config, which will log to console, and default trace level. // when you want to use different level, override this classs, set the protected _level.
* when you want to use different level, override this classs, set the protected _level.
*/
class SrsFastLog : public ISrsLog, public ISrsReloadHandler class SrsFastLog : public ISrsLog, public ISrsReloadHandler
{ {
// for utest to override private:
protected: // Defined in SrsLogLevel.
// defined in SrsLogLevel.
SrsLogLevel level; SrsLogLevel level;
private: private:
char* log_data; char* log_data;
// log to file if specified srs_log_file // Log to file if specified srs_log_file
int fd; int fd;
// whether log to file tank // Whether log to file tank
bool log_to_file_tank; bool log_to_file_tank;
// whether use utc time. // Whether use utc time.
bool utc; bool utc;
public: public:
SrsFastLog(); SrsFastLog();

@ -48,15 +48,13 @@ class SrsSimpleRtmpClient;
#include <srs_kernel_ts.hpp> #include <srs_kernel_ts.hpp>
#include <srs_app_listener.hpp> #include <srs_app_listener.hpp>
/** // The queue for mpegts over udp to send packets.
* the queue for mpegts over udp to send packets. // For the aac in mpegts contains many flv packets in a pes packet,
* for the aac in mpegts contains many flv packets in a pes packet, // we must recalc the timestamp.
* we must recalc the timestamp.
*/
class SrsMpegtsQueue class SrsMpegtsQueue
{ {
private: private:
// key: dts, value: msg. // The key: dts, value: msg.
std::map<int64_t, SrsSharedPtrMessage*> msgs; std::map<int64_t, SrsSharedPtrMessage*> msgs;
int nb_audios; int nb_audios;
int nb_videos; int nb_videos;
@ -68,11 +66,8 @@ public:
virtual SrsSharedPtrMessage* dequeue(); virtual SrsSharedPtrMessage* dequeue();
}; };
/** // The mpegts over udp stream caster.
* the mpegts over udp stream caster. class SrsMpegtsOverUdp : virtual public ISrsTsHandler, virtual public ISrsUdpHandler
*/
class SrsMpegtsOverUdp : virtual public ISrsTsHandler
, virtual public ISrsUdpHandler
{ {
private: private:
SrsTsContext* context; SrsTsContext* context;

@ -35,11 +35,9 @@ class SrsRequest;
class SrsPithyPrint; class SrsPithyPrint;
class SrsProcess; class SrsProcess;
/** // The ng-exec is the exec feature introduced by nginx-rtmp,
* the ng-exec is the exec feature introduced by nginx-rtmp, // @see https://github.com/arut/nginx-rtmp-module/wiki/Directives#exec_push
* @see https://github.com/arut/nginx-rtmp-module/wiki/Directives#exec_push // @see https://github.com/ossrs/srs/issues/367
* @see https://github.com/ossrs/srs/issues/367
*/
class SrsNgExec : public ISrsCoroutineHandler class SrsNgExec : public ISrsCoroutineHandler
{ {
private: private:

@ -28,9 +28,7 @@
#include <srs_app_reload.hpp> #include <srs_app_reload.hpp>
/** // The stage info to calc the age.
* the stage info to calc the age.
*/
class SrsStageInfo : public ISrsReloadHandler class SrsStageInfo : public ISrsReloadHandler
{ {
public: public:
@ -50,26 +48,24 @@ public:
virtual srs_error_t on_reload_pithy_print(); virtual srs_error_t on_reload_pithy_print();
}; };
/** // The stage is used for a collection of object to do print,
* the stage is used for a collection of object to do print, // the print time in a stage is constant and not changed,
* the print time in a stage is constant and not changed, // that is, we always got one message to print every specified time.
* that is, we always got one message to print every specified time. //
* // For example, stage #1 for all play clients, print time is 3s,
* for example, stage #1 for all play clients, print time is 3s, // if there is 1client, it will print every 3s.
* if there is 1client, it will print every 3s. // if there is 10clients, random select one to print every 3s.
* if there is 10clients, random select one to print every 3s. // Usage:
* Usage: // SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
* SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play(); // SrsAutoFree(SrsPithyPrint, pprint);
* SrsAutoFree(SrsPithyPrint, pprint); // while (true) {
* while (true) { // pprint->elapse();
* pprint->elapse(); // if (pprint->can_print()) {
* if (pprint->can_print()) { // // print pithy message.
* // print pithy message. // // user can get the elapse time by: pprint->age()
* // user can get the elapse time by: pprint->age() // }
* } // // read and write RTMP messages.
* // read and write RTMP messages. // }
* }
*/
class SrsPithyPrint class SrsPithyPrint
{ {
private: private:
@ -93,26 +89,16 @@ public:
static SrsPithyPrint* create_http_stream_cache(); static SrsPithyPrint* create_http_stream_cache();
virtual ~SrsPithyPrint(); virtual ~SrsPithyPrint();
private: private:
/** // Enter the specified stage, return the client id.
* enter the specified stage, return the client id.
*/
virtual int enter_stage(); virtual int enter_stage();
/** // Leave the specified stage, release the client id.
* leave the specified stage, release the client id.
*/
virtual void leave_stage(); virtual void leave_stage();
public: public:
/** // Auto calc the elapse time
* auto calc the elapse time
*/
virtual void elapse(); virtual void elapse();
/** // Whether current client can print.
* whether current client can print.
*/
virtual bool can_print(); virtual bool can_print();
/** // Get the elapsed time in srs_utime_t.
* get the elapsed time in srs_utime_t.
*/
virtual srs_utime_t age(); virtual srs_utime_t age();
}; };

@ -29,26 +29,24 @@
#include <string> #include <string>
#include <vector> #include <vector>
/** // Start and stop a process. Call cycle to restart the process when terminated.
* to start and stop a process, cycle to restart the process when terminated. // The usage:
* the usage: // // the binary is the process to fork.
* // the binary is the process to fork. // binary = "./objs/ffmpeg/bin/ffmpeg";
* binary = "./objs/ffmpeg/bin/ffmpeg"; // // where argv is a array contains each params.
* // where argv is a array contains each params. // argv = ["./objs/ffmpeg/bin/ffmpeg", "-i", "in.flv", "1", ">", "/dev/null", "2", ">", "/dev/null"];
* argv = ["./objs/ffmpeg/bin/ffmpeg", "-i", "in.flv", "1", ">", "/dev/null", "2", ">", "/dev/null"]; //
* // process = new SrsProcess();
* process = new SrsProcess(); // if ((ret = process->initialize(binary, argv)) != ERROR_SUCCESS) { return ret; }
* if ((ret = process->initialize(binary, argv)) != ERROR_SUCCESS) { return ret; } // if ((ret = process->start()) != ERROR_SUCCESS) { return ret; }
* if ((ret = process->start()) != ERROR_SUCCESS) { return ret; } // if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; }
* if ((ret = process->cycle()) != ERROR_SUCCESS) { return ret; } // process->fast_stop();
* process->fast_stop(); // process->stop();
* process->stop();
*/
class SrsProcess class SrsProcess
{ {
private: private:
bool is_started; bool is_started;
// whether SIGTERM send but need to wait or SIGKILL. // Whether SIGTERM send but need to wait or SIGKILL.
bool fast_stopped; bool fast_stopped;
pid_t pid; pid_t pid;
private: private:
@ -56,56 +54,42 @@ private:
std::string stdout_file; std::string stdout_file;
std::string stderr_file; std::string stderr_file;
std::vector<std::string> params; std::vector<std::string> params;
// the cli to fork process. // The cli to fork process.
std::string cli; std::string cli;
std::string actual_cli; std::string actual_cli;
public: public:
SrsProcess(); SrsProcess();
virtual ~SrsProcess(); virtual ~SrsProcess();
public: public:
/** // Get pid of process.
* get pid of process.
*/
virtual int get_pid(); virtual int get_pid();
/** // whether process is already started.
* whether process is already started.
*/
virtual bool started(); virtual bool started();
/** // Initialize the process with binary and argv.
* initialize the process with binary and argv. // @param binary the binary path to exec.
* @param binary the binary path to exec. // @param argv the argv for binary path, the argv[0] generally is the binary.
* @param argv the argv for binary path, the argv[0] generally is the binary. // @remark the argv[0] must be the binary.
* @remark the argv[0] must be the binary.
*/
virtual srs_error_t initialize(std::string binary, std::vector<std::string> argv); virtual srs_error_t initialize(std::string binary, std::vector<std::string> argv);
public: public:
/** // Start the process, ignore when already started.
* start the process, ignore when already started.
*/
virtual srs_error_t start(); virtual srs_error_t start();
/** // cycle check the process, update the state of process.
* cycle check the process, update the state of process. // @remark when process terminated(not started), user can restart it again by start().
* @remark when process terminated(not started), user can restart it again by start().
*/
virtual srs_error_t cycle(); virtual srs_error_t cycle();
/** // Send SIGTERM then SIGKILL to ensure the process stopped.
* send SIGTERM then SIGKILL to ensure the process stopped. // the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the
* the stop will wait [0, SRS_PROCESS_QUIT_TIMEOUT_MS] depends on the // process quit timeout.
* process quit timeout. // @remark use fast_stop before stop one by one, when got lots of process to quit.
* @remark use fast_stop before stop one by one, when got lots of process to quit.
*/
virtual void stop(); virtual void stop();
public: public:
/** // The fast stop is to send a SIGTERM.
* the fast stop is to send a SIGTERM. // for example, the ingesters owner lots of FFMPEG, it will take a long time
* for example, the ingesters owner lots of FFMPEG, it will take a long time // to stop one by one, instead the ingesters can fast_stop all FFMPEG, then
* to stop one by one, instead the ingesters can fast_stop all FFMPEG, then // wait one by one to stop, it's more faster.
* wait one by one to stop, it's more faster. // @remark user must use stop() to ensure the ffmpeg to stopped.
* @remark user must use stop() to ensure the ffmpeg to stopped. // @remark we got N processes to stop, compare the time we spend,
* @remark we got N processes to stop, compare the time we spend, // when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N]
* when use stop without fast_stop, we spend maybe [0, SRS_PROCESS_QUIT_TIMEOUT_MS * N] // but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS].
* but use fast_stop then stop, the time is almost [0, SRS_PROCESS_QUIT_TIMEOUT_MS].
*/
virtual void fast_stop(); virtual void fast_stop();
}; };

@ -42,54 +42,38 @@ class SrsConsumer;
class SrsHttpConn; class SrsHttpConn;
class SrsResponseOnlyHttpConn; class SrsResponseOnlyHttpConn;
/** // The message consumer which consume a message.
* The message consumer which consume a message.
*/
class ISrsMessageConsumer class ISrsMessageConsumer
{ {
public: public:
ISrsMessageConsumer(); ISrsMessageConsumer();
virtual ~ISrsMessageConsumer(); virtual ~ISrsMessageConsumer();
public: public:
/** // Consume the received message.
* Consume the received message. // @remark user must free this message.
* @remark user must free this message.
*/
virtual srs_error_t consume(SrsCommonMessage* msg) = 0; virtual srs_error_t consume(SrsCommonMessage* msg) = 0;
}; };
/** // The message pumper to pump messages to processer.
* The message pumper to pump messages to processer.
*/
class ISrsMessagePumper : public ISrsMessageConsumer class ISrsMessagePumper : public ISrsMessageConsumer
{ {
public: public:
ISrsMessagePumper(); ISrsMessagePumper();
virtual ~ISrsMessagePumper(); virtual ~ISrsMessagePumper();
public: public:
/** // Whether the pumper is interrupted.
* Whether the pumper is interrupted. // For example, when pumpter is busy, it's interrupted,
* For example, when pumpter is busy, it's interrupted, // please wait for a while then try to feed the pumper.
* please wait for a while then try to feed the pumper.
*/
virtual bool interrupted() = 0; virtual bool interrupted() = 0;
/** // Interrupt the pumper for a error.
* Interrupt the pumper for a error.
*/
virtual void interrupt(srs_error_t error) = 0; virtual void interrupt(srs_error_t error) = 0;
/** // When start the pumper.
* When start the pumper.
*/
virtual void on_start() = 0; virtual void on_start() = 0;
/** // When stop the pumper.
* When stop the pumper.
*/
virtual void on_stop() = 0; virtual void on_stop() = 0;
}; };
/** // The recv thread, use message handler to handle each received message.
* the recv thread, use message handler to handle each received message.
*/
class SrsRecvThread : public ISrsCoroutineHandler class SrsRecvThread : public ISrsCoroutineHandler
{ {
protected: protected:
@ -117,19 +101,17 @@ private:
virtual srs_error_t do_cycle(); virtual srs_error_t do_cycle();
}; };
/** // The recv thread used to replace the timeout recv,
* the recv thread used to replace the timeout recv, // which hurt performance for the epoll_ctrl is frequently used.
* which hurt performance for the epoll_ctrl is frequently used. // @see: SrsRtmpConn::playing
* @see: SrsRtmpConn::playing // @see: https://github.com/ossrs/srs/issues/217
* @see: https://github.com/ossrs/srs/issues/217
*/
class SrsQueueRecvThread : public ISrsMessagePumper class SrsQueueRecvThread : public ISrsMessagePumper
{ {
private: private:
std::vector<SrsCommonMessage*> queue; std::vector<SrsCommonMessage*> queue;
SrsRecvThread trd; SrsRecvThread trd;
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
// the recv thread error code. // The recv thread error code.
srs_error_t recv_error; srs_error_t recv_error;
SrsConsumer* _consumer; SrsConsumer* _consumer;
public: public:
@ -153,10 +135,8 @@ public:
virtual void on_stop(); virtual void on_stop();
}; };
/** // The publish recv thread got message and callback the source method to process message.
* the publish recv thread got message and callback the source method to process message. // @see: https://github.com/ossrs/srs/issues/237
* @see: https://github.com/ossrs/srs/issues/237
*/
class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler class SrsPublishRecvThread : virtual public ISrsMessagePumper, virtual public ISrsReloadHandler
#ifdef SRS_PERF_MERGED_READ #ifdef SRS_PERF_MERGED_READ
, virtual public IMergeReadHandler , virtual public IMergeReadHandler
@ -166,27 +146,27 @@ private:
SrsRecvThread trd; SrsRecvThread trd;
SrsRtmpServer* rtmp; SrsRtmpServer* rtmp;
SrsRequest* req; SrsRequest* req;
// the msgs already got. // The msgs already got.
int64_t _nb_msgs; int64_t _nb_msgs;
// The video frames we got. // The video frames we got.
uint64_t video_frames; uint64_t video_frames;
// for mr(merged read), // For mr(merged read),
// @see https://github.com/ossrs/srs/issues/241 // @see https://github.com/ossrs/srs/issues/241
bool mr; bool mr;
int mr_fd; int mr_fd;
srs_utime_t mr_sleep; srs_utime_t mr_sleep;
// for realtime // For realtime
// @see https://github.com/ossrs/srs/issues/257 // @see https://github.com/ossrs/srs/issues/257
bool realtime; bool realtime;
// the recv thread error code. // The recv thread error code.
srs_error_t recv_error; srs_error_t recv_error;
SrsRtmpConn* _conn; SrsRtmpConn* _conn;
// the params for conn callback. // The params for conn callback.
SrsSource* _source; SrsSource* _source;
// the error timeout cond // The error timeout cond
// @see https://github.com/ossrs/srs/issues/244 // @see https://github.com/ossrs/srs/issues/244
srs_cond_t error; srs_cond_t error;
// merged context id. // The merged context id.
int cid; int cid;
int ncid; int ncid;
public: public:
@ -194,9 +174,7 @@ public:
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid); int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSource* source, int parent_cid);
virtual ~SrsPublishRecvThread(); virtual ~SrsPublishRecvThread();
public: public:
/** // Wait for error for some timeout.
* wait for error for some timeout.
*/
virtual srs_error_t wait(srs_utime_t tm); virtual srs_error_t wait(srs_utime_t tm);
virtual int64_t nb_msgs(); virtual int64_t nb_msgs();
virtual uint64_t nb_video_frames(); virtual uint64_t nb_video_frames();
@ -226,12 +204,10 @@ private:
virtual void set_socket_buffer(srs_utime_t sleep_v); virtual void set_socket_buffer(srs_utime_t sleep_v);
}; };
/** // The HTTP receive thread, try to read messages util EOF.
* The HTTP receive thread, try to read messages util EOF. // For example, the HTTP FLV serving thread will use the receive thread to break
* For example, the HTTP FLV serving thread will use the receive thread to break // when client closed the request, to avoid FD leak.
* when client closed the request, to avoid FD leak. // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
* @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
*/
class SrsHttpRecvThread : public ISrsCoroutineHandler class SrsHttpRecvThread : public ISrsCoroutineHandler
{ {
private: private:

@ -36,11 +36,9 @@ public:
SrsRefer(); SrsRefer();
virtual ~SrsRefer(); virtual ~SrsRefer();
public: public:
/** // Check the refer.
* to check the refer. // @param page_url the client page url.
* @param page_url the client page url. // @param refer the refer in config.
* @param refer the refer in config.
*/
virtual srs_error_t check(std::string page_url, SrsConfDirective* refer); virtual srs_error_t check(std::string page_url, SrsConfDirective* refer);
private: private:
virtual srs_error_t check_single_refer(std::string page_url, std::string refer); virtual srs_error_t check_single_refer(std::string page_url, std::string refer);

@ -28,13 +28,11 @@
#include <string> #include <string>
/** // The handler for config reload.
* the handler for config reload. // When reload callback, the config is updated yet.
* when reload callback, the config is updated yet. //
* // Features not support reload,
* features not support reload, // @see: https://github.com/ossrs/srs/wiki/v1_CN_Reload#notsupportedfeatures
* @see: https://github.com/ossrs/srs/wiki/v1_CN_Reload#notsupportedfeatures
*/
class ISrsReloadHandler class ISrsReloadHandler
{ {
public: public:

@ -58,9 +58,7 @@ class SrsPacket;
class ISrsKafkaCluster; class ISrsKafkaCluster;
#endif #endif
/** // The simple rtmp client for SRS.
* The simple rtmp client for SRS.
*/
class SrsSimpleRtmpClient : public SrsBasicRtmpClient class SrsSimpleRtmpClient : public SrsBasicRtmpClient
{ {
public: public:
@ -70,9 +68,7 @@ protected:
virtual srs_error_t connect_app(); virtual srs_error_t connect_app();
}; };
/** // Some information of client.
* Some information of client.
*/
class SrsClientInfo class SrsClientInfo
{ {
public: public:
@ -89,12 +85,10 @@ public:
virtual ~SrsClientInfo(); virtual ~SrsClientInfo();
}; };
/** // The client provides the main logic control for RTMP clients.
* the client provides the main logic control for RTMP clients.
*/
class SrsRtmpConn : virtual public SrsConnection, virtual public ISrsReloadHandler class SrsRtmpConn : virtual public SrsConnection, virtual public ISrsReloadHandler
{ {
// for the thread to directly access any field of connection. // For the thread to directly access any field of connection.
friend class SrsPublishRecvThread; friend class SrsPublishRecvThread;
private: private:
SrsServer* server; SrsServer* server;
@ -102,27 +96,27 @@ private:
SrsRefer* refer; SrsRefer* refer;
SrsBandwidth* bandwidth; SrsBandwidth* bandwidth;
SrsSecurity* security; SrsSecurity* security;
// the wakable handler, maybe NULL. // The wakable handler, maybe NULL.
// TODO: FIXME: Should refine the state for receiving thread. // TODO: FIXME: Should refine the state for receiving thread.
ISrsWakable* wakable; ISrsWakable* wakable;
// elapse duration in srs_utime_t // The elapsed duration in srs_utime_t
// for live play duration, for instance, rtmpdump to record. // For live play duration, for instance, rtmpdump to record.
// @see https://github.com/ossrs/srs/issues/47 // @see https://github.com/ossrs/srs/issues/47
srs_utime_t duration; srs_utime_t duration;
// the MR(merged-write) sleep time in srs_utime_t. // The MR(merged-write) sleep time in srs_utime_t.
srs_utime_t mw_sleep; srs_utime_t mw_sleep;
// the MR(merged-write) only enabled for play. // The MR(merged-write) only enabled for play.
int mw_enabled; int mw_enabled;
// for realtime // For realtime
// @see https://github.com/ossrs/srs/issues/257 // @see https://github.com/ossrs/srs/issues/257
bool realtime; bool realtime;
// the minimal interval in srs_utime_t for delivery stream. // The minimal interval in srs_utime_t for delivery stream.
srs_utime_t send_min_interval; srs_utime_t send_min_interval;
// publish 1st packet timeout in srs_utime_t // The publish 1st packet timeout in srs_utime_t
srs_utime_t publish_1stpkt_timeout; srs_utime_t publish_1stpkt_timeout;
// publish normal packet timeout in srs_utime_t // The publish normal packet timeout in srs_utime_t
srs_utime_t publish_normal_timeout; srs_utime_t publish_normal_timeout;
// whether enable the tcp_nodelay. // Whether enable the tcp_nodelay.
bool tcp_nodelay; bool tcp_nodelay;
// About the rtmp client. // About the rtmp client.
SrsClientInfo* info; SrsClientInfo* info;
@ -144,9 +138,9 @@ public:
public: public:
virtual void remark(int64_t* in, int64_t* out); virtual void remark(int64_t* in, int64_t* out);
private: private:
// when valid and connected to vhost/app, service the client. // When valid and connected to vhost/app, service the client.
virtual srs_error_t service_cycle(); virtual srs_error_t service_cycle();
// stream(play/publish) service cycle, identify client first. // The stream(play/publish) service cycle, identify client first.
virtual srs_error_t stream_service_cycle(); virtual srs_error_t stream_service_cycle();
virtual srs_error_t check_vhost(bool try_default_vhost); virtual srs_error_t check_vhost(bool try_default_vhost);
virtual srs_error_t playing(SrsSource* source); virtual srs_error_t playing(SrsSource* source);
@ -164,10 +158,8 @@ private:
virtual srs_error_t check_edge_token_traverse_auth(); virtual srs_error_t check_edge_token_traverse_auth();
virtual srs_error_t do_token_traverse_auth(SrsRtmpClient* client); virtual srs_error_t do_token_traverse_auth(SrsRtmpClient* client);
private: private:
/** // When the connection disconnect, call this method.
* when the connection disconnect, call this method. // e.g. log msg of connection and report to other system.
* e.g. log msg of connection and report to other system.
*/
virtual srs_error_t on_disconnect(); virtual srs_error_t on_disconnect();
private: private:
virtual srs_error_t http_hooks_on_connect(); virtual srs_error_t http_hooks_on_connect();

@ -52,9 +52,7 @@ class SrsSimpleStream;
class SrsPithyPrint; class SrsPithyPrint;
class SrsSimpleRtmpClient; class SrsSimpleRtmpClient;
/** // A rtp connection which transport a stream.
* a rtp connection which transport a stream.
*/
class SrsRtpConn: public ISrsUdpHandler class SrsRtpConn: public ISrsUdpHandler
{ {
private: private:
@ -75,9 +73,7 @@ public:
virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf);
}; };
/** // The audio cache, audio is grouped by frames.
* audio is group by frames.
*/
struct SrsRtspAudioCache struct SrsRtspAudioCache
{ {
int64_t dts; int64_t dts;
@ -88,9 +84,7 @@ struct SrsRtspAudioCache
virtual ~SrsRtspAudioCache(); virtual ~SrsRtspAudioCache();
}; };
/** // The time jitter correct for rtsp.
* the time jitter correct for rtsp.
*/
class SrsRtspJitter class SrsRtspJitter
{ {
private: private:
@ -105,9 +99,7 @@ public:
virtual srs_error_t correct(int64_t& ts); virtual srs_error_t correct(int64_t& ts);
}; };
/** // The rtsp connection serve the fd.
* the rtsp connection serve the fd.
*/
class SrsRtspConn : public ISrsCoroutineHandler class SrsRtspConn : public ISrsCoroutineHandler
{ {
private: private:
@ -176,16 +168,14 @@ private:
virtual void close(); virtual void close();
}; };
/** // The caster for rtsp.
* the caster for rtsp.
*/
class SrsRtspCaster : public ISrsTcpHandler class SrsRtspCaster : public ISrsTcpHandler
{ {
private: private:
std::string output; std::string output;
int local_port_min; int local_port_min;
int local_port_max; int local_port_max;
// key: port, value: whether used. // The key: port, value: whether used.
std::map<int, bool> used_ports; std::map<int, bool> used_ports;
private: private:
std::vector<SrsRtspConn*> clients; std::vector<SrsRtspConn*> clients;
@ -193,19 +183,15 @@ public:
SrsRtspCaster(SrsConfDirective* c); SrsRtspCaster(SrsConfDirective* c);
virtual ~SrsRtspCaster(); virtual ~SrsRtspCaster();
public: public:
/** // Alloc a rtp port from local ports pool.
* alloc a rtp port from local ports pool. // @param pport output the rtp port.
* @param pport output the rtp port.
*/
virtual srs_error_t alloc_port(int* pport); virtual srs_error_t alloc_port(int* pport);
/** // Free the alloced rtp port.
* free the alloced rtp port.
*/
virtual void free_port(int lpmin, int lpmax); virtual void free_port(int lpmin, int lpmax);
// interface ISrsTcpHandler // interface ISrsTcpHandler
public: public:
virtual srs_error_t on_tcp_client(srs_netfd_t stfd); virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
// internal methods. // internal methods.
public: public:
virtual void remove(SrsRtspConn* conn); virtual void remove(SrsRtspConn* conn);
}; };

@ -32,33 +32,25 @@
class SrsConfDirective; class SrsConfDirective;
/** // The security apply on vhost.
* the security apply on vhost. // @see https://github.com/ossrs/srs/issues/211
* @see https://github.com/ossrs/srs/issues/211
*/
class SrsSecurity class SrsSecurity
{ {
public: public:
SrsSecurity(); SrsSecurity();
virtual ~SrsSecurity(); virtual ~SrsSecurity();
public: public:
/** // Security check the client apply by vhost security strategy
* security check the client apply by vhost security strategy // @param type the client type, publish or play.
* @param type the client type, publish or play. // @param ip the ip address of client.
* @param ip the ip address of client. // @param req the request object of client.
* @param req the request object of client.
*/
virtual srs_error_t check(SrsRtmpConnType type, std::string ip, SrsRequest* req); virtual srs_error_t check(SrsRtmpConnType type, std::string ip, SrsRequest* req);
private: private:
/** // Security check the allow,
* security check the allow, // @return, if allowed, ERROR_SYSTEM_SECURITY_ALLOW.
* @return, if allowed, ERROR_SYSTEM_SECURITY_ALLOW.
*/
virtual int allow_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); virtual int allow_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip);
/** // Security check the deny,
* security check the deny, // @return, if denied, ERROR_SYSTEM_SECURITY_DENY.
* @return, if denied, ERROR_SYSTEM_SECURITY_DENY.
*/
virtual int deny_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip); virtual int deny_check(SrsConfDirective* rules, SrsRtmpConnType type, std::string ip);
}; };

@ -31,49 +31,43 @@
#include <srs_service_st.hpp> #include <srs_service_st.hpp>
#include <srs_protocol_io.hpp> #include <srs_protocol_io.hpp>
/** // Each ST-coroutine must implements this interface,
* Each ST-coroutine must implements this interface, // to do the cycle job and handle some events.
* to do the cycle job and handle some events. //
* // Thread do a job then terminated normally, it's a SrsOneCycleThread:
* Thread do a job then terminated normally, it's a SrsOneCycleThread: // class SrsOneCycleThread : public ISrsCoroutineHandler {
* class SrsOneCycleThread : public ISrsCoroutineHandler { // public: SrsCoroutine trd;
* public: SrsCoroutine trd; // public: virtual srs_error_t cycle() {
* public: virtual srs_error_t cycle() { // // Do something, then return this cycle and thread terminated normally.
* // Do something, then return this cycle and thread terminated normally. // }
* } // };
* }; //
* // Thread has its inside loop, such as the RTMP receive thread:
* Thread has its inside loop, such as the RTMP receive thread: // class SrsReceiveThread : public ISrsCoroutineHandler {
* class SrsReceiveThread : public ISrsCoroutineHandler { // public: SrsCoroutine* trd;
* public: SrsCoroutine* trd; // public: virtual srs_error_t cycle() {
* public: virtual srs_error_t cycle() { // while (true) {
* while (true) { // // Check whether thread interrupted.
* // Check whether thread interrupted. // if ((err = trd->pull()) != srs_success) {
* if ((err = trd->pull()) != srs_success) { // return err;
* return err; // }
* } // // Do something, such as st_read() packets, it'll be wakeup
* // Do something, such as st_read() packets, it'll be wakeup // // when user stop or interrupt the thread.
* // when user stop or interrupt the thread. // }
* } // }
* } // };
* };
*/
class ISrsCoroutineHandler class ISrsCoroutineHandler
{ {
public: public:
ISrsCoroutineHandler(); ISrsCoroutineHandler();
virtual ~ISrsCoroutineHandler(); virtual ~ISrsCoroutineHandler();
public: public:
/** // Do the work. The ST-coroutine will terminated normally if it returned.
* Do the work. The ST-coroutine will terminated normally if it returned. // @remark If the cycle has its own loop, it must check the thread pull.
* @remark If the cycle has its own loop, it must check the thread pull.
*/
virtual srs_error_t cycle() = 0; virtual srs_error_t cycle() = 0;
}; };
/** // The corotine object.
* The corotine object.
*/
class SrsCoroutine class SrsCoroutine
{ {
public: public:
@ -89,10 +83,8 @@ public:
virtual int cid() = 0; virtual int cid() = 0;
}; };
/** // An empty coroutine, user can default to this object before create any real coroutine.
* An empty coroutine, user can default to this object before create any real coroutine. // @see https://github.com/ossrs/srs/pull/908
* @see https://github.com/ossrs/srs/pull/908
*/
class SrsDummyCoroutine : public SrsCoroutine class SrsDummyCoroutine : public SrsCoroutine
{ {
public: public:
@ -110,20 +102,18 @@ public:
typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size); typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size);
extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create; extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;
/** // A ST-coroutine is a lightweight thread, just like the goroutine.
* A ST-coroutine is a lightweight thread, just like the goroutine. // But the goroutine maybe run on different thread, while ST-coroutine only
* But the goroutine maybe run on different thread, while ST-coroutine only // run in single thread, because it use setjmp and longjmp, so it may cause
* run in single thread, because it use setjmp and longjmp, so it may cause // problem in multiple threads. For SRS, we only use single thread module,
* problem in multiple threads. For SRS, we only use single thread module, // like NGINX to get very high performance, with asynchronous and non-blocking
* like NGINX to get very high performance, with asynchronous and non-blocking // sockets.
* sockets. // @reamrk For multiple processes, please use go-oryx to fork many SRS processes.
* @reamrk For multiple processes, please use go-oryx to fork many SRS processes. // Please read https://github.com/ossrs/go-oryx
* Please read https://github.com/ossrs/go-oryx // @remark For debugging of ST-coroutine, read _st_iterate_threads_flag of ST/README
* @remark For debugging of ST-coroutine, read _st_iterate_threads_flag of ST/README // https://github.com/ossrs/state-threads/blob/st-1.9/README#L115
* https://github.com/ossrs/state-threads/blob/st-1.9/README#L115 // @remark We always create joinable thread, so we must join it or memory leak,
* @remark We always create joinable thread, so we must join it or memory leak, // Please read https://github.com/ossrs/srs/issues/78
* Please read https://github.com/ossrs/srs/issues/78
*/
class SrsSTCoroutine : public SrsCoroutine class SrsSTCoroutine : public SrsCoroutine
{ {
private: private:
@ -145,35 +135,25 @@ public:
SrsSTCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0); SrsSTCoroutine(const std::string& n, ISrsCoroutineHandler* h, int cid = 0);
virtual ~SrsSTCoroutine(); virtual ~SrsSTCoroutine();
public: public:
/** // Start the thread.
* Start the thread. // @remark Should never start it when stopped or terminated.
* @remark Should never start it when stopped or terminated.
*/
virtual srs_error_t start(); virtual srs_error_t start();
/** // Interrupt the thread then wait to terminated.
* Interrupt the thread then wait to terminated. // @remark If user want to notify thread to quit async, for example if there are
* @remark If user want to notify thread to quit async, for example if there are // many threads to stop like the encoder, use the interrupt to notify all threads
* many threads to stop like the encoder, use the interrupt to notify all threads // to terminate then use stop to wait for each to terminate.
* to terminate then use stop to wait for each to terminate.
*/
virtual void stop(); virtual void stop();
/** // Interrupt the thread and notify it to terminate, it will be wakeup if it's blocked
* Interrupt the thread and notify it to terminate, it will be wakeup if it's blocked // in some IO operations, such as st_read or st_write, then it will found should quit,
* in some IO operations, such as st_read or st_write, then it will found should quit, // finally the thread should terminated normally, user can use the stop to join it.
* finally the thread should terminated normally, user can use the stop to join it.
*/
virtual void interrupt(); virtual void interrupt();
/** // Check whether thread is terminated normally or error(stopped or termianted with error),
* Check whether thread is terminated normally or error(stopped or termianted with error), // and the thread should be running if it return ERROR_SUCCESS.
* and the thread should be running if it return ERROR_SUCCESS. // @remark Return specified error when thread terminated normally with error.
* @remark Return specified error when thread terminated normally with error. // @remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error.
* @remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error. // @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.
* @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.
*/
virtual srs_error_t pull(); virtual srs_error_t pull();
/** // Get the context id of thread.
* Get the context id of thread.
*/
virtual int cid(); virtual int cid();
private: private:
virtual srs_error_t cycle(); virtual srs_error_t cycle();

Loading…
Cancel
Save