refine the forwarder, use srs message queue

pull/133/head
winlin 11 years ago
parent c47f07c69a
commit 5836ffd2d4

@ -52,22 +52,16 @@ SrsForwarder::SrsForwarder(SrsSource* _source)
pthread = new SrsThread(this, SRS_FORWARDER_SLEEP_MS);
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
}
SrsForwarder::~SrsForwarder()
{
on_unpublish();
// TODO: FIXME: remove it.
std::vector<SrsSharedPtrMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsSharedPtrMessage* msg = *it;
srs_freep(msg);
}
msgs.clear();
srs_freep(pthread);
srs_freep(queue);
srs_freep(jitter);
}
void SrsForwarder::set_queue_size(double queue_size)
@ -143,7 +137,14 @@ int SrsForwarder::on_meta_data(SrsSharedPtrMessage* metadata)
{
int ret = ERROR_SUCCESS;
msgs.push_back(metadata);
if ((ret = jitter->correct(metadata, 0, 0)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
}
if ((ret = queue->enqueue(metadata)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -152,8 +153,14 @@ int SrsForwarder::on_audio(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: must drop the msgs when server failed.
msgs.push_back(msg);
if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
}
if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -162,8 +169,14 @@ int SrsForwarder::on_video(SrsSharedPtrMessage* msg)
{
int ret = ERROR_SUCCESS;
// TODO: FIXME: must drop the msgs when server failed.
msgs.push_back(msg);
if ((ret = jitter->correct(msg, 0, 0)) != ERROR_SUCCESS) {
srs_freep(msg);
return ret;
}
if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
@ -292,11 +305,20 @@ int SrsForwarder::forward()
}
}
// forward all messages.
int count = 0;
SrsSharedPtrMessage** msgs = NULL;
if ((ret = queue->get_packets(0, &msgs, count)) != ERROR_SUCCESS) {
srs_error("get message to forward failed. ret=%d", ret);
return ret;
}
// ignore when no messages.
int count = (int)msgs.size();
if (msgs.empty()) {
if (count <= 0) {
srs_verbose("no packets to forward.");
continue;
}
SrsAutoFree(SrsSharedPtrMessage*, msgs, true);
// pithy print
pithy_print.elapse(SRS_PULSE_TIMEOUT_MS);
@ -306,36 +328,17 @@ int SrsForwarder::forward()
}
// all msgs to forward.
int i = 0;
for (i = 0; i < count; i++) {
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
msgs[i] = NULL;
// we erased the sendout messages, the msg must not be NULL.
srs_assert(msg);
msgs[i] = NULL;
ret = client->send_message(msg);
if (ret != ERROR_SUCCESS) {
if ((ret = client->send_message(msg)) != ERROR_SUCCESS) {
srs_error("forwarder send message to server failed. ret=%d", ret);
// convert the index to count when error.
i++;
break;
return ret;
}
}
// clear sendout mesages.
if (i < count) {
srs_warn("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);
} else {
srs_info("clear forwarded msg, total=%d, forwarded=%d, ret=%d", count, i, ret);
}
msgs.erase(msgs.begin(), msgs.begin() + i);
if (ret != ERROR_SUCCESS) {
break;
}
}
return ret;

@ -30,13 +30,13 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <srs_core_thread.hpp>
class SrsSharedPtrMessage;
class SrsOnMetaDataPacket;
class SrsMessageQueue;
class SrsRtmpJitter;
class SrsRtmpClient;
class SrsRequest;
class SrsSource;
@ -59,8 +59,8 @@ private:
private:
SrsSource* source;
SrsRtmpClient* client;
SrsRtmpJitter* jitter;
SrsMessageQueue* queue;
std::vector<SrsSharedPtrMessage*> msgs;
public:
SrsForwarder(SrsSource* _source);
virtual ~SrsForwarder();

@ -101,7 +101,10 @@ public:
*/
virtual int enqueue(SrsSharedPtrMessage* msg);
/**
* get messages from the queue.
* get packets in consumer queue.
* @pmsgs SrsMessages*[], output the prt array.
* @count the count in array.
* @max_count the max count to dequeue, 0 to dequeue all.
*/
virtual int get_packets(int max_count, SrsSharedPtrMessage**& pmsgs, int& count);
private:
@ -122,7 +125,6 @@ private:
SrsRtmpJitter* jitter;
SrsSource* source;
SrsMessageQueue* queue;
std::vector<SrsSharedPtrMessage*> msgs;
bool paused;
public:
SrsConsumer(SrsSource* _source);

Loading…
Cancel
Save