diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 4ebe76dfa..86b420bca 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -108,17 +108,49 @@ int SrsClient::do_cycle() } srs_verbose("on_bw_done success"); + int stream_id = SRS_DEFAULT_SID; SrsClientType type; - std::string stream_name; - if ((ret = rtmp->identify_client(SRS_DEFAULT_SID, type, stream_name)) != ERROR_SUCCESS) { + if ((ret = rtmp->identify_client(stream_id, type, req->stream)) != ERROR_SUCCESS) { srs_error("identify client failed. ret=%d", ret); return ret; } - srs_verbose("identify client success. type=%d", type); + srs_verbose("identify client success. type=%d, stream_name=%s", type, req->stream.c_str()); + + // TODO: read from config. + int chunk_size = 4096; + if ((ret = rtmp->set_chunk_size(chunk_size)) != ERROR_SUCCESS) { + srs_error("set chunk size failed. ret=%d", ret); + return ret; + } + srs_verbose("set chunk size success"); + + switch (type) { + case SrsClientPlay: { + srs_verbose("start to play stream %s.", req->stream.c_str()); + + if ((ret = rtmp->start_play(stream_id)) != ERROR_SUCCESS) { + srs_error("start to play stream failed. ret=%d", ret); + return ret; + } + srs_info("start to play stream %s success", req->stream.c_str()); + return streaming_play(); + } + default: { + ret = ERROR_SYSTEM_CLIENT_INVALID; + srs_info("invalid client type=%d. ret=%d", type, ret); + return ret; + } + } return ret; } +int SrsClient::streaming_play() +{ + int ret = ERROR_SUCCESS; + return ret; +} + int SrsClient::get_peer_ip() { int ret = ERROR_SUCCESS; diff --git a/trunk/src/core/srs_core_client.hpp b/trunk/src/core/srs_core_client.hpp index 5f030fda9..266745a4a 100755 --- a/trunk/src/core/srs_core_client.hpp +++ b/trunk/src/core/srs_core_client.hpp @@ -50,6 +50,7 @@ public: protected: virtual int do_cycle(); private: + virtual int streaming_play(); virtual int get_peer_ip(); }; diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index f43f003a2..2cefff4fb 100755 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -59,8 +59,10 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_MESSAGE_DECODE 307 #define ERROR_RTMP_MESSAGE_ENCODE 308 #define ERROR_RTMP_AMF0_ENCODE 309 +#define ERROR_RTMP_CHUNK_SIZE 310 #define ERROR_SYSTEM_STREAM_INIT 400 #define ERROR_SYSTEM_PACKET_INVALID 401 +#define ERROR_SYSTEM_CLIENT_INVALID 402 #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index 9a6b5c272..666181645 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -50,20 +50,6 @@ with IDs 3-6 are reserved for usage of RTMP. Protocol message with ID #define RTMP_MSG_SetPeerBandwidth 0x06 #define RTMP_MSG_EdgeAndOriginServerCommand 0x07 /** -* The server sends this event to test whether the client is reachable. -* -* Event data is a 4-byte timestamp, representing the local server time when the server dispatched the command. -* The client responds with PingResponse on receiving PingRequest. -*/ -#define RTMP_MSG_PCUC_PingRequest 0x06 - -/** -* The client sends this event to the server in response to the ping request. -* -* The event data is a 4-byte timestamp, which was received with the PingRequest request. -*/ -#define RTMP_MSG_PCUC_PingResponse 0x07 -/** 3. Types of messages The server and the client send messages over the network to communicate with each other. The messages can be of any type which @@ -184,6 +170,7 @@ messages. * independently for each direction. */ #define RTMP_DEFAULT_CHUNK_SIZE 128 +#define RTMP_MIN_CHUNK_SIZE 2 /** * 6.1. Chunk Format @@ -316,7 +303,7 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) } if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) { - srs_error("update context when received msg. ret=%d", ret); + srs_error("hook the received msg failed. ret=%d", ret); delete msg; return ret; } @@ -438,6 +425,11 @@ int SrsProtocol::send_message(SrsMessage* msg) } } while (p < (char*)msg->payload + msg->size); + if ((ret = on_send_message(msg)) != ERROR_SUCCESS) { + srs_error("hook the send message failed. ret=%d", ret); + return ret; + } + return ret; } @@ -448,6 +440,7 @@ int SrsProtocol::on_recv_message(SrsMessage* msg) srs_assert(msg != NULL); switch (msg->header.message_type) { + case RTMP_MSG_SetChunkSize: case RTMP_MSG_WindowAcknowledgementSize: if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { srs_error("decode packet from message payload failed. ret=%d", ret); @@ -465,6 +458,36 @@ int SrsProtocol::on_recv_message(SrsMessage* msg) srs_trace("set ack window size to %d", pkt->ackowledgement_window_size); break; } + case RTMP_MSG_SetChunkSize: { + SrsSetChunkSizePacket* pkt = dynamic_cast(msg->get_packet()); + srs_assert(pkt != NULL); + + in_chunk_size = pkt->chunk_size; + + srs_trace("set input chunk size to %d", pkt->chunk_size); + break; + } + } + + return ret; +} + +int SrsProtocol::on_send_message(SrsMessage* msg) +{ + int ret = ERROR_SUCCESS; + + srs_assert(msg != NULL); + + switch (msg->header.message_type) { + case RTMP_MSG_SetChunkSize: { + SrsSetChunkSizePacket* pkt = dynamic_cast(msg->get_packet()); + srs_assert(pkt != NULL); + + in_chunk_size = pkt->chunk_size; + + srs_trace("set output chunk size to %d", pkt->chunk_size); + break; + } } return ret; @@ -1580,6 +1603,70 @@ int SrsSetWindowAckSizePacket::encode_packet(SrsStream* stream) return ret; } +SrsSetChunkSizePacket::SrsSetChunkSizePacket() +{ + chunk_size = RTMP_DEFAULT_CHUNK_SIZE; +} + +SrsSetChunkSizePacket::~SrsSetChunkSizePacket() +{ +} + +int SrsSetChunkSizePacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(4)) { + ret = ERROR_RTMP_MESSAGE_DECODE; + srs_error("decode chunk size failed. ret=%d", ret); + return ret; + } + + chunk_size = stream->read_4bytes(); + srs_info("decode chunk size success. chunk_size=%d", chunk_size); + + if (chunk_size < RTMP_MIN_CHUNK_SIZE) { + ret = ERROR_RTMP_CHUNK_SIZE; + srs_error("invalid chunk size. min=%d, actual=%d, ret=%d", + ERROR_RTMP_CHUNK_SIZE, chunk_size, ret); + return ret; + } + + return ret; +} + +int SrsSetChunkSizePacket::get_perfer_cid() +{ + return RTMP_CID_ProtocolControl; +} + +int SrsSetChunkSizePacket::get_message_type() +{ + return RTMP_MSG_SetChunkSize; +} + +int SrsSetChunkSizePacket::get_size() +{ + return 4; +} + +int SrsSetChunkSizePacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(4)) { + ret = ERROR_RTMP_MESSAGE_ENCODE; + srs_error("encode chunk packet failed. ret=%d", ret); + return ret; + } + + stream->write_4bytes(chunk_size); + + srs_verbose("encode chunk packet success. ack_size=%d", chunk_size); + + return ret; +} + SrsSetPeerBandwidthPacket::SrsSetPeerBandwidthPacket() { bandwidth = 0; @@ -1624,3 +1711,47 @@ int SrsSetPeerBandwidthPacket::encode_packet(SrsStream* stream) return ret; } +SrsPCUC4BytesPacket::SrsPCUC4BytesPacket() +{ + event_type = 0; + event_data = 0; +} + +SrsPCUC4BytesPacket::~SrsPCUC4BytesPacket() +{ +} + +int SrsPCUC4BytesPacket::get_perfer_cid() +{ + return RTMP_CID_ProtocolControl; +} + +int SrsPCUC4BytesPacket::get_message_type() +{ + return RTMP_MSG_UserControlMessage; +} + +int SrsPCUC4BytesPacket::get_size() +{ + return 2 + 4; +} + +int SrsPCUC4BytesPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if (!stream->require(6)) { + ret = ERROR_RTMP_MESSAGE_ENCODE; + srs_error("encode set bandwidth packet failed. ret=%d", ret); + return ret; + } + + stream->write_2bytes(event_type); + stream->write_4bytes(event_data); + + srs_verbose("encode PCUC packet success. " + "event_type=%d, event_data=%d", event_type, event_data); + + return ret; +} + diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index c09439c07..f43e14586 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -113,6 +113,10 @@ private: */ virtual int on_recv_message(SrsMessage* msg); /** + * when message sentout, update the context. + */ + virtual int on_send_message(SrsMessage* msg); + /** * try to recv interlaced message from peer, * return error if error occur and nerver set the pmsg, * return success and pmsg set to NULL if no entire message got, @@ -532,6 +536,36 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* 7.1. Set Chunk Size +* Protocol control message 1, Set Chunk Size, is used to notify the +* peer about the new maximum chunk size. +*/ +class SrsSetChunkSizePacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsSetChunkSizePacket); + } +public: + int32_t chunk_size; +public: + SrsSetChunkSizePacket(); + virtual ~SrsSetChunkSizePacket(); +public: + virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + /** * 5.6. Set Peer Bandwidth (6) * The client or the server sends this message to update the output @@ -561,6 +595,57 @@ protected: virtual int encode_packet(SrsStream* stream); }; +enum SrcPCUCEventType +{ + // generally, 4bytes event-data + SrcPCUCStreamBegin = 0x00, + SrcPCUCStreamEOF = 0x01, + SrcPCUCStreamDry = 0x02, + SrcPCUCSetBufferLength = 0x03, // 8bytes event-data + SrcPCUCStreamIsRecorded = 0x04, + SrcPCUCPingRequest = 0x06, + SrcPCUCPingResponse = 0x07, +}; + +/** +* for the EventData is 4bytes. +* Stream Begin(=0) 4-bytes stream ID +* Stream EOF(=1) 4-bytes stream ID +* StreamDry(=2) 4-bytes stream ID +* StreamIsRecorded(=4) 4-bytes stream ID +* PingRequest(=6) 4-bytes timestamp local server time +* PingResponse(=7) 4-bytes timestamp received ping request. +* +* 3.7. User Control message +* +------------------------------+------------------------- +* | Event Type ( 2- bytes ) | Event Data +* +------------------------------+------------------------- +* Figure 5 Pay load for the ‘User Control Message’. +*/ +class SrsPCUC4BytesPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsPCUC4BytesPacket); + } +public: + int16_t event_type; + int32_t event_data; +public: + SrsPCUC4BytesPacket(); + virtual ~SrsPCUC4BytesPacket(); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); +}; + /** * expect a specified message, drop others util got specified one. * @pmsg, user must free it. NULL if not success. diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 824f5c0bb..dcda1ea4d 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -303,6 +303,50 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st return ret; } +int SrsRtmp::set_chunk_size(int chunk_size) +{ + int ret = ERROR_SUCCESS; + + SrsMessage* msg = new SrsMessage(); + SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); + + pkt->chunk_size = chunk_size; + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send set chunk size message failed. ret=%d", ret); + return ret; + } + srs_info("send set chunk size message success. chunk_size=%d", chunk_size); + + return ret; +} + +int SrsRtmp::start_play(int stream_id) +{ + int ret = ERROR_SUCCESS; + + // StreamBegin + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsPCUC4BytesPacket* pkt = new SrsPCUC4BytesPacket(); + + pkt->event_type = SrcPCUCStreamBegin; + pkt->event_data = stream_id; + msg->set_packet(pkt); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret); + return ret; + } + srs_info("send PCUC(StreamBegin) message success."); + } + + srs_info("start play success."); + + return ret; +} + int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name) { int ret = ERROR_SUCCESS; @@ -343,7 +387,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea SrsPacket* pkt = msg->get_packet(); if (dynamic_cast(pkt)) { SrsPlayPacket* play = dynamic_cast(pkt); - type = SrsClientPublish; + type = SrsClientPlay; stream_name = play->stream_name; srs_trace("identity client type=play, stream_name=%s", stream_name.c_str()); return ret; diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index b106cff50..e9421b448 100755 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -99,6 +99,15 @@ public: * @type, output the client type. */ virtual int identify_client(int stream_id, SrsClientType& type, std::string& stream_name); + /** + * set the chunk size when client type identified. + */ + virtual int set_chunk_size(int chunk_size); + /** + * when client type is play, response with + * StreamBegin, onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start). + */ + virtual int start_play(int stream_id); private: virtual int identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id, SrsClientType& type, std::string& stream_name); };