diff --git a/trunk/configure b/trunk/configure index 60445a4fa..07f5e62f8 100755 --- a/trunk/configure +++ b/trunk/configure @@ -86,7 +86,7 @@ MODULE_FILES=("srs_core" "srs_core_log" "srs_core_server" "srs_core_error" "srs_core_conn" "srs_core_client" "srs_core_rtmp" "srs_core_socket" "srs_core_buffer" "srs_core_auto_free" "srs_core_protocol" "srs_core_amf0" - "srs_core_stream") + "srs_core_stream" "srs_core_source") MODULE_DIR="src/core" . auto/modules.sh CORE_OBJS="${MODULE_OBJS[@]}" diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index b36afb6b9..11195fa75 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -30,6 +30,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include #include +#include + +// wait for client message. +#define SRS_PULSE_TIME_MS 100 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) : SrsConnection(srs_server, client_stfd) @@ -113,6 +117,11 @@ int SrsClient::do_cycle() } srs_verbose("set chunk size success"); + // find a source to publish. + SrsSource* source = SrsSource::find(req->get_stream_url()); + srs_assert(source != NULL); + srs_info("source found, url=%s", req->get_stream_url().c_str()); + switch (type) { case SrsClientPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); @@ -122,7 +131,7 @@ int SrsClient::do_cycle() return ret; } srs_info("start to play stream %s success", req->stream.c_str()); - return streaming_play(); + return streaming_play(source); } case SrsClientPublish: { srs_verbose("start to publish stream %s.", req->stream.c_str()); @@ -132,7 +141,7 @@ int SrsClient::do_cycle() return ret; } srs_info("start to publish stream %s success", req->stream.c_str()); - return streaming_publish(); + return streaming_publish(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; @@ -144,13 +153,58 @@ int SrsClient::do_cycle() return ret; } -int SrsClient::streaming_play() +int SrsClient::streaming_play(SrsSource* source) { int ret = ERROR_SUCCESS; + + SrsConsumer* consumer = source->create_consumer(); + srs_assert(consumer != NULL); + SrsAutoFree(SrsConsumer, consumer, false); + srs_verbose("consumer created."); + + while (true) { + bool ready = false; + if ((ret = rtmp->can_read(SRS_PULSE_TIME_MS, ready)) != ERROR_SUCCESS) { + srs_error("wait client control message failed. ret=%d", ret); + return ret; + } + srs_verbose("client pulse %dms, ready=%d", SRS_PULSE_TIME_MS, ready); + + // read from client. + if (ready) { + SrsMessage* msg = NULL; + if ((ret = rtmp->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv client control message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsMessage, msg, false); + // TODO: process it. + } + + // get messages from consumer. + SrsMessage** msgs = NULL; + int count = 0; + if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) { + srs_error("get messages from consumer failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsMessage*, msgs, true); + + // sendout messages + for (int i = 0; i < count; i++) { + SrsMessage* msg = msgs[i]; + if ((ret = rtmp->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send message to client failed. ret=%d", ret); + return ret; + } + } + } + return ret; } -int SrsClient::streaming_publish() +int SrsClient::streaming_publish(SrsSource* source) { int ret = ERROR_SUCCESS; @@ -163,6 +217,17 @@ int SrsClient::streaming_publish() SrsAutoFree(SrsMessage, msg, false); + // process audio packet + if (msg->header.is_audio() && ((ret = source->on_audio(msg)) != ERROR_SUCCESS)) { + srs_error("process audio message failed. ret=%d", ret); + return ret; + } + // process video packet + if (msg->header.is_video() && ((ret = source->on_video(msg)) != ERROR_SUCCESS)) { + srs_error("process video message failed. ret=%d", ret); + return ret; + } + // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { @@ -173,6 +238,12 @@ int SrsClient::streaming_publish() SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast(pkt); + if ((ret = source->on_meta_data(metadata)) != ERROR_SUCCESS) { + srs_error("process onMetaData message failed. ret=%d", ret); + return ret; + } + srs_trace("process onMetaData message success."); + continue; } srs_trace("ignore AMF0/AMF3 data message."); diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index a212ef8e7..f0062751a 100755 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -35,6 +35,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. class SrsRtmp; class SrsRequest; class SrsResponse; +class SrsSource; /** * the client provides the main logic control for RTMP clients. @@ -52,8 +53,8 @@ public: protected: virtual int do_cycle(); private: - virtual int streaming_play(); - virtual int streaming_publish(); + virtual int streaming_play(SrsSource* source); + virtual int streaming_publish(SrsSource* source); virtual int get_peer_ip(); }; diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index 2cefff4fb..f18c4e44a 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -48,6 +48,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SOCKET_READ 207 #define ERROR_SOCKET_READ_FULLY 208 #define ERROR_SOCKET_WRITE 209 +#define ERROR_SOCKET_WAIT 210 #define ERROR_RTMP_PLAIN_REQUIRED 300 #define ERROR_RTMP_CHUNK_START 301 diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 4ee0a41ef..614bc73ca 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -273,6 +273,11 @@ SrsProtocol::~SrsProtocol() srs_freep(skt); } +int SrsProtocol::can_read(int timeout_ms, bool& ready) +{ + return skt->can_read(timeout_ms, ready); +} + int SrsProtocol::recv_message(SrsMessage** pmsg) { *pmsg = NULL; @@ -858,6 +863,16 @@ SrsMessageHeader::~SrsMessageHeader() { } +bool SrsMessageHeader::is_audio() +{ + return message_type == RTMP_MSG_AudioMessage; +} + +bool SrsMessageHeader::is_video() +{ + return message_type == RTMP_MSG_VideoMessage; +} + bool SrsMessageHeader::is_amf0_command() { return message_type == RTMP_MSG_AMF0CommandMessage; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index 0f436e157..07f086ba2 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -93,6 +93,10 @@ public: SrsProtocol(st_netfd_t client_stfd); virtual ~SrsProtocol(); public: + /** + * whether the peer can read. + */ + virtual int can_read(int timeout_ms, bool& ready); /** * recv a message with raw/undecoded payload from peer. * the payload is not decoded, use srs_rtmp_expect_message if requires @@ -179,6 +183,8 @@ struct SrsMessageHeader SrsMessageHeader(); virtual ~SrsMessageHeader(); + bool is_audio(); + bool is_video(); bool is_amf0_command(); bool is_amf0_data(); bool is_amf3_command(); diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 2dc51ca62..1dcb3c58e 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -112,6 +112,16 @@ int SrsRequest::discovery_app() return ret; } +std::string SrsRequest::get_stream_url() +{ + std::string url = vhost; + + url += app; + url += stream; + + return url; +} + SrsResponse::SrsResponse() { stream_id = SRS_DEFAULT_SID; @@ -132,6 +142,21 @@ SrsRtmp::~SrsRtmp() srs_freep(protocol); } +int SrsRtmp::recv_message(SrsMessage** pmsg) +{ + return protocol->recv_message(pmsg); +} + +int SrsRtmp::can_read(int timeout_ms, bool& ready) +{ + return protocol->can_read(timeout_ms, ready); +} + +int SrsRtmp::send_message(SrsMessage* msg) +{ + return protocol->send_message(msg); +} + int SrsRtmp::handshake() { int ret = ERROR_SUCCESS; @@ -178,11 +203,6 @@ int SrsRtmp::handshake() return ret; } -int SrsRtmp::recv_message(SrsMessage** pmsg) -{ - return protocol->recv_message(pmsg); -} - int SrsRtmp::connect_app(SrsRequest* req) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index fdb496f56..8f29c08b1 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -62,6 +62,7 @@ struct SrsRequest * disconvery vhost/app from tcUrl. */ virtual int discovery_app(); + virtual std::string get_stream_url(); }; /** @@ -99,8 +100,11 @@ public: SrsRtmp(st_netfd_t client_stfd); virtual ~SrsRtmp(); public: - virtual int handshake(); virtual int recv_message(SrsMessage** pmsg); + virtual int can_read(int timeout_ms, bool& ready); + virtual int send_message(SrsMessage* msg); +public: + virtual int handshake(); virtual int connect_app(SrsRequest* req); virtual int set_window_ack_size(int ack_size); /** diff --git a/trunk/src/core/srs_core_socket.cpp b/trunk/src/core/srs_core_socket.cpp index 210056b21..7708c2902 100755 --- a/trunk/src/core/srs_core_socket.cpp +++ b/trunk/src/core/srs_core_socket.cpp @@ -34,6 +34,26 @@ SrsSocket::~SrsSocket() { } +int SrsSocket::can_read(int timeout_ms, bool& ready) +{ + ready = false; + int ret = ERROR_SUCCESS; + + // If the named file descriptor object is ready for I/O within the specified amount of time, + // a value of 0 is returned. Otherwise, a value of -1 is returned and errno is set to + // indicate the error + if(st_netfd_poll(stfd, POLLIN, timeout_ms * 1000) == -1){ + if(errno == ETIME){ + return ret; + } + + return ERROR_SOCKET_WAIT; + } + + ready = true; + return ret; +} + int SrsSocket::read(const void* buf, size_t size, ssize_t* nread) { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_socket.hpp b/trunk/src/core/srs_core_socket.hpp index 6672db3b9..678306020 100755 --- a/trunk/src/core/srs_core_socket.hpp +++ b/trunk/src/core/srs_core_socket.hpp @@ -44,6 +44,7 @@ public: SrsSocket(st_netfd_t client_stfd); virtual ~SrsSocket(); public: + virtual int can_read(int timeout_ms, bool& ready); virtual int read(const void* buf, size_t size, ssize_t* nread); virtual int read_fully(const void* buf, size_t size, ssize_t* nread); virtual int write(const void* buf, size_t size, ssize_t* nwrite); diff --git a/trunk/src/core/srs_core_source.cpp b/trunk/src/core/srs_core_source.cpp new file mode 100755 index 000000000..55fc3e6ba --- /dev/null +++ b/trunk/src/core/srs_core_source.cpp @@ -0,0 +1,90 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#include + +#include +#include + +std::map SrsSource::pool; + +SrsSource* SrsSource::find(std::string stream_url) +{ + if (pool.find(stream_url) == pool.end()) { + pool[stream_url] = new SrsSource(stream_url); + srs_verbose("create new source for url=%s", stream_url.c_str()); + } + + return pool[stream_url]; +} + +SrsConsumer::SrsConsumer() +{ +} + +SrsConsumer::~SrsConsumer() +{ +} + +int SrsConsumer::get_packets(int max_count, SrsMessage**& msgs, int& count) +{ + msgs = NULL; + count = 0; + + int ret = ERROR_SUCCESS; + return ret; +} + +SrsSource::SrsSource(std::string _stream_url) +{ + stream_url = _stream_url; +} + +SrsSource::~SrsSource() +{ +} + +int SrsSource::on_meta_data(SrsOnMetaDataPacket* metadata) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsSource::on_audio(SrsMessage* audio) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +int SrsSource::on_video(SrsMessage* audio) +{ + int ret = ERROR_SUCCESS; + return ret; +} + +SrsConsumer* SrsSource::create_consumer() +{ + SrsConsumer* consumer = new SrsConsumer(); + return consumer; +} + diff --git a/trunk/src/core/srs_core_source.hpp b/trunk/src/core/srs_core_source.hpp new file mode 100755 index 000000000..e44719df6 --- /dev/null +++ b/trunk/src/core/srs_core_source.hpp @@ -0,0 +1,85 @@ +/* +The MIT License (MIT) + +Copyright (c) 2013 winlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +#ifndef SRS_CORE_SOURCE_HPP +#define SRS_CORE_SOURCE_HPP + +/* +#include +*/ + +#include + +#include +#include + +class SrsMessage; +class SrsOnMetaDataPacket; + +/** +* the consumer for SrsSource, that is a play client. +*/ +class SrsConsumer +{ +public: + SrsConsumer(); + virtual ~SrsConsumer(); +public: + /** + * get packets in consumer queue. + * @msgs SrsMessages*[], output the prt array. + * @count the count in array. + * @max_count the max count to dequeue, 0 to dequeue all. + */ + virtual int get_packets(int max_count, SrsMessage**& msgs, int& count); +}; + +/** +* live streaming source. +*/ +class SrsSource +{ +private: + static std::map pool; +public: + /** + * find stream by vhost/app/stream. + * @stream_url the stream url, for example, myserver.xxx.com/app/stream + * @return the matched source, never be NULL. + * @remark stream_url should without port and schema. + */ + static SrsSource* find(std::string stream_url); +private: + std::string stream_url; +public: + SrsSource(std::string _stream_url); + virtual ~SrsSource(); +public: + virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_audio(SrsMessage* audio); + virtual int on_video(SrsMessage* video); +public: + virtual SrsConsumer* create_consumer(); +}; + +#endif \ No newline at end of file diff --git a/trunk/src/srs/srs.upp b/trunk/src/srs/srs.upp index 932051ca7..6307a0a4d 100755 --- a/trunk/src/srs/srs.upp +++ b/trunk/src/srs/srs.upp @@ -14,6 +14,8 @@ file ..\core\srs_core_conn.cpp, ..\core\srs_core_client.hpp, ..\core\srs_core_client.cpp, + ..\core\srs_core_source.hpp, + ..\core\srs_core_source.cpp, ..\core\srs_core_rtmp.hpp, ..\core\srs_core_rtmp.cpp, ..\core\srs_core_protocol.hpp,