diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 48f97d8bb..8c992eeff 100755 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -45,4 +45,20 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include +// free the p and set to NULL. +// p must be a T*. +#define srs_freep(p) \ + if (p) { \ + delete p; \ + p = NULL; \ + } \ + (void)0 +// free the p which represents a array +#define srs_freepa(p) \ + if (p) { \ + delete[] p; \ + p = NULL; \ + } \ + (void)0 + #endif \ No newline at end of file diff --git a/trunk/src/core/srs_core_amf0.cpp b/trunk/src/core/srs_core_amf0.cpp index 909f2d7c6..999fb99e6 100755 --- a/trunk/src/core/srs_core_amf0.cpp +++ b/trunk/src/core/srs_core_amf0.cpp @@ -23,6 +23,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include +#include + #include #include #include @@ -169,33 +171,76 @@ SrsAmf0ObjectEOF::~SrsAmf0ObjectEOF() { } -SrsAmf0Object::SrsAmf0Object() +SrsUnSortedHashtable::SrsUnSortedHashtable() { - marker = RTMP_AMF0_Object; } -SrsAmf0Object::~SrsAmf0Object() +SrsUnSortedHashtable::~SrsUnSortedHashtable() { - std::map::iterator it; + std::vector::iterator it; for (it = properties.begin(); it != properties.end(); ++it) { - SrsAmf0Any* any = it->second; - delete any; + SrsObjectPropertyType& elem = *it; + SrsAmf0Any* any = elem.second; + srs_freep(any); } properties.clear(); } -SrsAmf0Any* SrsAmf0Object::get_property(std::string name) +int SrsUnSortedHashtable::size() +{ + return (int)properties.size(); +} + +std::string SrsUnSortedHashtable::key_at(int index) +{ + srs_assert(index < size()); + SrsObjectPropertyType& elem = properties[index]; + return elem.first; +} + +SrsAmf0Any* SrsUnSortedHashtable::value_at(int index) { - std::map::iterator it; + srs_assert(index < size()); + SrsObjectPropertyType& elem = properties[index]; + return elem.second; +} + +void SrsUnSortedHashtable::set(std::string key, SrsAmf0Any* value) +{ + std::vector::iterator it; - if ((it = properties.find(name)) == properties.end()) { - return NULL; + for (it = properties.begin(); it != properties.end(); ++it) { + SrsObjectPropertyType& elem = *it; + std::string name = elem.first; + SrsAmf0Any* any = elem.second; + + if (key == name) { + srs_freep(any); + properties.erase(it); + break; + } } - return it->second; + properties.push_back(std::make_pair(key, value)); } -SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) +SrsAmf0Any* SrsUnSortedHashtable::get_property(std::string name) +{ + std::vector::iterator it; + + for (it = properties.begin(); it != properties.end(); ++it) { + SrsObjectPropertyType& elem = *it; + std::string key = elem.first; + SrsAmf0Any* any = elem.second; + if (key == name) { + return any; + } + } + + return NULL; +} + +SrsAmf0Any* SrsUnSortedHashtable::ensure_property_string(std::string name) { SrsAmf0Any* prop = get_property(name); @@ -210,6 +255,45 @@ SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) return prop; } +SrsAmf0Object::SrsAmf0Object() +{ + marker = RTMP_AMF0_Object; +} + +SrsAmf0Object::~SrsAmf0Object() +{ +} + +int SrsAmf0Object::size() +{ + return properties.size(); +} + +std::string SrsAmf0Object::key_at(int index) +{ + return properties.key_at(index); +} + +SrsAmf0Any* SrsAmf0Object::value_at(int index) +{ + return properties.value_at(index); +} + +void SrsAmf0Object::set(std::string key, SrsAmf0Any* value) +{ + properties.set(key, value); +} + +SrsAmf0Any* SrsAmf0Object::get_property(std::string name) +{ + return properties.get_property(name); +} + +SrsAmf0Any* SrsAmf0Object::ensure_property_string(std::string name) +{ + return properties.ensure_property_string(name); +} + SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray() { marker = RTMP_AMF0_EcmaArray; @@ -217,38 +301,36 @@ SrsASrsAmf0EcmaArray::SrsASrsAmf0EcmaArray() SrsASrsAmf0EcmaArray::~SrsASrsAmf0EcmaArray() { - std::map::iterator it; - for (it = properties.begin(); it != properties.end(); ++it) { - SrsAmf0Any* any = it->second; - delete any; - } - properties.clear(); +} + +int SrsASrsAmf0EcmaArray::size() +{ + return properties.size(); +} + +std::string SrsASrsAmf0EcmaArray::key_at(int index) +{ + return properties.key_at(index); +} + +SrsAmf0Any* SrsASrsAmf0EcmaArray::value_at(int index) +{ + return properties.value_at(index); +} + +void SrsASrsAmf0EcmaArray::set(std::string key, SrsAmf0Any* value) +{ + properties.set(key, value); } SrsAmf0Any* SrsASrsAmf0EcmaArray::get_property(std::string name) { - std::map::iterator it; - - if ((it = properties.find(name)) == properties.end()) { - return NULL; - } - - return it->second; + return properties.get_property(name); } SrsAmf0Any* SrsASrsAmf0EcmaArray::ensure_property_string(std::string name) { - SrsAmf0Any* prop = get_property(name); - - if (!prop) { - return NULL; - } - - if (!prop->is_string()) { - return NULL; - } - - return prop; + return properties.ensure_property_string(name); } int srs_amf0_read_utf8(SrsStream* stream, std::string& value) @@ -877,14 +959,14 @@ int srs_amf0_read_object(SrsStream* stream, SrsAmf0Object*& value) // AMF0 Object EOF. if (property_name.empty() || !property_value || property_value->is_object_eof()) { if (property_value) { - delete property_value; + srs_freep(property_value); } srs_info("amf0 read object EOF."); break; } // add property - value->properties[property_name] = property_value; + value->set(property_name, property_value); } return ret; @@ -906,10 +988,9 @@ int srs_amf0_write_object(SrsStream* stream, SrsAmf0Object* value) srs_verbose("amf0 write object marker success"); // value - std::map::iterator it; - for (it = value->properties.begin(); it != value->properties.end(); ++it) { - std::string name = it->first; - SrsAmf0Any* any = it->second; + for (int i = 0; i < value->size(); i++) { + std::string name = value->key_at(i); + SrsAmf0Any* any = value->value_at(i); if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) { srs_error("write object property name failed. ret=%d", ret); @@ -986,14 +1067,14 @@ int srs_amf0_read_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray*& value) // AMF0 Object EOF. if (property_name.empty() || !property_value || property_value->is_object_eof()) { if (property_value) { - delete property_value; + srs_freep(property_value); } srs_info("amf0 read ecma_array EOF."); break; } // add property - value->properties[property_name] = property_value; + value->set(property_name, property_value); } return ret; @@ -1025,10 +1106,9 @@ int srs_amf0_write_ecma_array(SrsStream* stream, SrsASrsAmf0EcmaArray* value) srs_verbose("amf0 write ecma_array count success. count=%d", value->count); // value - std::map::iterator it; - for (it = value->properties.begin(); it != value->properties.end(); ++it) { - std::string name = it->first; - SrsAmf0Any* any = it->second; + for (int i = 0; i < value->size(); i++) { + std::string name = value->key_at(i); + SrsAmf0Any* any = value->value_at(i); if ((ret = srs_amf0_write_utf8(stream, name)) != ERROR_SUCCESS) { srs_error("write ecma_array property name failed. ret=%d", ret); @@ -1091,10 +1171,9 @@ int srs_amf0_get_object_size(SrsAmf0Object* obj) int size = 1; - std::map::iterator it; - for (it = obj->properties.begin(); it != obj->properties.end(); ++it) { - std::string name = it->first; - SrsAmf0Any* value = it->second; + for (int i = 0; i < obj->size(); i++){ + std::string name = obj->key_at(i); + SrsAmf0Any* value = obj->value_at(i); size += srs_amf0_get_utf8_size(name); size += srs_amf0_get_any_size(value); @@ -1113,10 +1192,9 @@ int srs_amf0_get_ecma_array_size(SrsASrsAmf0EcmaArray* arr) int size = 1 + 4; - std::map::iterator it; - for (it = arr->properties.begin(); it != arr->properties.end(); ++it) { - std::string name = it->first; - SrsAmf0Any* value = it->second; + for (int i = 0; i < arr->size(); i++){ + std::string name = arr->key_at(i); + SrsAmf0Any* value = arr->value_at(i); size += srs_amf0_get_utf8_size(name); size += srs_amf0_get_any_size(value); diff --git a/trunk/src/core/srs_core_amf0.hpp b/trunk/src/core/srs_core_amf0.hpp index 3624eacd0..8b42a00d7 100755 --- a/trunk/src/core/srs_core_amf0.hpp +++ b/trunk/src/core/srs_core_amf0.hpp @@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #include #include -#include +#include class SrsStream; class SrsAmf0Object; @@ -139,6 +139,30 @@ struct SrsAmf0ObjectEOF : public SrsAmf0Any virtual ~SrsAmf0ObjectEOF(); }; +/** +* to ensure in inserted order. +* for the FMLE will crash when AMF0Object is not ordered by inserted, +* if ordered in map, the string compare order, the FMLE will creash when +* get the response of connect app. +*/ +struct SrsUnSortedHashtable +{ +private: + typedef std::pair SrsObjectPropertyType; + std::vector properties; +public: + SrsUnSortedHashtable(); + virtual ~SrsUnSortedHashtable(); + + virtual int size(); + virtual std::string key_at(int index); + virtual SrsAmf0Any* value_at(int index); + virtual void set(std::string key, SrsAmf0Any* value); + + virtual SrsAmf0Any* get_property(std::string name); + virtual SrsAmf0Any* ensure_property_string(std::string name); +}; + /** * 2.5 Object Type * anonymous-object-type = object-marker *(object-property) @@ -146,11 +170,18 @@ struct SrsAmf0ObjectEOF : public SrsAmf0Any */ struct SrsAmf0Object : public SrsAmf0Any { - std::map properties; +private: + SrsUnSortedHashtable properties; +public: SrsAmf0ObjectEOF eof; SrsAmf0Object(); virtual ~SrsAmf0Object(); + + virtual int size(); + virtual std::string key_at(int index); + virtual SrsAmf0Any* value_at(int index); + virtual void set(std::string key, SrsAmf0Any* value); virtual SrsAmf0Any* get_property(std::string name); virtual SrsAmf0Any* ensure_property_string(std::string name); @@ -164,12 +195,19 @@ struct SrsAmf0Object : public SrsAmf0Any */ struct SrsASrsAmf0EcmaArray : public SrsAmf0Any { +private: + SrsUnSortedHashtable properties; +public: int32_t count; - std::map properties; SrsAmf0ObjectEOF eof; SrsASrsAmf0EcmaArray(); virtual ~SrsASrsAmf0EcmaArray(); + + virtual int size(); + virtual std::string key_at(int index); + virtual SrsAmf0Any* value_at(int index); + virtual void set(std::string key, SrsAmf0Any* value); virtual SrsAmf0Any* get_property(std::string name); virtual SrsAmf0Any* ensure_property_string(std::string name); diff --git a/trunk/src/core/srs_core_buffer.cpp b/trunk/src/core/srs_core_buffer.cpp index df1376d76..ca1d283f9 100755 --- a/trunk/src/core/srs_core_buffer.cpp +++ b/trunk/src/core/srs_core_buffer.cpp @@ -55,7 +55,7 @@ void SrsBuffer::append(char* bytes, int size) { std::vector vec(bytes, bytes + size); - data.insert(data.begin(), vec.begin(), vec.end()); + data.insert(data.end(), vec.begin(), vec.end()); } int SrsBuffer::ensure_buffer_bytes(SrsSocket* skt, int required_size) diff --git a/trunk/src/core/srs_core_client.cpp b/trunk/src/core/srs_core_client.cpp index 093ce13a3..a676e8d5a 100755 --- a/trunk/src/core/srs_core_client.cpp +++ b/trunk/src/core/srs_core_client.cpp @@ -42,20 +42,9 @@ SrsClient::SrsClient(SrsServer* srs_server, st_netfd_t client_stfd) SrsClient::~SrsClient() { - if (ip) { - delete[] ip; - ip = NULL; - } - - if (req) { - delete req; - req = NULL; - } - - if (rtmp) { - delete rtmp; - rtmp = NULL; - } + srs_freepa(ip); + srs_freep(req); + srs_freep(rtmp); } int SrsClient::do_cycle() diff --git a/trunk/src/core/srs_core_protocol.cpp b/trunk/src/core/srs_core_protocol.cpp index b5c7eb610..06a39507e 100755 --- a/trunk/src/core/srs_core_protocol.cpp +++ b/trunk/src/core/srs_core_protocol.cpp @@ -199,6 +199,7 @@ messages. #define RTMP_AMF0_COMMAND_RESULT "_result" #define RTMP_AMF0_COMMAND_RELEASE_STREAM "releaseStream" #define RTMP_AMF0_COMMAND_FC_PUBLISH "FCPublish" +#define RTMP_AMF0_COMMAND_PUBLISH "publish" #define RTMP_AMF0_DATA_SAMPLE_ACCESS "|RtmpSampleAccess" /**************************************************************************** @@ -260,23 +261,13 @@ SrsProtocol::~SrsProtocol() for (it = chunk_streams.begin(); it != chunk_streams.end(); ++it) { SrsChunkStream* stream = it->second; - - if (stream) { - delete stream; - } + srs_freep(stream); } chunk_streams.clear(); - if (buffer) { - delete buffer; - buffer = NULL; - } - - if (skt) { - delete skt; - skt = NULL; - } + srs_freep(buffer); + srs_freep(skt); } int SrsProtocol::recv_message(SrsMessage** pmsg) @@ -302,13 +293,13 @@ int SrsProtocol::recv_message(SrsMessage** pmsg) srs_trace("ignore empty message(type=%d, size=%d, time=%d, sid=%d).", msg->header.message_type, msg->header.payload_length, msg->header.timestamp, msg->header.stream_id); - delete msg; + srs_freep(msg); continue; } if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) { srs_error("hook the received msg failed. ret=%d", ret); - delete msg; + srs_freep(msg); return ret; } @@ -615,13 +606,36 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz { int ret = ERROR_SUCCESS; - // when not exists cached msg, means get an new message, - // the fmt must be type0/type1 which means new message. - if (!chunk->msg && fmt != RTMP_FMT_TYPE0 && fmt != RTMP_FMT_TYPE1) { + /** + * we should not assert anything about fmt, for the first packet. + * (when first packet, the chunk->msg is NULL). + * the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet. + * the previous packet is: + * 04 // fmt=0, cid=4 + * 00 00 1a // timestamp=26 + * 00 00 9d // payload_length=157 + * 08 // message_type=8(audio) + * 01 00 00 00 // stream_id=1 + * the current packet maybe: + * c4 // fmt=3, cid=4 + * it's ok, for the packet is audio, and timestamp delta is 26. + * the current packet must be parsed as: + * fmt=0, cid=4 + * timestamp=26+26=52 + * payload_length=157 + * message_type=8(audio) + * stream_id=1 + * so we must update the timestamp even fmt=3 for first packet. + */ + // fresh packet used to update the timestamp even fmt=3 for first packet. + bool is_fresh_packet = !chunk->msg; + + // but, we can ensure that when a chunk stream is fresh, + // the fmt must be 0, a new stream. + if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) { ret = ERROR_RTMP_CHUNK_START; - srs_error("chunk stream start, " - "fmt must be %d or %d, actual is %d. ret=%d", - RTMP_FMT_TYPE0, RTMP_FMT_TYPE1, fmt, ret); + srs_error("chunk stream is fresh, " + "fmt must be %d, actual is %d. ret=%d", RTMP_FMT_TYPE0, fmt, ret); return ret; } @@ -636,13 +650,12 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // create msg when new chunk stream start if (!chunk->msg) { - srs_assert(fmt == RTMP_FMT_TYPE0 || fmt == RTMP_FMT_TYPE1); chunk->msg = new SrsMessage(); srs_verbose("create message for new chunk, fmt=%d, cid=%d", fmt, chunk->cid); } // read message header from socket to buffer. - static char mh_sizes[] = {11, 7, 1, 0}; + static char mh_sizes[] = {11, 7, 3, 0}; mh_size = mh_sizes[(int)fmt]; srs_verbose("calc chunk message header size. fmt=%d, mh_size=%d", fmt, mh_size); @@ -656,8 +669,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // parse the message header. // see also: ngx_rtmp_recv if (fmt <= RTMP_FMT_TYPE2) { - int32_t timestamp_delta; - char* pp = (char*)×tamp_delta; + char* pp = (char*)&chunk->header.timestamp_delta; pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; @@ -667,13 +679,13 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // 6.1.2.1. Type 0 // For a type-0 chunk, the absolute timestamp of the message is sent // here. - chunk->header.timestamp = timestamp_delta; + chunk->header.timestamp = chunk->header.timestamp_delta; } else { // 6.1.2.2. Type 1 // 6.1.2.3. Type 2 // For a type-1 or type-2 chunk, the difference between the previous // chunk's timestamp and the current chunk's timestamp is sent here. - chunk->header.timestamp += timestamp_delta; + chunk->header.timestamp += chunk->header.timestamp_delta; } // fmt: 0 @@ -689,7 +701,7 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // 0x00ffffff), this value MUST be 16777215, and the ‘extended // timestamp header’ MUST be present. Otherwise, this value SHOULD be // the entire delta. - chunk->extended_timestamp = (timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); + chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); if (chunk->extended_timestamp) { chunk->header.timestamp = RTMP_EXTENDED_TIMESTAMP; } @@ -722,6 +734,10 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz fmt, mh_size, chunk->extended_timestamp, chunk->header.timestamp); } } else { + // update the timestamp even fmt=3 for first stream + if (is_fresh_packet && !chunk->extended_timestamp) { + chunk->header.timestamp += chunk->header.timestamp_delta; + } srs_verbose("header read completed. fmt=%d, size=%d, ext_time=%d", fmt, mh_size, chunk->extended_timestamp); } @@ -754,6 +770,9 @@ int SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt, int bh_siz // copy header to msg chunk->msg->header = chunk->header; + // increase the msg count, the chunk stream can accept fmt=1/2/3 message now. + chunk->msg_count++; + return ret; } @@ -802,7 +821,7 @@ int SrsProtocol::read_message_payload(SrsChunkStream* chunk, int bh_size, int mh buffer->erase(bh_size + mh_size + payload_size); chunk->msg->size += payload_size; - srs_verbose("chunk payload read complted. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); + srs_verbose("chunk payload read completed. bh_size=%d, mh_size=%d, payload_size=%d", bh_size, mh_size, payload_size); // got entire RTMP message? if (chunk->header.payload_length == chunk->msg->size) { @@ -826,8 +845,10 @@ SrsMessageHeader::SrsMessageHeader() { message_type = 0; payload_length = 0; - timestamp = 0; + timestamp_delta = 0; stream_id = 0; + + timestamp = 0; } SrsMessageHeader::~SrsMessageHeader() @@ -849,20 +870,23 @@ bool SrsMessageHeader::is_window_ackledgement_size() return message_type == RTMP_MSG_WindowAcknowledgementSize; } +bool SrsMessageHeader::is_set_chunk_size() +{ + return message_type == RTMP_MSG_SetChunkSize; +} + SrsChunkStream::SrsChunkStream(int _cid) { fmt = 0; cid = _cid; extended_timestamp = false; msg = NULL; + msg_count = 0; } SrsChunkStream::~SrsChunkStream() { - if (msg) { - delete msg; - msg = NULL; - } + srs_freep(msg); } SrsMessage::SrsMessage() @@ -875,20 +899,9 @@ SrsMessage::SrsMessage() SrsMessage::~SrsMessage() { - if (payload) { - delete[] payload; - payload = NULL; - } - - if (packet) { - delete packet; - packet = NULL; - } - - if (stream) { - delete stream; - stream = NULL; - } + srs_freep(payload); + srs_freep(packet); + srs_freep(stream); } int SrsMessage::decode_packet() @@ -962,6 +975,10 @@ int SrsMessage::decode_packet() srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message)."); packet = new SrsFMLEStartPacket(); return packet->decode(stream); + } else if(command == RTMP_AMF0_COMMAND_PUBLISH) { + srs_info("decode the AMF0/AMF3 command(publish message)."); + packet = new SrsPublishPacket(); + return packet->decode(stream); } // default packet to drop message. @@ -972,6 +989,10 @@ int SrsMessage::decode_packet() srs_verbose("start to decode set ack window size message."); packet = new SrsSetWindowAckSizePacket(); return packet->decode(stream); + } else if(header.is_set_chunk_size()) { + srs_verbose("start to decode set chunk size message."); + packet = new SrsSetChunkSizePacket(); + return packet->decode(stream); } else { // default packet to drop message. srs_trace("drop the unknown message, type=%d", header.message_type); @@ -1008,9 +1029,7 @@ int SrsMessage::get_perfer_cid() void SrsMessage::set_packet(SrsPacket* pkt, int stream_id) { - if (packet) { - delete packet; - } + srs_freep(packet); packet = pkt; @@ -1029,10 +1048,7 @@ int SrsMessage::encode_packet() } // realloc the payload. size = 0; - if (payload) { - delete[] payload; - payload = NULL; - } + srs_freepa(payload); return packet->encode(size, (char*&)payload); } @@ -1087,14 +1103,14 @@ int SrsPacket::encode(int& psize, char*& ppayload) if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) { srs_error("initialize the stream failed. ret=%d", ret); - delete[] payload; + srs_freepa(payload); return ret; } } if ((ret = encode_packet(&stream)) != ERROR_SUCCESS) { srs_error("encode the packet failed. ret=%d", ret); - delete[] payload; + srs_freepa(payload); return ret; } @@ -1132,10 +1148,7 @@ SrsConnectAppPacket::SrsConnectAppPacket() SrsConnectAppPacket::~SrsConnectAppPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } + srs_freep(command_object); } int SrsConnectAppPacket::decode(SrsStream* stream) @@ -1189,15 +1202,8 @@ SrsConnectAppResPacket::SrsConnectAppResPacket() SrsConnectAppResPacket::~SrsConnectAppResPacket() { - if (props) { - delete props; - props = NULL; - } - - if (info) { - delete info; - info = NULL; - } + srs_freep(props); + srs_freep(info); } int SrsConnectAppResPacket::get_perfer_cid() @@ -1259,10 +1265,7 @@ SrsCreateStreamPacket::SrsCreateStreamPacket() SrsCreateStreamPacket::~SrsCreateStreamPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } + srs_freep(command_object); } int SrsCreateStreamPacket::decode(SrsStream* stream) @@ -1305,10 +1308,7 @@ SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, doubl SrsCreateStreamResPacket::~SrsCreateStreamResPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } + srs_freep(command_object); } int SrsCreateStreamResPacket::get_perfer_cid() @@ -1365,10 +1365,12 @@ SrsFMLEStartPacket::SrsFMLEStartPacket() { command_name = RTMP_AMF0_COMMAND_CREATE_STREAM; transaction_id = 0; + command_object = new SrsAmf0Null(); } SrsFMLEStartPacket::~SrsFMLEStartPacket() { + srs_freep(command_object); } int SrsFMLEStartPacket::decode(SrsStream* stream) @@ -1380,8 +1382,8 @@ int SrsFMLEStartPacket::decode(SrsStream* stream) return ret; } if (command_name.empty() - || command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM - || command_name != RTMP_AMF0_COMMAND_FC_PUBLISH + || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM + && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH) ) { ret = ERROR_RTMP_AMF0_DECODE; srs_error("amf0 decode FMLE start command_name failed. " @@ -1394,6 +1396,11 @@ int SrsFMLEStartPacket::decode(SrsStream* stream) return ret; } + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret); + return ret; + } + if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) { srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret); return ret; @@ -1414,14 +1421,8 @@ SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id) SrsFMLEStartResPacket::~SrsFMLEStartResPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } - if (args) { - delete args; - args = NULL; - } + srs_freep(command_object); + srs_freep(args); } int SrsFMLEStartResPacket::get_perfer_cid() @@ -1474,6 +1475,59 @@ int SrsFMLEStartResPacket::encode_packet(SrsStream* stream) return ret; } +SrsPublishPacket::SrsPublishPacket() +{ + command_name = RTMP_AMF0_COMMAND_PUBLISH; + transaction_id = 0; + command_object = new SrsAmf0Null(); + type = "live"; +} + +SrsPublishPacket::~SrsPublishPacket() +{ + srs_freep(command_object); +} + +int SrsPublishPacket::decode(SrsStream* stream) +{ + int ret = ERROR_SUCCESS; + + if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode publish command_name failed. ret=%d", ret); + return ret; + } + if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PUBLISH) { + ret = ERROR_RTMP_AMF0_DECODE; + srs_error("amf0 decode publish command_name failed. " + "command_name=%s, ret=%d", command_name.c_str(), ret); + return ret; + } + + if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) { + srs_error("amf0 decode publish transaction_id failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) { + srs_error("amf0 decode publish command_object failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) { + srs_error("amf0 decode publish stream_name failed. ret=%d", ret); + return ret; + } + + if ((ret = srs_amf0_read_string(stream, type)) != ERROR_SUCCESS) { + srs_error("amf0 decode publish type failed. ret=%d", ret); + return ret; + } + + srs_info("amf0 decode publish packet success"); + + return ret; +} + SrsPlayPacket::SrsPlayPacket() { command_name = RTMP_AMF0_COMMAND_PLAY; @@ -1487,10 +1541,7 @@ SrsPlayPacket::SrsPlayPacket() SrsPlayPacket::~SrsPlayPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } + srs_freep(command_object); } int SrsPlayPacket::decode(SrsStream* stream) @@ -1551,15 +1602,8 @@ SrsPlayResPacket::SrsPlayResPacket() SrsPlayResPacket::~SrsPlayResPacket() { - if (command_object) { - delete command_object; - command_object = NULL; - } - - if (desc) { - delete desc; - desc = NULL; - } + srs_freep(command_object); + srs_freep(desc); } int SrsPlayResPacket::get_perfer_cid() @@ -1621,10 +1665,7 @@ SrsOnBWDonePacket::SrsOnBWDonePacket() SrsOnBWDonePacket::~SrsOnBWDonePacket() { - if (args) { - delete args; - args = NULL; - } + srs_freep(args); } int SrsOnBWDonePacket::get_perfer_cid() @@ -1680,15 +1721,8 @@ SrsOnStatusCallPacket::SrsOnStatusCallPacket() SrsOnStatusCallPacket::~SrsOnStatusCallPacket() { - if (args) { - delete args; - args = NULL; - } - - if (data) { - delete data; - data = NULL; - } + srs_freep(args); + srs_freep(data); } int SrsOnStatusCallPacket::get_perfer_cid() @@ -1748,10 +1782,7 @@ SrsOnStatusDataPacket::SrsOnStatusDataPacket() SrsOnStatusDataPacket::~SrsOnStatusDataPacket() { - if (data) { - delete data; - data = NULL; - } + srs_freep(data); } int SrsOnStatusDataPacket::get_perfer_cid() diff --git a/trunk/src/core/srs_core_protocol.hpp b/trunk/src/core/srs_core_protocol.hpp index e76ac20ea..386d84c11 100755 --- a/trunk/src/core/srs_core_protocol.hpp +++ b/trunk/src/core/srs_core_protocol.hpp @@ -160,22 +160,29 @@ struct SrsMessageHeader */ int32_t payload_length; /** - * Four-byte field that contains a timestamp of the message. + * Three-byte field that contains a timestamp delta of the message. * The 4 bytes are packed in the big-endian order. */ - int32_t timestamp; + int32_t timestamp_delta; /** * Three-byte field that identifies the stream of the message. These * bytes are set in big-endian format. */ int32_t stream_id; + /** + * Four-byte field that contains a timestamp of the message. + * The 4 bytes are packed in the big-endian order. + */ + int32_t timestamp; + SrsMessageHeader(); virtual ~SrsMessageHeader(); bool is_amf0_command(); bool is_amf3_command(); bool is_window_ackledgement_size(); + bool is_set_chunk_size(); }; /** @@ -207,6 +214,10 @@ public: * partially read message. */ SrsMessage* msg; + /** + * decoded msg count, to identify whether the chunk stream is fresh. + */ + int64_t msg_count; public: SrsChunkStream(int _cid); virtual ~SrsChunkStream(); @@ -257,7 +268,7 @@ public: * @stream_id, the id of stream which is created by createStream. * @remark, user never free the pkt, the message will auto free it. */ - virtual void set_packet(SrsPacket* pkt, int stream_id = 0); + virtual void set_packet(SrsPacket* pkt, int stream_id); /** * encode the packet to message payload bytes. * @remark there exists empty packet, so maybe the payload is NULL. @@ -443,6 +454,7 @@ protected: public: std::string command_name; double transaction_id; + SrsAmf0Null* command_object; std::string stream_name; public: SrsFMLEStartPacket(); @@ -479,6 +491,35 @@ protected: virtual int encode_packet(SrsStream* stream); }; +/** +* FMLE/flash publish +* 4.2.6. Publish +* The client sends the publish command to publish a named stream to the +* server. Using this name, any client can play this stream and receive +* the published audio, video, and data messages. +*/ +class SrsPublishPacket : public SrsPacket +{ +private: + typedef SrsPacket super; +protected: + virtual const char* get_class_name() + { + return CLASS_NAME_STRING(SrsPublishPacket); + } +public: + std::string command_name; + double transaction_id; + SrsAmf0Null* command_object; + std::string stream_name; + std::string type; +public: + SrsPublishPacket(); + virtual ~SrsPublishPacket(); +public: + virtual int decode(SrsStream* stream); +}; + /** * 4.2.1. play * The client sends this command to the server to play a stream. @@ -566,7 +607,7 @@ protected: /** * onStatus command, AMF0 Call -* @remark, user must set the stream_id in header. +* @remark, user must set the stream_id by SrsMessage.set_packet(). */ class SrsOnStatusCallPacket : public SrsPacket { @@ -596,7 +637,7 @@ protected: /** * onStatus data, AMF0 Data -* @remark, user must set the stream_id in header. +* @remark, user must set the stream_id by SrsMessage.set_packet(). */ class SrsOnStatusDataPacket : public SrsPacket { @@ -624,7 +665,7 @@ protected: /** * AMF0Data RtmpSampleAccess -* @remark, user must set the stream_id in header. +* @remark, user must set the stream_id by SrsMessage.set_packet(). */ class SrsSampleAccessPacket : public SrsPacket { diff --git a/trunk/src/core/srs_core_rtmp.cpp b/trunk/src/core/srs_core_rtmp.cpp index cccb698f6..408dd9c40 100755 --- a/trunk/src/core/srs_core_rtmp.cpp +++ b/trunk/src/core/srs_core_rtmp.cpp @@ -51,8 +51,12 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #define StatusCodeConnectSuccess "NetConnection.Connect.Success" #define StatusCodeStreamReset "NetStream.Play.Reset" #define StatusCodeStreamStart "NetStream.Play.Start" +#define StatusCodePublishStart "NetStream.Publish.Start" #define StatusCodeDataStart "NetStream.Data.Start" +// FMLE +#define RTMP_AMF0_COMMAND_ON_FC_PUBLISH "onFCPublish" + SrsRequest::SrsRequest() { objectEncoding = RTMP_SIG_AMF0_VER; @@ -111,10 +115,7 @@ SrsRtmp::SrsRtmp(st_netfd_t client_stfd) SrsRtmp::~SrsRtmp() { - if (protocol) { - delete protocol; - protocol = NULL; - } + srs_freep(protocol); } int SrsRtmp::handshake() @@ -210,7 +211,7 @@ int SrsRtmp::set_window_ack_size(int ack_size) SrsSetWindowAckSizePacket* pkt = new SrsSetWindowAckSizePacket(); pkt->ackowledgement_window_size = ack_size; - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send ack size message failed. ret=%d", ret); @@ -230,7 +231,7 @@ int SrsRtmp::set_peer_bandwidth(int bandwidth, int type) pkt->bandwidth = bandwidth; pkt->type = type; - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send set bandwidth message failed. ret=%d", ret); @@ -249,23 +250,23 @@ int SrsRtmp::response_connect_app(SrsRequest* req) SrsMessage* msg = new SrsMessage(); SrsConnectAppResPacket* pkt = new SrsConnectAppResPacket(); - pkt->props->properties["fmsVer"] = new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER); - pkt->props->properties["capabilities"] = new SrsAmf0Number(123); - pkt->props->properties["mode"] = new SrsAmf0Number(1); + pkt->props->set("fmsVer", new SrsAmf0String("FMS/"RTMP_SIG_FMS_VER)); + pkt->props->set("capabilities", new SrsAmf0Number(127)); + pkt->props->set("mode", new SrsAmf0Number(1)); - pkt->info->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus); - pkt->info->properties[StatusCode] = new SrsAmf0String(StatusCodeConnectSuccess); - pkt->info->properties[StatusDescription] = new SrsAmf0String("Connection succeeded"); - pkt->info->properties["objectEncoding"] = new SrsAmf0Number(req->objectEncoding); + pkt->info->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->info->set(StatusCode, new SrsAmf0String(StatusCodeConnectSuccess)); + pkt->info->set(StatusDescription, new SrsAmf0String("Connection succeeded")); + pkt->info->set("objectEncoding", new SrsAmf0Number(req->objectEncoding)); SrsASrsAmf0EcmaArray* data = new SrsASrsAmf0EcmaArray(); - pkt->info->properties["data"] = data; + pkt->info->set("data", data); - data->properties["version"] = new SrsAmf0String(RTMP_SIG_FMS_VER); - data->properties["server"] = new SrsAmf0String(RTMP_SIG_SRS_NAME); - data->properties["srs_url"] = new SrsAmf0String(RTMP_SIG_SRS_URL); - data->properties["srs_version"] = new SrsAmf0String(RTMP_SIG_SRS_VERSION); + data->set("version", new SrsAmf0String(RTMP_SIG_FMS_VER)); + data->set("server", new SrsAmf0String(RTMP_SIG_SRS_NAME)); + data->set("srs_url", new SrsAmf0String(RTMP_SIG_SRS_URL)); + data->set("srs_version", new SrsAmf0String(RTMP_SIG_SRS_VERSION)); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send connect app response message failed. ret=%d", ret); @@ -283,7 +284,7 @@ int SrsRtmp::on_bw_done() SrsMessage* msg = new SrsMessage(); SrsOnBWDonePacket* pkt = new SrsOnBWDonePacket(); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send onBWDone message failed. ret=%d", ret); @@ -345,7 +346,7 @@ int SrsRtmp::set_chunk_size(int chunk_size) SrsSetChunkSizePacket* pkt = new SrsSetChunkSizePacket(); pkt->chunk_size = chunk_size; - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send set chunk size message failed. ret=%d", ret); @@ -367,7 +368,7 @@ int SrsRtmp::start_play(int stream_id) pkt->event_type = SrcPCUCStreamBegin; pkt->event_data = stream_id; - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send PCUC(StreamBegin) message failed. ret=%d", ret); @@ -381,14 +382,13 @@ int SrsRtmp::start_play(int stream_id) SrsMessage* msg = new SrsMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); - pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus); - pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamReset); - pkt->data->properties[StatusDescription] = new SrsAmf0String("Playing and resetting stream."); - pkt->data->properties[StatusDetails] = new SrsAmf0String("stream"); - pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID); + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamReset)); + pkt->data->set(StatusDescription, new SrsAmf0String("Playing and resetting stream.")); + pkt->data->set(StatusDetails, new SrsAmf0String("stream")); + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID)); - msg->header.stream_id = stream_id; - msg->set_packet(pkt); + msg->set_packet(pkt, stream_id); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret); @@ -402,14 +402,13 @@ int SrsRtmp::start_play(int stream_id) SrsMessage* msg = new SrsMessage(); SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); - pkt->data->properties[StatusLevel] = new SrsAmf0String(StatusLevelStatus); - pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeStreamStart); - pkt->data->properties[StatusDescription] = new SrsAmf0String("Started playing stream."); - pkt->data->properties[StatusDetails] = new SrsAmf0String("stream"); - pkt->data->properties[StatusClientId] = new SrsAmf0String(RTMP_SIG_CLIENT_ID); + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeStreamStart)); + pkt->data->set(StatusDescription, new SrsAmf0String("Started playing stream.")); + pkt->data->set(StatusDetails, new SrsAmf0String("stream")); + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID)); - msg->header.stream_id = stream_id; - msg->set_packet(pkt); + msg->set_packet(pkt, stream_id); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Play.Reset) message failed. ret=%d", ret); @@ -423,8 +422,7 @@ int SrsRtmp::start_play(int stream_id) SrsMessage* msg = new SrsMessage(); SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket(); - msg->header.stream_id = stream_id; - msg->set_packet(pkt); + msg->set_packet(pkt, stream_id); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send |RtmpSampleAccess(false, false) message failed. ret=%d", ret); @@ -438,10 +436,9 @@ int SrsRtmp::start_play(int stream_id) SrsMessage* msg = new SrsMessage(); SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket(); - pkt->data->properties[StatusCode] = new SrsAmf0String(StatusCodeDataStart); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodeDataStart)); - msg->header.stream_id = stream_id; - msg->set_packet(pkt); + msg->set_packet(pkt, stream_id); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send onStatus(NetStream.Data.Start) message failed. ret=%d", ret); @@ -463,7 +460,7 @@ int SrsRtmp::identify_create_stream_client(SrsCreateStreamPacket* req, int strea SrsMessage* msg = new SrsMessage(); SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(req->transaction_id, stream_id); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send createStream response message failed. ret=%d", ret); @@ -519,7 +516,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id SrsMessage* msg = new SrsMessage(); SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send releaseStream response message failed. ret=%d", ret); @@ -547,7 +544,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id SrsMessage* msg = new SrsMessage(); SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send FCPublish response message failed. ret=%d", ret); @@ -575,7 +572,7 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id SrsMessage* msg = new SrsMessage(); SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, stream_id); - msg->set_packet(pkt); + msg->set_packet(pkt, 0); if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { srs_error("send createStream response message failed. ret=%d", ret); @@ -584,6 +581,64 @@ int SrsRtmp::identify_fmle_publish_client(SrsFMLEStartPacket* req, int stream_id srs_info("send createStream response message success."); } + // publish + if (true) { + SrsMessage* msg = NULL; + SrsPublishPacket* pkt = NULL; + if ((ret = srs_rtmp_expect_message(protocol, &msg, &pkt)) != ERROR_SUCCESS) { + srs_error("recv publish message failed. ret=%d", ret); + return ret; + } + srs_info("recv publish request message success."); + + SrsAutoFree(SrsMessage, msg, false); + } + // publish response onFCPublish(NetStream.Publish.Start) + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH; + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart)); + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream.")); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", ret); + return ret; + } + srs_info("send onFCPublish(NetStream.Publish.Start) message success."); + } + // publish response onStatus(NetStream.Publish.Start) + if (true) { + SrsMessage* msg = new SrsMessage(); + SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket(); + + pkt->data->set(StatusLevel, new SrsAmf0String(StatusLevelStatus)); + pkt->data->set(StatusCode, new SrsAmf0String(StatusCodePublishStart)); + pkt->data->set(StatusDescription, new SrsAmf0String("Started publishing stream.")); + pkt->data->set(StatusClientId, new SrsAmf0String(RTMP_SIG_CLIENT_ID)); + + msg->set_packet(pkt, stream_id); + + if ((ret = protocol->send_message(msg)) != ERROR_SUCCESS) { + srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", ret); + return ret; + } + srs_info("send onStatus(NetStream.Publish.Start) message success."); + } + + while (true) { + SrsMessage* msg = NULL; + if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) { + srs_error("recv identify client message failed. ret=%d", ret); + return ret; + } + + SrsAutoFree(SrsMessage, msg, false); + } + return ret; } diff --git a/trunk/src/core/srs_core_server.cpp b/trunk/src/core/srs_core_server.cpp index 4454d514a..bc7dcb063 100755 --- a/trunk/src/core/srs_core_server.cpp +++ b/trunk/src/core/srs_core_server.cpp @@ -45,7 +45,7 @@ SrsServer::~SrsServer() { for (std::vector::iterator it = conns.begin(); it != conns.end(); ++it) { SrsConnection* conn = *it; - delete conn; + srs_freep(conn); } conns.clear(); } @@ -151,8 +151,8 @@ void SrsServer::remove(SrsConnection* conn) srs_info("conn removed. conns=%d", (int)conns.size()); // all connections are created by server, - // so we delete it here. - delete conn; + // so we free it here. + srs_freep(conn); } int SrsServer::accept_client(st_netfd_t client_stfd)