diff --git a/trunk/src/service/srs_service_http_client.cpp b/trunk/src/service/srs_service_http_client.cpp index 21fe27307..527aa4f43 100644 --- a/trunk/src/service/srs_service_http_client.cpp +++ b/trunk/src/service/srs_service_http_client.cpp @@ -41,7 +41,7 @@ SrsHttpClient::SrsHttpClient() clk = new SrsWallClock(); kbps = new SrsKbps(clk); parser = NULL; - timeout = SRS_UTIME_NO_TIMEOUT; + recv_timeout = timeout = SRS_UTIME_NO_TIMEOUT; port = 0; } @@ -68,7 +68,7 @@ srs_error_t SrsHttpClient::initialize(string h, int p, srs_utime_t tm) // Always disconnect the transport. host = h; port = p; - timeout = tm; + recv_timeout = timeout = tm; disconnect(); // ep used for host in header. @@ -187,7 +187,7 @@ srs_error_t SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg) void SrsHttpClient::set_recv_timeout(srs_utime_t tm) { - transport->set_recv_timeout(tm); + recv_timeout = tm; } void SrsHttpClient::kbps_sample(const char* label, int64_t age) @@ -222,11 +222,12 @@ srs_error_t SrsHttpClient::connect() transport = new SrsTcpClient(host, port, timeout); if ((err = transport->connect()) != srs_success) { disconnect(); - return srs_error_wrap(err, "http: tcp connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); + return srs_error_wrap(err, "http: tcp connect %s:%d to=%dms, rto=%dms", + host.c_str(), port, srsu2msi(timeout), srsu2msi(recv_timeout)); } // Set the recv/send timeout in srs_utime_t. - transport->set_recv_timeout(timeout); + transport->set_recv_timeout(recv_timeout); transport->set_send_timeout(timeout); kbps->set_io(transport, transport); diff --git a/trunk/src/service/srs_service_http_client.hpp b/trunk/src/service/srs_service_http_client.hpp index d11b1fb8e..12193809e 100644 --- a/trunk/src/service/srs_service_http_client.hpp +++ b/trunk/src/service/srs_service_http_client.hpp @@ -63,6 +63,7 @@ private: private: // The timeout in srs_utime_t. srs_utime_t timeout; + srs_utime_t recv_timeout; // The host name or ip. std::string host; int port; diff --git a/trunk/src/utest/srs_utest_service.cpp b/trunk/src/utest/srs_utest_service.cpp index e75f7285d..7cb9dcc28 100644 --- a/trunk/src/utest/srs_utest_service.cpp +++ b/trunk/src/utest/srs_utest_service.cpp @@ -36,6 +36,7 @@ using namespace std; #include #include #include +#include #include #include @@ -1079,3 +1080,141 @@ VOID TEST(TCPServerTest, CoverUtility) } } +class MockOnCycleThread4 : public ISrsCoroutineHandler +{ +public: + SrsSTCoroutine trd; + srs_netfd_t fd; + MockOnCycleThread4() : trd("mock", this, 0) { + }; + virtual ~MockOnCycleThread4() { + trd.stop(); + srs_close_stfd(fd); + } + virtual srs_error_t start(string ip, int port) { + srs_error_t err = srs_success; + if ((err = srs_tcp_listen(ip, port, &fd)) != srs_success) { + return err; + } + + return trd.start(); + } + virtual srs_error_t do_cycle(srs_netfd_t cfd) { + srs_error_t err = srs_success; + + SrsStSocket skt; + if ((err = skt.initialize(cfd)) != srs_success) { + return err; + } + + skt.set_recv_timeout(1 * SRS_UTIME_SECONDS); + skt.set_send_timeout(1 * SRS_UTIME_SECONDS); + + while (true) { + if ((err = trd.pull()) != srs_success) { + return err; + } + + char buf[1024]; + if ((err = skt.read(buf, 1024, NULL)) != srs_success) { + return err; + } + + string res = mock_http_response(200, "OK"); + if ((err = skt.write((char*)res.data(), (int)res.length(), NULL)) != srs_success) { + return err; + } + } + + return err; + } + virtual srs_error_t cycle() { + srs_error_t err = srs_success; + + srs_netfd_t cfd = srs_accept(fd, NULL, NULL, SRS_UTIME_NO_TIMEOUT); + if (cfd == NULL) { + return err; + } + + err = do_cycle(cfd); + srs_close_stfd(cfd); + srs_freep(err); + + return err; + } +}; + +VOID TEST(TCPServerTest, HTTPClientUtility) +{ + srs_error_t err; + + // Typical HTTP POST. + if (true) { + MockOnCycleThread4 trd; + HELPER_ASSERT_SUCCESS(trd.start("127.0.0.1", 8080)); + + SrsHttpClient client; + HELPER_ASSERT_SUCCESS(client.initialize("127.0.0.1", 8080, 1*SRS_UTIME_SECONDS)); + + ISrsHttpMessage* res = NULL; + SrsAutoFree(ISrsHttpMessage, res); + HELPER_ASSERT_SUCCESS(client.post("/api/v1", "", &res)); + + ISrsHttpResponseReader* br = res->body_reader(); + ASSERT_FALSE(br->eof()); + + ssize_t nn = 0; char buf[1024]; + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(br->read(buf, sizeof(buf), &nn)); + ASSERT_EQ(2, nn); + EXPECT_STREQ("OK", buf); + } + + // Typical HTTP GET. + if (true) { + MockOnCycleThread4 trd; + HELPER_ASSERT_SUCCESS(trd.start("127.0.0.1", 8080)); + + SrsHttpClient client; + HELPER_ASSERT_SUCCESS(client.initialize("127.0.0.1", 8080, 1*SRS_UTIME_SECONDS)); + + ISrsHttpMessage* res = NULL; + SrsAutoFree(ISrsHttpMessage, res); + HELPER_ASSERT_SUCCESS(client.get("/api/v1", "", &res)); + + ISrsHttpResponseReader* br = res->body_reader(); + ASSERT_FALSE(br->eof()); + + ssize_t nn = 0; char buf[1024]; + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(br->read(buf, sizeof(buf), &nn)); + ASSERT_EQ(2, nn); + EXPECT_STREQ("OK", buf); + } + + // Set receive timeout and Kbps ample. + if (true) { + MockOnCycleThread4 trd; + HELPER_ASSERT_SUCCESS(trd.start("127.0.0.1", 8080)); + + SrsHttpClient client; + HELPER_ASSERT_SUCCESS(client.initialize("127.0.0.1", 8080, 1*SRS_UTIME_SECONDS)); + client.set_recv_timeout(1 * SRS_UTIME_SECONDS); + + ISrsHttpMessage* res = NULL; + SrsAutoFree(ISrsHttpMessage, res); + HELPER_ASSERT_SUCCESS(client.get("/api/v1", "", &res)); + + ISrsHttpResponseReader* br = res->body_reader(); + ASSERT_FALSE(br->eof()); + + ssize_t nn = 0; char buf[1024]; + HELPER_ARRAY_INIT(buf, sizeof(buf), 0); + HELPER_ASSERT_SUCCESS(br->read(buf, sizeof(buf), &nn)); + ASSERT_EQ(2, nn); + EXPECT_STREQ("OK", buf); + + client.kbps_sample("SRS", 0); + } +} +