diff --git a/trunk/conf/srs.conf b/trunk/conf/srs.conf index b8a84849e..66525035f 100755 --- a/trunk/conf/srs.conf +++ b/trunk/conf/srs.conf @@ -15,7 +15,7 @@ vhost __defaultVhost__ { hls_path ./objs/nginx/html; hls_fragment 5; hls_window 30; - forward 127.0.0.1:1936; + forward 192.168.1.50; } # the vhost which forward publish streams. vhost forward.vhost.com { diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index d066128f4..a930a1507 100644 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -39,8 +39,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #define SRS_PULSE_TIMEOUT_MS 100 -#define SRS_SEND_TIMEOUT_MS 5000000L -#define SRS_RECV_TIMEOUT_MS SRS_SEND_TIMEOUT_MS +#define SRS_SEND_TIMEOUT_US 5000000L +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US #define SRS_STREAM_BUSY_SLEEP_MS 2000 SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) @@ -72,10 +72,10 @@ int SrsClient::do_cycle() return ret; } srs_trace("get peer ip success. ip=%s, send_to=%"PRId64", recv_to=%"PRId64"", - ip, SRS_SEND_TIMEOUT_MS, SRS_RECV_TIMEOUT_MS); + ip, SRS_SEND_TIMEOUT_US, SRS_RECV_TIMEOUT_US); - rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_MS * 1000); - rtmp->set_send_timeout(SRS_SEND_TIMEOUT_MS * 1000); + rtmp->set_recv_timeout(SRS_RECV_TIMEOUT_US); + rtmp->set_send_timeout(SRS_SEND_TIMEOUT_US); if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { srs_error("rtmp handshake failed. ret=%d", ret); @@ -400,7 +400,7 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } @@ -422,7 +422,7 @@ int SrsClient::process_publish_message(SrsSource* source, SrsCommonMessage* msg, // process UnPublish event. if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) { - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { srs_error("decode unpublish message failed. ret=%d", ret); return ret; } @@ -496,7 +496,7 @@ int SrsClient::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* return ret; } - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(rtmp->get_protocol())) != ERROR_SUCCESS) { srs_error("decode the amf0/amf3 command packet failed. ret=%d", ret); return ret; } diff --git a/trunk/src/core/srs_core_error.hpp b/trunk/src/core/srs_core_error.hpp index ed1bef447..24a5292c1 100644 --- a/trunk/src/core/srs_core_error.hpp +++ b/trunk/src/core/srs_core_error.hpp @@ -37,6 +37,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_ST_OPEN_SOCKET 102 #define ERROR_ST_CREATE_LISTEN_THREAD 103 #define ERROR_ST_CREATE_CYCLE_THREAD 104 +#define ERROR_ST_CREATE_FORWARD_THREAD 105 +#define ERROR_ST_CONNECT 106 #define ERROR_SOCKET_CREATE 200 #define ERROR_SOCKET_SETREUSE 201 @@ -67,6 +69,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_RTMP_PACKET_SIZE 313 #define ERROR_RTMP_VHOST_NOT_FOUND 314 #define ERROR_RTMP_ACCESS_DENIED 315 +#define ERROR_RTMP_HANDSHAKE 316 +#define ERROR_RTMP_NO_REQUEST 317 #define ERROR_SYSTEM_STREAM_INIT 400 #define ERROR_SYSTEM_PACKET_INVALID 401 @@ -79,6 +83,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define ERROR_SYSTEM_CONFIG_BLOCK_END 408 #define ERROR_SYSTEM_CONFIG_EOF 409 #define ERROR_SYSTEM_STREAM_BUSY 410 +#define ERROR_SYSTEM_IP_INVALID 411 // see librtmp. // failed when open ssl create the dh diff --git a/trunk/src/core/srs_core_forward.cpp b/trunk/src/core/srs_core_forward.cpp index 9be2ee5d1..1a3b36649 100644 --- a/trunk/src/core/srs_core_forward.cpp +++ b/trunk/src/core/srs_core_forward.cpp @@ -24,42 +24,47 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +#include +#include +#include +#include #include #include #include +#define SRS_FORWARDER_SLEEP_MS 2000 +#define SRS_SEND_TIMEOUT_US 3000000L +#define SRS_RECV_TIMEOUT_US SRS_SEND_TIMEOUT_US + SrsForwarder::SrsForwarder() { - client = new SrsRtmpClient(); + client = NULL; tid = NULL; + stfd = NULL; loop = false; + stream_id = 0; } SrsForwarder::~SrsForwarder() { - if (tid) { - loop = false; - st_thread_interrupt(tid); - st_thread_join(tid, NULL); - tid = NULL; - } - - srs_freep(client); + on_unpublish(); } -int SrsForwarder::on_publish(std::string vhost, std::string app, std::string stream, std::string forward_server) +int SrsForwarder::on_publish(std::string vhost, std::string _app, std::string stream, std::string forward_server) { int ret = ERROR_SUCCESS; - std::string tc_url = "rtmp://"; + app = _app; + + tc_url = "rtmp://"; tc_url += vhost; tc_url += "/"; tc_url += app; - std::string stream_name = stream; - std::string server = forward_server; - int port = 1935; + stream_name = stream; + server = forward_server; + port = 1935; size_t pos = forward_server.find(":"); if (pos != std::string::npos) { @@ -67,14 +72,40 @@ int SrsForwarder::on_publish(std::string vhost, std::string app, std::string str server = forward_server.substr(0, pos); } - srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", - stream_name.c_str(), tc_url.c_str(), server.c_str(), port); + if ((ret = open_socket()) != ERROR_SUCCESS) { + return ret; + } + + srs_assert(!tid); + if((tid = st_thread_create(forward_thread, this, 1, 0)) == NULL){ + ret = ERROR_ST_CREATE_FORWARD_THREAD; + srs_error("st_thread_create failed. ret=%d", ret); + return ret; + } return ret; } void SrsForwarder::on_unpublish() { + if (tid) { + loop = false; + st_thread_interrupt(tid); + st_thread_join(tid, NULL); + tid = NULL; + } + + if (stfd) { + int fd = st_netfd_fileno(stfd); + st_netfd_close(stfd); + stfd = NULL; + + // st does not close it sometimes, + // close it manually. + close(fd); + } + + srs_freep(client); } int SrsForwarder::on_meta_data(SrsOnMetaDataPacket* metadata) @@ -95,3 +126,147 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg) return ret; } +int SrsForwarder::open_socket() +{ + int ret = ERROR_SUCCESS; + + srs_trace("forward stream=%s, tcUrl=%s to server=%s, port=%d", + stream_name.c_str(), tc_url.c_str(), server.c_str(), port); + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1){ + ret = ERROR_SOCKET_CREATE; + srs_error("create socket error. ret=%d", ret); + return ret; + } + + stfd = st_netfd_open_socket(sock); + if(stfd == NULL){ + ret = ERROR_ST_OPEN_SOCKET; + srs_error("st_netfd_open_socket failed. ret=%d", ret); + return ret; + } + + srs_freep(client); + client = new SrsRtmpClient(stfd); + + return ret; +} + +int SrsForwarder::connect_server() +{ + int ret = ERROR_SUCCESS; + + std::string ip = parse_server(server); + if (ip.empty()) { + ret = ERROR_SYSTEM_IP_INVALID; + srs_error("dns resolve server error, ip empty. ret=%d", ret); + return ret; + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + if (st_connect(stfd, (const struct sockaddr*)&addr, sizeof(sockaddr_in), ST_UTIME_NO_TIMEOUT) == -1){ + ret = ERROR_ST_CONNECT; + srs_error("connect to server error. ip=%s, port=%d, ret=%d", ip.c_str(), port, ret); + return ret; + } + srs_trace("connect to server success. server=%s, ip=%s, port=%d", server.c_str(), ip.c_str(), port); + + return ret; +} + +std::string SrsForwarder::parse_server(std::string host) +{ + if (inet_addr(host.c_str()) != INADDR_NONE) { + return host; + } + + hostent* answer = gethostbyname(host.c_str()); + if (answer == NULL) { + srs_error("dns resolve host %s error.", host.c_str()); + return ""; + } + + char ipv4[16]; + memset(ipv4, 0, sizeof(ipv4)); + for (int i = 0; i < answer->h_length; i++) { + inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); + srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); + break; + } + + return ipv4; +} + +int SrsForwarder::forward_cycle_imp() +{ + int ret = ERROR_SUCCESS; + + client->set_recv_timeout(SRS_RECV_TIMEOUT_US); + client->set_send_timeout(SRS_SEND_TIMEOUT_US); + + if ((ret = connect_server()) != ERROR_SUCCESS) { + return ret; + } + srs_assert(client); + + if ((ret = client->handshake()) != ERROR_SUCCESS) { + srs_error("handshake with server failed. ret=%d", ret); + return ret; + } + if ((ret = client->connect_app(app, tc_url)) != ERROR_SUCCESS) { + srs_error("connect with server failed, tcUrl=%s. ret=%d", tc_url.c_str(), ret); + return ret; + } + if ((ret = client->play_stream(stream_name, stream_id)) != ERROR_SUCCESS) { + srs_error("connect with server failed, stream_name=%s. ret=%d", stream_name.c_str(), ret); + return ret; + } + + return ret; +} + +void SrsForwarder::forward_cycle() +{ + int ret = ERROR_SUCCESS; + + log_context->generate_id(); + srs_trace("forward cycle start"); + + while (loop) { + if ((ret = forward_cycle_imp()) != ERROR_SUCCESS) { + srs_warn("forward cycle failed, ignored and retry, ret=%d", ret); + } else { + srs_info("forward cycle success, retry"); + } + + if (!loop) { + break; + } + + st_usleep(SRS_FORWARDER_SLEEP_MS * 1000); + + if ((ret = open_socket()) != ERROR_SUCCESS) { + srs_warn("forward cycle reopen failed, ignored and retry, ret=%d", ret); + } else { + srs_info("forward cycle reopen success"); + } + } + srs_trace("forward cycle finished"); +} + +void* SrsForwarder::forward_thread(void* arg) +{ + SrsForwarder* obj = (SrsForwarder*)arg; + srs_assert(obj != NULL); + + obj->loop = true; + obj->forward_cycle(); + + return NULL; +} + diff --git a/trunk/src/core/srs_core_forward.hpp b/trunk/src/core/srs_core_forward.hpp index 1def8d821..e0570705b 100644 --- a/trunk/src/core/srs_core_forward.hpp +++ b/trunk/src/core/srs_core_forward.hpp @@ -43,11 +43,14 @@ class SrsRtmpClient; class SrsForwarder { private: + std::string app; std::string tc_url; std::string stream_name; + int stream_id; std::string server; int port; private: + st_netfd_t stfd; st_thread_t tid; bool loop; private: @@ -61,6 +64,14 @@ public: virtual int on_meta_data(SrsOnMetaDataPacket* metadata); virtual int on_audio(SrsSharedPtrMessage* msg); virtual int on_video(SrsSharedPtrMessage* msg); +private: + virtual int open_socket(); + virtual int connect_server(); + std::string parse_server(std::string host); +private: + virtual int forward_cycle_imp(); + virtual void forward_cycle(); + static void* forward_thread(void* arg); }; #endif diff --git a/trunk/src/core/srs_core_handshake.cpp b/trunk/src/core/srs_core_handshake.cpp index 36efa5bae..44c3e4a01 100644 --- a/trunk/src/core/srs_core_handshake.cpp +++ b/trunk/src/core/srs_core_handshake.cpp @@ -1067,7 +1067,7 @@ SrsSimpleHandshake::~SrsSimpleHandshake() { } -int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_hs) +int SrsSimpleHandshake::handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs) { int ret = ERROR_SUCCESS; @@ -1090,7 +1090,7 @@ int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_h srs_verbose("check c0 success, required plain text."); // try complex handshake - ret = complex_hs.handshake(skt, c0c1 + 1); + ret = complex_hs.handshake_with_client(skt, c0c1 + 1); if (ret == ERROR_SUCCESS) { srs_trace("complex handshake success."); return ret; @@ -1125,6 +1125,67 @@ int SrsSimpleHandshake::handshake(SrsSocket& skt, SrsComplexHandshake& complex_h return ret; } +int SrsSimpleHandshake::handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs) +{ + int ret = ERROR_SUCCESS; + + // try complex handshake + ret = complex_hs.handshake_with_server(skt); + if (ret == ERROR_SUCCESS) { + srs_trace("complex handshake success."); + return ret; + } + if (ret != ERROR_RTMP_TRY_SIMPLE_HS) { + srs_error("complex handshake failed. ret=%d", ret); + return ret; + } + srs_info("rollback complex to simple handshake. ret=%d", ret); + + // simple handshake + ssize_t nsize; + + char* c0c1 = new char[1537]; + SrsAutoFree(char, c0c1, true); + + srs_random_generate(c0c1, 1537); + // plain text required. + c0c1[0] = 0x03; + + if ((ret = skt.write(c0c1, 1537, &nsize)) != ERROR_SUCCESS) { + srs_warn("write c0c1 failed. ret=%d", ret); + return ret; + } + srs_verbose("write c0c1 success."); + + char* s0s1s2 = new char[3073]; + SrsAutoFree(char, s0s1s2, true); + if ((ret = skt.read_fully(s0s1s2, 3073, &nsize)) != ERROR_SUCCESS) { + srs_warn("simple handshake recv s0s1s2 failed. ret=%d", ret); + return ret; + } + srs_verbose("simple handshake recv s0s1s2 success."); + + // plain text required. + if (s0s1s2[0] != 0x03) { + ret = ERROR_RTMP_HANDSHAKE; + srs_warn("handshake failed, plain text required. ret=%d", ret); + return ret; + } + + char* c2 = new char[1536]; + SrsAutoFree(char, c2, true); + srs_random_generate(c2, 1536); + if ((ret = skt.write(c2, 1536, &nsize)) != ERROR_SUCCESS) { + srs_warn("simple handshake write c2 failed. ret=%d", ret); + return ret; + } + srs_verbose("simple handshake write c2 success."); + + srs_trace("simple handshake success."); + + return ret; +} + SrsComplexHandshake::SrsComplexHandshake() { } @@ -1134,12 +1195,12 @@ SrsComplexHandshake::~SrsComplexHandshake() } #ifndef SRS_SSL -int SrsComplexHandshake::handshake(SrsSocket& /*skt*/, char* /*_c1*/) +int SrsComplexHandshake::handshake_with_client(SrsSocket& /*skt*/, char* /*_c1*/) { return ERROR_RTMP_TRY_SIMPLE_HS; } #else -int SrsComplexHandshake::handshake(SrsSocket& skt, char* _c1) +int SrsComplexHandshake::handshake_with_client(SrsSocket& skt, char* _c1) { int ret = ERROR_SUCCESS; @@ -1216,3 +1277,20 @@ int SrsComplexHandshake::handshake(SrsSocket& skt, char* _c1) } #endif +#ifndef SRS_SSL +int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/) +{ + return ERROR_RTMP_TRY_SIMPLE_HS; +} +#else +int SrsComplexHandshake::handshake_with_server(SrsSocket& /*skt*/) +{ + int ret = ERROR_SUCCESS; + + // TODO: implements complex handshake. + ret = ERROR_RTMP_TRY_SIMPLE_HS; + + return ret; +} +#endif + diff --git a/trunk/src/core/srs_core_handshake.hpp b/trunk/src/core/srs_core_handshake.hpp index 96da2d7f0..e34135a7c 100644 --- a/trunk/src/core/srs_core_handshake.hpp +++ b/trunk/src/core/srs_core_handshake.hpp @@ -47,7 +47,8 @@ public: * @param complex_hs, try complex handshake first, * if failed, rollback to simple handshake. */ - virtual int handshake(SrsSocket& skt, SrsComplexHandshake& complex_hs); + virtual int handshake_with_client(SrsSocket& skt, SrsComplexHandshake& complex_hs); + virtual int handshake_with_server(SrsSocket& skt, SrsComplexHandshake& complex_hs); }; /** @@ -70,7 +71,8 @@ public: * try simple handshake if error is ERROR_RTMP_TRY_SIMPLE_HS, * otherwise, disconnect */ - virtual int handshake(SrsSocket& skt, char* _c1); + virtual int handshake_with_client(SrsSocket& skt, char* _c1); + virtual int handshake_with_server(SrsSocket& skt); }; #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index c6c490277..911deac04 100644 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -199,6 +199,7 @@ messages. #define RTMP_AMF0_COMMAND_ON_BW_DONE "onBWDone" #define RTMP_AMF0_COMMAND_ON_STATUS "onStatus" #define RTMP_AMF0_COMMAND_RESULT "_result" +#define RTMP_AMF0_COMMAND_ERROR "_error" #define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" #define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" #define RTMP_AMF0_COMMAND_UNPUBLISH "FCUnpublish" @@ -282,6 +283,15 @@ SrsProtocol::~SrsProtocol() srs_freep(skt); } +std::string SrsProtocol::get_request_name(double transcationId) +{ + if (requests.find(transcationId) == requests.end()) { + return ""; + } + + return requests[transcationId]; +} + void SrsProtocol::set_recv_timeout(int64_t timeout_us) { return skt->set_recv_timeout(timeout_us); @@ -548,7 +558,7 @@ int SrsProtocol::on_recv_message(SrsCommonMessage* msg) case RTMP_MSG_SetChunkSize: case RTMP_MSG_UserControlMessage: case RTMP_MSG_WindowAcknowledgementSize: - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(this)) != ERROR_SUCCESS) { srs_error("decode packet from message payload failed. ret=%d", ret); return ret; } @@ -624,6 +634,17 @@ int SrsProtocol::on_send_message(ISrsMessage* msg) srs_trace("set output chunk size to %d", pkt->chunk_size); break; } + case RTMP_MSG_AMF0CommandMessage: + case RTMP_MSG_AMF3CommandMessage: { + if (true) { + SrsConnectAppPacket* pkt = NULL; + pkt = dynamic_cast(common_msg->get_packet()); + if (pkt) { + requests[pkt->transaction_id] = RTMP_AMF0_COMMAND_CONNECT; + } + } + break; + } } return ret; @@ -1157,7 +1178,7 @@ bool SrsCommonMessage::can_decode() return true; } -int SrsCommonMessage::decode_packet() +int SrsCommonMessage::decode_packet(SrsProtocol* protocol) { int ret = ERROR_SUCCESS; @@ -1201,6 +1222,39 @@ int SrsCommonMessage::decode_packet() } srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str()); + // result/error packet + if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) { + double transactionId = 0.0; + if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) { + srs_error("decode AMF0/AMF3 transcationId failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0/AMF3 command id, transcationId=%.2f", transactionId); + + // reset stream, for header read completed. + stream->reset(); + + std::string request_name = protocol->get_request_name(transactionId); + if (request_name.empty()) { + ret = ERROR_RTMP_NO_REQUEST; + srs_error("decode AMF0/AMF3 request failed. ret=%d", ret); + return ret; + } + srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str()); + + if (request_name == RTMP_AMF0_COMMAND_CONNECT) { + srs_info("decode the AMF0/AMF3 response command(connect vhost/app message)."); + packet = new SrsConnectAppResPacket(); + return packet->decode(stream); + } else { + ret = ERROR_RTMP_NO_REQUEST; + srs_error("decode AMF0/AMF3 request failed. " + "request_name=%s, transactionId=%.2f, ret=%d", + request_name.c_str(), transactionId, ret); + return ret; + } + } + // reset to zero(amf3 to 1) to restart decode. stream->reset(); if (header.is_amf3_command()) { @@ -1319,7 +1373,13 @@ int SrsCommonMessage::encode_packet() size = 0; srs_freepa(payload); - return packet->encode(size, (char*&)payload); + if ((ret = packet->encode(size, (char*&)payload)) != ERROR_SUCCESS) { + return ret; + } + + header.payload_length = size; + + return ret; } SrsSharedPtrMessage::SrsSharedPtr::SrsSharedPtr() @@ -1582,6 +1642,49 @@ int SrsConnectAppPacket::decode(SrsStream* stream) return ret; } +int SrsConnectAppPacket::get_perfer_cid() +{ + return RTMP_CID_OverConnection; +} + +int SrsConnectAppPacket::get_message_type() +{ + return RTMP_MSG_AMF0CommandMessage; +} + +int SrsConnectAppPacket::get_size() +{ + return srs_amf0_get_string_size(command_name) + srs_amf0_get_number_size() + + srs_amf0_get_object_size(command_object); +} + +int SrsConnectAppPacket::encode_packet(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("encode command_name failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_name success."); + + if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("encode transaction_id failed. ret=%d", ret); + return ret; + } + srs_verbose("encode transaction_id success."); + + if ((ret = srs_amf0_write_object(stream, command_object)) != ERROR_SUCCESS) { + srs_error("encode command_object failed. ret=%d", ret); + return ret; + } + srs_verbose("encode command_object success."); + + srs_info("encode connect app request packet success."); + + return ret; +} + SrsConnectAppResPacket::SrsConnectAppResPacket() { command_name = RTMP_AMF0_COMMAND_RESULT; @@ -1596,6 +1699,57 @@ SrsConnectAppResPacket::~SrsConnectAppResPacket() srs_freep(info); } +int SrsConnectAppResPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_RESULT) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect transaction_id failed. ret=%d", ret); + return ret; + } + if (transaction_id != 1.0) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect transaction_id failed. " + "required=%.1f, actual=%.1f, ret=%d", 1.0, transaction_id, ret); + return ret; + } + + if ((ret = srs_amf0_read_object(stream, props)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect props failed. ret=%d", ret); + return ret; + } + if (props == NULL) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect props failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_object(stream, info)) != ERROR_SUCCESS) { + srs_error("amf0 decode connect info failed. ret=%d", ret); + return ret; + } + if (info == NULL) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode connect info failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode connect response packet success"); + + return ret; +} + int SrsConnectAppResPacket::get_perfer_cid() { return RTMP_CID_OverConnection; diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index cc12332f6..bc3fb6502 100644 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -88,6 +88,12 @@ private: st_netfd_t stfd; SrsSocket* skt; char* pp; + /** + * requests sent out, used to build the response. + * key: transactionId + * value: the request command name + */ + std::map requests; // peer in private: std::map chunk_streams; @@ -103,6 +109,7 @@ public: SrsProtocol(st_netfd_t client_stfd); virtual ~SrsProtocol(); public: + std::string get_request_name(double transcationId); /** * set the timeout in us. * if timeout, recv/send message return ERROR_SOCKET_TIMEOUT. @@ -319,7 +326,7 @@ public: /** * decode packet from message payload. */ - virtual int decode_packet(); + virtual int decode_packet(SrsProtocol* protocol); /** * get the decoded packet which decoded by decode_packet(). * @remark, user never free the pkt, the message will auto free it. @@ -481,6 +488,13 @@ public: virtual ~SrsConnectAppPacket(); public: virtual int decode(SrsStream* stream); +public: + virtual int get_perfer_cid(); +public: + virtual int get_message_type(); +protected: + virtual int get_size(); + virtual int encode_packet(SrsStream* stream); }; /** * response for SrsConnectAppPacket. @@ -502,6 +516,8 @@ public: public: SrsConnectAppResPacket(); virtual ~SrsConnectAppResPacket(); +public: + virtual int decode(SrsStream* stream); public: virtual int get_perfer_cid(); public: @@ -1076,7 +1092,7 @@ int srs_rtmp_expect_message(SrsProtocol* protocol, SrsCommonMessage** pmsg, T** } srs_verbose("recv message success."); - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { delete msg; srs_error("decode message failed. ret=%d", ret); return ret; diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index 2057998e7..d113d3717 100644 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -23,11 +23,6 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include -#include -#include -#include -#include - #include #include #include @@ -168,50 +163,104 @@ SrsResponse::~SrsResponse() { } -SrsRtmpClient::SrsRtmpClient() +SrsRtmpClient::SrsRtmpClient(st_netfd_t _stfd) { - stfd = NULL; + stfd = _stfd; + protocol = new SrsProtocol(stfd); } SrsRtmpClient::~SrsRtmpClient() { - if (stfd) { - int fd = st_netfd_fileno(stfd); - st_netfd_close(stfd); - stfd = NULL; - - // st does not close it sometimes, - // close it manually. - close(fd); - } + srs_freep(protocol); } -int SrsRtmpClient::connect_to(std::string server, int port) +void SrsRtmpClient::set_recv_timeout(int64_t timeout_us) { - int ret = ERROR_SUCCESS; - return ret; + protocol->set_recv_timeout(timeout_us); } -std::string SrsRtmpClient::parse_server(std::string host){ - if(inet_addr(host.c_str()) != INADDR_NONE){ - return host; - } - - hostent* answer = gethostbyname(host.c_str()); - if(answer == NULL){ - srs_error("dns resolve host %s error.", host.c_str()); - return ""; - } +void SrsRtmpClient::set_send_timeout(int64_t timeout_us) +{ + protocol->set_send_timeout(timeout_us); +} + +int SrsRtmpClient::handshake() +{ + int ret = ERROR_SUCCESS; + + SrsSocket skt(stfd); - char ipv4[16]; - memset(ipv4, 0, sizeof(ipv4)); - for(int i = 0; i < answer->h_length; i++){ - inet_ntop(AF_INET, answer->h_addr_list[i], ipv4, sizeof(ipv4)); - srs_info("dns resolve host %s to %s.", host.c_str(), ipv4); - break; + SrsComplexHandshake complex_hs; + SrsSimpleHandshake simple_hs; + if ((ret = simple_hs.handshake_with_server(skt, complex_hs)) != ERROR_SUCCESS) { + return ret; } - return ipv4; + return ret; +} + +int SrsRtmpClient::connect_app(std::string app, std::string tc_url) +{ + int ret = ERROR_SUCCESS; + + // Connect(vhost, app) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsConnectAppPacket* pkt = new SrsConnectAppPacket(); + msg->set_packet(pkt, 0); + + pkt->command_object = new SrsAmf0Object(); + pkt->command_object->set("app", new SrsAmf0String(app.c_str())); + pkt->command_object->set("swfUrl", new SrsAmf0String()); + pkt->command_object->set("tcUrl", new SrsAmf0String(tc_url.c_str())); + pkt->command_object->set("fpad", new SrsAmf0Boolean(false)); + pkt->command_object->set("capabilities", new SrsAmf0Number(239)); + pkt->command_object->set("audioCodecs", new SrsAmf0Number(3575)); + pkt->command_object->set("videoCodecs", new SrsAmf0Number(252)); + pkt->command_object->set("videoFunction", new SrsAmf0Number(1)); + pkt->command_object->set("pageUrl", new SrsAmf0String()); + pkt->command_object->set("objectEncoding", new SrsAmf0Number(0)); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + return ret; + } + } + + // Set Window Acknowledgement size(2500000) + if (true) { + SrsCommonMessage* msg = new SrsCommonMessage(); + SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket(); + + pkt->ackowledgement_window_size = 2500000; + msg->set_packet(pkt, 0); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + return ret; + } + } + + // expect connect _result + SrsCommonMessage* msg = NULL; + SrsConnectAppResPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("expect connect app response message failed. ret=%d", ret); + return ret; + } + SrsAutoFree(SrsCommonMessage, msg, false); + srs_info("get connect app response message"); + + return ret; +} + +int SrsRtmpClient::play_stream(std::string stream, int& stream_id) +{ + int ret = ERROR_SUCCESS; + + // CreateStream + if (true) { + } + + return ret; } SrsRtmp::SrsRtmp(st_netfd_t client_stfd) @@ -225,9 +274,14 @@ SrsRtmp::~SrsRtmp() srs_freep(protocol); } +SrsProtocol* SrsRtmp::get_protocol() +{ + return protocol; +} + void SrsRtmp::set_recv_timeout(int64_t timeout_us) { - return protocol->set_recv_timeout(timeout_us); + protocol->set_recv_timeout(timeout_us); } int64_t SrsRtmp::get_recv_timeout() @@ -237,7 +291,7 @@ int64_t SrsRtmp::get_recv_timeout() void SrsRtmp::set_send_timeout(int64_t timeout_us) { - return protocol->set_send_timeout(timeout_us); + protocol->set_send_timeout(timeout_us); } int64_t SrsRtmp::get_recv_bytes() @@ -278,7 +332,7 @@ int SrsRtmp::handshake() SrsComplexHandshake complex_hs; SrsSimpleHandshake simple_hs; - if ((ret = simple_hs.handshake(skt, complex_hs)) != ERROR_SUCCESS) { + if ((ret = simple_hs.handshake_with_client(skt, complex_hs)) != ERROR_SUCCESS) { return ret; } @@ -441,7 +495,7 @@ int SrsRtmp::identify_client(int stream_id, SrsClientType& type, std::string& st continue; } - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { srs_error("identify decode message failed. ret=%d", ret); return ret; } @@ -884,7 +938,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea continue; } - if ((ret = msg->decode_packet()) != ERROR_SUCCESS) { + if ((ret = msg->decode_packet(protocol)) != ERROR_SUCCESS) { srs_error("identify decode message failed. ret=%d", ret); return ret; } diff --git a/trunk/src/core/srs_core_rtmp.hpp b/trunk/src/core/srs_core_rtmp.hpp index 90c5498f1..c3c4ec5f3 100644 --- a/trunk/src/core/srs_core_rtmp.hpp +++ b/trunk/src/core/srs_core_rtmp.hpp @@ -100,13 +100,18 @@ enum SrsClientType class SrsRtmpClient { private: + SrsProtocol* protocol; st_netfd_t stfd; public: - SrsRtmpClient(); + SrsRtmpClient(st_netfd_t _stfd); virtual ~SrsRtmpClient(); -private: - virtual int connect_to(std::string server, int port); - std::string parse_server(std::string host); +public: + virtual void set_recv_timeout(int64_t timeout_us); + virtual void set_send_timeout(int64_t timeout_us); +public: + virtual int handshake(); + virtual int connect_app(std::string app, std::string tc_url); + virtual int play_stream(std::string stream, int& stream_id); }; /** @@ -123,6 +128,7 @@ public: SrsRtmp(st_netfd_t client_stfd); virtual ~SrsRtmp(); public: + virtual SrsProtocol* get_protocol(); virtual void set_recv_timeout(int64_t timeout_us); virtual int64_t get_recv_timeout(); virtual void set_send_timeout(int64_t timeout_us);