From c1f98ddcc862372dccebfab11734e3b7eafe3a3b Mon Sep 17 00:00:00 2001
From: liulichuan <liulichuan@kuaishou.com>
Date: Mon, 11 May 2020 00:50:14 +0800
Subject: [PATCH] supprot LAS(live adaptive stream)

Change-Id: I2f9fd889fc924a3b080667bd2114f4632be461a0
---
 trunk/auto/auto_headers.sh            |   6 +
 trunk/auto/options.sh                 |   8 +
 trunk/configure                       |   5 +
 trunk/src/app/srs_app_http_stream.cpp |  18 +-
 trunk/src/app/srs_app_source.cpp      | 605 +++++++++++++++++++++++++-
 trunk/src/app/srs_app_source.hpp      | 136 ++++++
 trunk/src/kernel/srs_kernel_codec.cpp |  16 +
 trunk/src/kernel/srs_kernel_codec.hpp |   7 +
 trunk/src/kernel/srs_kernel_flv.cpp   |  41 ++
 trunk/src/kernel/srs_kernel_flv.hpp   |  11 +
 10 files changed, 843 insertions(+), 10 deletions(-)

diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh
index 60c00db9a..3b1af85fb 100755
--- a/trunk/auto/auto_headers.sh
+++ b/trunk/auto/auto_headers.sh
@@ -63,6 +63,12 @@ else
     srs_undefine_macro "SRS_HDS" $SRS_AUTO_HEADERS_H
 fi
 
+if [ $SRS_LAS = YES ]; then
+    srs_define_macro "SRS_LAS" $SRS_AUTO_HEADERS_H
+else
+    srs_undefine_macro "SRS_LAS" $SRS_AUTO_HEADERS_H
+fi
+
 if [ $SRS_SRT = YES ]; then
     srs_define_macro "SRS_SRT" $SRS_AUTO_HEADERS_H
 else
diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh
index 43f0e701a..8f7186542 100755
--- a/trunk/auto/options.sh
+++ b/trunk/auto/options.sh
@@ -16,6 +16,7 @@ help=no
 ################################################################
 # feature options
 SRS_HDS=NO
+SRS_LAS=NO
 SRS_SRT=NO
 SRS_RTC=YES
 SRS_GB28181=NO
@@ -146,6 +147,7 @@ Features:
 
   --ssl=on|off              Whether build the rtmp complex handshake, requires openssl-devel installed.
   --hds=on|off              Whether build the hds streaming, mux RTMP to F4M/F4V files.
+  --las=on|off              Whether use LAS for http-flv adaptive stream.
   --stream-caster=on|off    Whether build the stream caster to serve other stream over other protocol.
   --stat=on|off             Whether build the the data statistic, for http api.
   --librtmp=on|off          Whether build the srs-librtmp, library for client.
@@ -279,6 +281,10 @@ function parse_user_option() {
         --without-hds)                  SRS_HDS=NO                  ;;
         --hds)                          if [[ $value == off ]]; then SRS_HDS=NO; else SRS_HDS=YES; fi    ;;
 
+        --with-las)                     SRS_LAS=YES                 ;;
+        --without-las)                  SRS_LAS=NO                  ;;
+        --las)                          if [[ $value == off ]]; then SRS_LAS=NO; else SRS_LAS=YES; fi    ;;
+
         --with-nginx)                   SRS_NGINX=YES               ;;
         --without-nginx)                SRS_NGINX=NO                ;;
         --nginx)                        if [[ $value == off ]]; then SRS_NGINX=NO; else SRS_NGINX=YES; fi    ;;
@@ -531,6 +537,7 @@ function regenerate_options() {
     SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}"
     if [ $SRS_HLS = YES ]; then             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hls=on"; else             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hls=off"; fi
     if [ $SRS_HDS = YES ]; then             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hds=on"; else             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --hds=off"; fi
+    if [ $SRS_LAS = YES ]; then             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --las=on"; else             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --las=off"; fi
     if [ $SRS_DVR = YES ]; then             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --dvr=on"; else             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --dvr=off"; fi
     if [ $SRS_SSL = YES ]; then             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --ssl=on"; else             SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --ssl=off"; fi
     if [ $SRS_USE_SYS_SSL = YES ]; then     SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --sys-ssl=on"; else         SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --sys-ssl=off"; fi
@@ -629,6 +636,7 @@ function check_option_conflicts() {
 
     # check variable neccessary
     if [ $SRS_HDS = RESERVED ]; then echo "you must specifies the hds, see: ./configure --help"; __check_ok=NO; fi
+    if [ $SRS_LAS = RESERVED ]; then echo "you must specifies the las, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_SSL = RESERVED ]; then echo "you must specifies the ssl, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi
     if [ $SRS_UTEST = RESERVED ]; then echo "you must specifies the utest, see: ./configure --help"; __check_ok=NO; fi
diff --git a/trunk/configure b/trunk/configure
index 605fc2db8..c86c9ba17 100755
--- a/trunk/configure
+++ b/trunk/configure
@@ -679,6 +679,11 @@ if [ $SRS_HLS = YES ]; then
 else
     echo -e "${YELLOW}Warning: HLS is disabled.${BLACK}"
 fi
+if [ $SRS_LAS = YES ]; then
+    echo -e "${YELLOW}Experiment: LAS is enabled.${BLACK}"
+else
+    echo -e "${GREEN}Warning: LAS is disabled.${BLACK}"
+fi
 if [ $SRS_STREAM_CASTER = YES ]; then
     echo -e "${YELLOW}Experiment: StreamCaster is enabled.${BLACK}"
 else
diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp
index 9bbb4a6b1..2248f0091 100755
--- a/trunk/src/app/srs_app_http_stream.cpp
+++ b/trunk/src/app/srs_app_http_stream.cpp
@@ -587,13 +587,29 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
     // create consumer of souce, ignore gop cache, use the audio gop cache.
     SrsConsumer* consumer = NULL;
     SrsAutoFree(SrsConsumer, consumer);
+#ifdef SRS_LAS
+    int64_t lasspts = 0;
+    bool only_audio = false;
+    if (!r->query_get("startPts").empty()) {
+        lasspts = atoi(r->query_get("startPts").c_str());
+    }
+    if (r->query_get("onlyAudio") == "true") {
+        only_audio = true;
+    }
+    if ((err = source->create_consumer(NULL, consumer, lasspts, only_audio)) != srs_success) {
+        return srs_error_wrap(err, "create consumer");
+    }
+    if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache(), lasspts)) != srs_success) {
+        return srs_error_wrap(err, "dumps consumer");
+    }
+#else
     if ((err = source->create_consumer(NULL, consumer)) != srs_success) {
         return srs_error_wrap(err, "create consumer");
     }
     if ((err = source->consumer_dumps(consumer, true, true, !enc->has_cache())) != srs_success) {
         return srs_error_wrap(err, "dumps consumer");
     }
-
+#endif  
     SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
     SrsAutoFree(SrsPithyPrint, pprint);
     
diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp
index 2fe07696c..12583e1af 100755
--- a/trunk/src/app/srs_app_source.cpp
+++ b/trunk/src/app/srs_app_source.cpp
@@ -418,6 +418,34 @@ ISrsWakable::~ISrsWakable()
 {
 }
 
+#ifdef SRS_LAS
+SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c, int64_t lasspts, bool only_audio)
+{
+    source = s;
+    conn = c;
+    paused = false;
+    jitter = new SrsRtmpJitter();
+    queue = new SrsMessageQueue();
+    should_update_source_id = false;
+
+#ifdef SRS_PERF_QUEUE_COND_WAIT
+    mw_wait = srs_cond_new();
+    mw_min_msgs = 0;
+    mw_duration = 0;
+    mw_waiting = false;
+#endif
+
+    this->lasspts = lasspts;
+    this->only_audio = only_audio;
+    have_first_msg = false;
+    have_keyframe = false;
+    frames_dropped_by_lasspts = 0;
+    frames_dropped_by_first_keyframe = 0;
+    //TODO use config
+    lasspts_max_wait_time = 3000*1000;
+    create_time = srs_get_system_time();
+}
+#else
 SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
 {
     source = s;
@@ -434,6 +462,7 @@ SrsConsumer::SrsConsumer(SrsSource* s, SrsConnection* c)
     mw_waiting = false;
 #endif
 }
+#endif
 
 SrsConsumer::~SrsConsumer()
 {
@@ -444,6 +473,16 @@ SrsConsumer::~SrsConsumer()
 #ifdef SRS_PERF_QUEUE_COND_WAIT
     srs_cond_destroy(mw_wait);
 #endif
+
+#ifdef SRS_LAS
+  std::list<SrsSharedPtrMessage*>::iterator it;
+  for (it = headers_queue.begin(); it != headers_queue.end(); ++it) {
+    SrsSharedPtrMessage* msg = *it;
+    srs_freep(msg);
+  }
+  headers_queue.clear();
+#endif
+
 }
 
 void SrsConsumer::set_queue_size(srs_utime_t queue_size)
@@ -464,7 +503,61 @@ int64_t SrsConsumer::get_time()
 srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
 {
     srs_error_t err = srs_success;
-    
+
+#ifdef SRS_LAS
+    //filter audio frame if only_audio is set
+    if (only_audio && !shared_msg->is_audio()) {
+        return err;
+    }
+
+    //cache header and wait data msg to set timestamp
+    if (shared_msg->is_header) {
+        headers_queue.push_back(shared_msg->copy());
+        srs_info("got msg to headers_queue, msg:", shared_msg->to_str());
+        return err;
+    }
+
+    // filter the begin data msg
+    if (!have_first_msg && lasspts > 0) {
+        // spts_max_wait_time < 0 means wait until shared_msg->pts >= expect_start_pts
+        if(shared_msg->is_audio() || shared_msg->is_keyframe) {
+            if (shared_msg->pts < lasspts && 
+                (lasspts_max_wait_time < 0 || srs_get_system_time() - create_time < lasspts_max_wait_time)) {
+                frames_dropped_by_lasspts++;
+                return err;
+            }
+        }
+    }
+
+    //filter first video data msg must be key frame
+    if (!have_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe) {
+        frames_dropped_by_first_keyframe++;
+        return err;
+    }
+    // now got first data frame
+    if (!have_first_msg) {
+        srs_trace("got first data msg, droped=%d, msg:%s", frames_dropped_by_lasspts, shared_msg->to_str().c_str());
+        have_first_msg = true;
+    }
+    // now got first key frame
+    if (!have_keyframe && shared_msg->is_keyframe) {
+        srs_trace("got first key frame, droped=%d, msg:%s", frames_dropped_by_first_keyframe, shared_msg->to_str().c_str());
+        have_keyframe = true;
+    }
+
+    // send cached headers, las mast use atc
+    while (headers_queue.size() > 0) {
+        SrsSharedPtrMessage* header_msg = headers_queue.front();
+        headers_queue.pop_front();
+        header_msg->timestamp = shared_msg->timestamp;
+        //no need to copy any more, but
+        //carefull。。。 enqueue may free header_msg, do not use it after enqueue.
+        if ((err = queue->enqueue(header_msg, NULL)) != ERROR_SUCCESS) {
+            return err;
+        }
+    }
+#endif
+
     SrsSharedPtrMessage* msg = shared_msg->copy();
 
     if (!atc) {
@@ -720,6 +813,430 @@ bool SrsGopCache::pure_audio()
     return cached_video_count == 0;
 }
 
+
+#ifdef SRS_LAS
+//-----------------SRS_LAS--------------
+
+SrsLasCache::SrsLasCache(int cache_dur) {
+    max_cache_dur = cache_dur;
+    //we use max max_cache_dur to calculate max cache size
+    //vfr 60fps, afr 45 fps, and double it size
+    max_cache_size = cache_dur * (60+45) * 2 /1000;
+    enable_gop_cache = true;
+    cached_video_header = NULL;
+    cached_audio_header = NULL;
+    cached_metadata_header = NULL;
+    wait_keyframe = true;
+    last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
+}
+
+SrsLasCache::~SrsLasCache()
+{
+    clear();
+}
+
+void SrsLasCache::dispose()
+{
+    clear();
+}
+
+void SrsLasCache::set(bool enabled) {
+  enable_gop_cache = enabled;
+
+  if (!enabled) {
+    //srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
+    clear();
+    return;
+  }
+
+  srs_info("enable srs cache");
+}
+
+bool SrsLasCache::enabled()
+{
+    return enable_gop_cache;
+}
+
+srs_error_t SrsLasCache::cache(SrsSharedPtrMessage* shared_msg) 
+{
+    srs_error_t err = srs_success;
+
+    if (!enable_gop_cache) {
+        return err;
+    }
+    //if wait_keyframe and first video is not key or video sequence header,do not cache it
+    if (wait_keyframe && shared_msg->is_video() && !shared_msg->is_keyframe && !shared_msg->is_header) {
+        srs_warn("wait idr frame, first video is not idr and sps");
+        return err;
+    }
+
+
+    //update cache_idr_index|cache_header_index|last av index
+    if (shared_msg->is_video()) {
+        if (shared_msg->is_keyframe) {
+            //if idr pts go back clear data,but (keep headers),
+            //video pts may go back, but not IDR frame
+            if(cache_idr_index.size() > 0 && shared_msg->pts < cache_vec[cache_idr_index.back()]->pts) {
+                srs_warn("video key frame pts rollback,erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str());
+                erase_data(cache_vec.size());
+            }
+            wait_keyframe = false;
+            //save key frame index in cache_vec
+            cache_idr_index.push_back(cache_vec.size());
+        }
+
+        if(!shared_msg->is_header) {
+            //save last video index in cache_vec
+            last_video_index = cache_vec.size();
+        } else {
+            //save video header index in cache_vec
+            cache_header_index.push_back(cache_vec.size());
+        }
+
+    } else if (shared_msg->is_audio()) {
+        if (!shared_msg->is_header) {
+            //if no video, use audio timestamp, clear data if audio timestamp go back
+            if (last_video_index < 0 && last_audio_index > 0 && shared_msg->pts < cache_vec[last_audio_index]->pts) {
+                srs_warn("audio frame pts rollback, erase all cache data, curent pts=%lld, cache_status=%s",shared_msg->pts, to_str().c_str());
+                erase_data(cache_vec.size());
+            }
+            //save last audio index in cache_vec
+            last_audio_index = cache_vec.size();
+        } else {
+            //save audio header index in cache_vec
+            cache_header_index.push_back(cache_vec.size());
+        }
+    } else {
+        //save metadata index in cache_vec
+        cache_header_index.push_back(cache_vec.size());
+    }
+    cache_vec.push_back(shared_msg->copy());
+
+    //there is no need to call drop old cache too often
+    // we call it only when key frame come, or overflow packet size limit
+    if (shared_msg->is_keyframe || (int)cache_vec.size() > max_cache_size) {
+        try_drop_old_cache();
+    }
+
+    return err;
+}
+
+srs_error_t SrsLasCache::dump(SrsConsumer* consumer,  bool ds, bool dm, int64_t lasspts)
+{
+    srs_error_t err = srs_success;
+    if (empty()) {
+        return err;
+    }
+    //the first msg index should dump in cache_vec, set to end as defult,
+    int begin_index = cache_vec.size();
+
+    //lasspts > 0 means give the buffer form first pts>=lasspts
+    if (lasspts > 0) {
+        if (last_video_index > 0) {
+            assert(cache_idr_index.size() > 0); //we use wait key, so if have video, must have idr
+            // get first key frame (pts>=lasspts) index timestamp in cache_vec!
+            for (int i = (int)cache_idr_index.size() - 1; i >= 0; i--) {
+                if (cache_vec[cache_idr_index[i]]->pts >= lasspts) {
+                    begin_index = cache_idr_index[i];
+                    continue;
+                }
+                break;
+            }
+        } else if (last_audio_index > 0){
+            // only audio use audio timestamp
+            assert(cache_vec.size() > last_audio_index);
+            for (int i = last_audio_index; i >= 0; i--) {
+                if (cache_vec[i]->is_header  || !cache_vec[i]->is_audio()) {
+                    continue;
+                }
+                if (cache_vec[i]->pts >= lasspts) {
+                    begin_index = i;
+                    continue;
+                }
+                break;
+            }
+        }
+    } else if (lasspts == 0) { //lasspts == 0 means from the newest keyframe or audio pkt
+        if (last_video_index > 0) {
+            assert(cache_idr_index.size() > 0);
+            begin_index = cache_idr_index.back();
+        } else if (last_audio_index > 0) {
+            assert(cache_vec.size() > last_audio_index);
+            for (int i = last_audio_index; i >= 0; i--) {
+                if (cache_vec[i]->is_header || !cache_vec[i]->is_audio()) {
+                    continue;
+                }
+                begin_index = i;
+                break;
+            }
+        }
+    } else {  // lasspts < 0 means from the nearest frame pts to newest_pts - |lasspts|
+        if (last_video_index > 0) {
+            assert(cache_idr_index.size() > 0);
+            int64_t point_pts = cache_vec[last_video_index]->pts + lasspts;
+            begin_index = cache_idr_index.back();
+            int64_t nearest_distance = abs(cache_vec[begin_index]->pts - point_pts);
+
+            for (int i = (int)cache_idr_index.size() - 2; i >= 0; i--) {
+                int64_t distance = abs(cache_vec[cache_idr_index[i]]->pts - point_pts);
+                if (distance < nearest_distance) {
+                    nearest_distance = distance;
+                    begin_index = cache_idr_index[i];
+                } else {  //timestamp is monotonically increasing, so the here is the nearest_distance.
+                    break;
+                }
+            }
+        } else if (last_audio_index > 0) {
+            // only audio, find nearest audio frame  pts to newest_pts - |lasspts|
+            int64_t point_pts = cache_vec[last_audio_index]->pts + lasspts;
+            int64_t nearest_distance = abs(cache_vec[last_audio_index]->pts - point_pts);
+            for (int i = last_audio_index - 1; i >= 0; i--) {
+                if (cache_vec[i]->is_header) {
+                    continue;  // do not care header.
+                }
+
+                int64_t distance = abs(cache_vec[i]->pts - point_pts);
+                if (distance < nearest_distance) {
+                    nearest_distance = distance;
+                    begin_index = i;
+                } else {  // timestamp is monotonically increasing, so the here is the nearest_distance.
+                    break;
+                }
+            }
+        }
+    }
+    return do_dump(consumer, ds, dm, begin_index);
+}
+
+
+srs_error_t SrsLasCache::do_dump(SrsConsumer* consumer,  bool ds, bool dm, int begin_index) {
+
+  srs_error_t err = srs_success;
+  //set oldest header first
+  SrsSharedPtrMessage* vh = cached_video_header;
+  SrsSharedPtrMessage* ah = cached_audio_header;
+  SrsSharedPtrMessage* mh = cached_metadata_header;
+  //check if have header before begin_index, if do, use new header
+  if(cache_header_index.size() > 0) {
+    for (size_t i = 0; i < cache_header_index.size(); i++) {
+        if (cache_header_index[i] < begin_index) {
+            if(cache_vec[cache_header_index[i]]->is_video()) {
+                vh = cache_vec[cache_header_index[i]];
+            } else if(cache_vec[cache_header_index[i]]->is_audio()) {
+                ah = cache_vec[cache_header_index[i]];
+            } else {
+                mh = cache_vec[cache_header_index[i]];
+            }
+        }
+        break;
+    }
+  }
+
+  SrsSharedPtrMessage* first_msg = NULL;
+  for (size_t i = begin_index; i < cache_vec.size(); i++) {
+    if(first_msg == NULL) {
+        //set header timestamp same with first data timestamp
+        first_msg = cache_vec[begin_index];
+        if (mh && dm) { //LAS must use atc
+            mh->timestamp = first_msg->timestamp;
+            consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF);
+        }
+        if (vh && ds) {
+            vh->timestamp = first_msg->timestamp;
+            consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF);
+        }
+        if (ah && ds) {
+            ah->timestamp = first_msg->timestamp;
+            consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF);
+        }
+    }
+    if(cache_vec[i]->is_header) {
+        if ((!ds && cache_vec[i]->is_av()) || (!dm && !cache_vec[i]->is_av())) {
+            continue;
+        }
+    }
+    consumer->enqueue(cache_vec[i], true, SrsRtmpJitterAlgorithmOFF);
+  }
+
+  if (first_msg != NULL) {
+    if (first_msg->is_video()) {
+      srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg_pts=%lld cache_status:%s",
+                cache_vec[last_video_index]->pts - first_msg->pts, begin_index, first_msg->pts, to_str().c_str());
+    } else if (first_msg->is_audio()) {
+      srs_trace("dump_cache, dump_dur=%d begin_index=%d, first_msg=%lld cache_status:%s",
+                cache_vec[last_audio_index]->pts  - first_msg->pts, begin_index, first_msg->pts, to_str().c_str());
+    } else {
+      srs_trace("dump_cache, but not first from av, begin_index=%d, cache_status:%s",
+                begin_index, first_msg->pts, begin_index, to_str().c_str());
+    }
+  } else { //if no data msg dump, just set headers
+      if (mh && dm) {
+          consumer->enqueue(mh, true, SrsRtmpJitterAlgorithmOFF);
+      }
+      if (vh && ds) {
+          consumer->enqueue(vh, true, SrsRtmpJitterAlgorithmOFF);
+      }
+      if (ah && ds) {
+          consumer->enqueue(ah, true, SrsRtmpJitterAlgorithmOFF);
+      }
+      srs_trace("dump_cache, no data frame from cache, begin_index=%d cache_status:%s", begin_index, to_str().c_str());
+  }
+
+  return err;
+}
+
+void SrsLasCache::clear()
+{
+    // remove old cache and save headers
+    std::vector<SrsSharedPtrMessage*>::iterator it;
+    for (it = cache_vec.begin(); it != cache_vec.end(); it++) {
+        SrsSharedPtrMessage* msg = *it;
+        srs_freep(msg);
+    }
+    cache_vec.clear();
+    cache_idr_index.clear();
+    wait_keyframe = true;
+    srs_freep(cached_video_header);
+    srs_freep(cached_audio_header);
+    srs_freep(cached_metadata_header);
+    last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
+}
+ 
+void SrsLasCache::try_drop_old_cache()
+{
+    int remove_count = 0;
+    srs_info("before drop, status:%s", to_str().c_str());
+
+    if (last_video_index > 0) {
+        assert(cache_idr_index.size() > 0);
+        //keep at least one key frame
+        for (int i = 0; i < (int)cache_idr_index.size() - 1; i++) {
+            int index = cache_idr_index[i];
+            if (cache_vec[last_video_index]->pts - cache_vec[index]->pts >= max_cache_dur) {
+                continue;
+            }
+            // find first not overflow key frame
+            remove_count = index;
+            break;
+        }
+    } else if (last_audio_index > 0) { //only audio drop cache by audio duration
+        std::vector<SrsSharedPtrMessage*>::iterator it;
+        for (it = cache_vec.begin(); it != cache_vec.end(); ++it) {
+            SrsSharedPtrMessage* msg = *it;
+            if (!msg->is_header && msg->is_audio()) {
+                if (cache_vec[last_audio_index]->pts - msg->pts <= max_cache_dur) {
+                    break;
+                }
+            }
+            remove_count++;
+        }
+    }
+
+    if (remove_count > 0) {
+        erase_data(remove_count);
+    }
+    srs_info("after drop count=%d, status:%s", remove_count, to_str().c_str());
+    
+    //if still overflow packet size limit after shrink by audio or video duration, just clear all buffer
+    if ((int)cache_vec.size() > max_cache_size) {
+        erase_data(cache_vec.size());
+        srs_warn("packet overflow after drop by video or audio duration, clear all data, status:%s", to_str().c_str());
+    }
+}
+
+bool SrsLasCache::empty() {
+  return (cache_vec.empty() == 0 && cached_metadata_header == NULL &&
+          cached_video_header == NULL && cached_audio_header == NULL);
+}
+
+void SrsLasCache::erase_data(int count) {
+    // remove old cache and save headers
+    for (int i = 0; i < count; i++) {
+        SrsSharedPtrMessage* msg = cache_vec[i];
+        if (msg->is_header) {
+            if (msg->is_video()) {
+                srs_freep(cached_video_header);
+                cached_video_header = msg;
+            } else if (msg->is_audio()) {
+                srs_freep(cached_audio_header);
+                cached_audio_header = msg;
+            } else {
+                srs_freep(cached_metadata_header);
+                cached_metadata_header = msg;
+            }
+        } else {
+            srs_freep(msg);
+        }
+        //remove idr index if data removed
+        if (!cache_idr_index.empty() && cache_idr_index[0] <= i) {
+            cache_idr_index.erase(cache_idr_index.begin());
+        }
+        //remove header index if data removed
+        if (!cache_header_index.empty() && cache_header_index[0] <= i) {
+            cache_header_index.erase(cache_header_index.begin());
+        }
+    }
+
+    cache_vec.erase(cache_vec.begin(), cache_vec.begin() + count);
+
+    if(cache_idr_index.empty()) {
+        wait_keyframe = true;
+    }
+
+    if (cache_vec.empty()) {
+        assert(cache_idr_index.empty());
+        assert(cache_header_index.empty());
+        last_video_index = last_audio_index = SRS_NO_DTAT_INDEX;
+    } else {
+        //slid index
+        for(size_t i = 0; i < cache_idr_index.size(); i++) {
+            cache_idr_index[i] -= count;
+            assert(cache_vec[cache_idr_index[i]]->is_keyframe);
+        }
+        for(size_t i = 0; i < cache_header_index.size(); i++) {
+            cache_header_index[i] -= count;
+            assert(cache_vec[cache_header_index[i]]->is_header);
+        }
+
+        last_video_index -= count;
+        last_audio_index -= count;
+    }
+}
+
+std::string SrsLasCache::to_str() {
+    stringstream ss;
+    ss << "{";
+    ss << "mcdur=" << max_cache_dur;
+    ss << ",mcsize=" << max_cache_size;
+    ss << ",csize=" << cache_vec.size();
+    ss << ",lvpts=" << (last_video_index >=0? cache_vec[last_video_index]->pts:-1);
+    ss << ",lvdts=" << (last_video_index >=0? cache_vec[last_video_index]->timestamp:-1);
+    ss << ",lapts=" << (last_audio_index >=0? cache_vec[last_audio_index]->pts:-1);
+    ss << ",ladts=" <<  (last_audio_index >=0? cache_vec[last_audio_index]->timestamp:-1);
+    ss << ",wait_key=" << wait_keyframe;
+    ss << ",key_index=[";
+    for (size_t i = 0; i < cache_idr_index.size(); i++) {
+        if (i != 0) {
+            ss << " | ";
+        }
+        ss << cache_idr_index[i] << ":" << cache_vec[cache_idr_index[i]]->pts;
+    }
+    ss << "],";
+    ss << "header_index=[";
+    for (size_t i = 0; i < cache_header_index.size(); i++) {
+        if (i != 0) {
+            ss << " | ";
+        }
+        ss << cache_header_index[i] << ":" << cache_vec[cache_header_index[i]]->pts;
+    }
+    ss << "]";
+    ss << " }";
+    return ss.str();
+}
+
+//-----------------SRS_LAS--------------
+#endif
+
 ISrsSourceHandler::ISrsSourceHandler()
 {
 }
@@ -1874,7 +2391,13 @@ SrsSource::SrsSource()
     
     play_edge = new SrsPlayEdge();
     publish_edge = new SrsPublishEdge();
+
+#ifndef SRS_LAS
     gop_cache = new SrsGopCache();
+#else
+    gop_cache = new SrsLasCache();
+#endif
+
     hub = new SrsOriginHub();
     meta = new SrsMetaCache();
     
@@ -2160,6 +2683,13 @@ srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket*
                 return srs_error_wrap(err, "consume metadata");
             }
         }
+#ifdef SRS_LAS
+        // LASCache will cache all packets, except drop_for_reduce
+        if ((err = gop_cache->cache(meta->data())) != srs_success) {
+            return srs_error_wrap(err, "gop cache consume vdieo");
+        }
+#endif
+
     }
     
     // Copy to hub to all utilities.
@@ -2245,6 +2775,12 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
                 return srs_error_wrap(err, "consume message");
             }
         }
+#ifdef SRS_LAS 
+        //LASCache will cache all packets, except drop_for_reduce
+        if ((err = gop_cache->cache(msg)) != srs_success) {
+            return srs_error_wrap(err, "gop cache consume audio");
+        }
+#endif
     }
     
     // cache the sequence header of aac, or first packet of mp3.
@@ -2255,7 +2791,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
             return srs_error_wrap(err, "meta consume audio");
         }
     }
-    
+#ifndef SRS_LAS  
     // when sequence header, donot push to gop cache and adjust the timestamp.
     if (is_sequence_header) {
         return err;
@@ -2265,7 +2801,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
     if ((err = gop_cache->cache(msg)) != srs_success) {
         return srs_error_wrap(err, "gop cache consume audio");
     }
-
+#endif
     // if atc, update the sequence header to abs time.
     if (atc) {
         if (meta->ash()) {
@@ -2375,18 +2911,24 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
                 return srs_error_wrap(err, "consume video");
             }
         }
+#ifdef SRS_LAS
+        // LASCache will cache all packets, except drop_for_reduce
+        if ((err = gop_cache->cache(msg)) != srs_success) {
+            return srs_error_wrap(err, "gop cache consume vdieo");
+        }
+#endif
     }
-    
+#ifndef SRS_LAS
     // when sequence header, donot push to gop cache and adjust the timestamp.
     if (is_sequence_header) {
         return err;
     }
-    
+
     // cache the last gop packets
     if ((err = gop_cache->cache(msg)) != srs_success) {
         return srs_error_wrap(err, "gop cache consume vdieo");
     }
-    
+#endif  
     // if atc, update the sequence header to abs time.
     if (atc) {
         if (meta->vsh()) {
@@ -2396,7 +2938,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
             meta->data()->timestamp = msg->timestamp;
         }
     }
-    
+
     return err;
 }
 
@@ -2579,13 +3121,57 @@ void SrsSource::on_unpublish()
     }
 }
 
+#ifdef SRS_LAS
+srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, int64_t lasspts, bool only_audio) {
+   srs_error_t err = srs_success; 
+   consumer = new SrsConsumer(this, conn, lasspts, only_audio);
+   consumers.push_back(consumer);
+    // for edge, when play edge stream, check the state
+    if (_srs_config->get_vhost_is_edge(req->vhost)) {
+        // notice edge to start for the first client.
+        if ((err = play_edge->on_client_play()) != srs_success) {
+            return srs_error_wrap(err, "play edge");
+        }
+    }
+    return err;
+}
+
+srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, bool dg, int64_t lasspts)
+{
+    srs_error_t err = srs_success;
+    srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost);
+    consumer->set_queue_size(queue_size);
+    //las dump cache headers frome gop cache
+    if (hub->active()) {
+        if (!dg || !gop_cache->enabled()) {
+            if ((err = meta->dumps(consumer, atc, jitter_algorithm, dm, ds)) != srs_success) {
+                return srs_error_wrap(err, "meta dumps");
+            }
+        } else {
+            // copy gop cache to client.
+            if ((err = gop_cache->dump(consumer, dm, ds, lasspts)) != srs_success) {
+                return srs_error_wrap(err, "gop cache dumps");
+            }
+        }
+    }
+
+    // print status.
+    if (dg) {
+        srs_trace("create consumer, active=%d, queue_size=%.2f, jitter=%d", hub->active(), queue_size, jitter_algorithm);
+    } else {
+        srs_trace("create consumer, active=%d, ignore gop cache, jitter=%d", hub->active(), jitter_algorithm);
+    }
+
+    return err;
+}
+#else
 srs_error_t SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer)
 {
     srs_error_t err = srs_success;
-    
+
     consumer = new SrsConsumer(this, conn);
     consumers.push_back(consumer);
-    
+
     // for edge, when play edge stream, check the state
     if (_srs_config->get_vhost_is_edge(req->vhost)) {
         // notice edge to start for the first client.
@@ -2639,6 +3225,7 @@ srs_error_t SrsSource::consumer_dumps(SrsConsumer* consumer, bool ds, bool dm, b
 
     return err;
 }
+#endif
 
 void SrsSource::on_consumer_destroy(SrsConsumer* consumer)
 {
diff --git a/trunk/src/app/srs_app_source.hpp b/trunk/src/app/srs_app_source.hpp
index 4cdb4be09..6b99be915 100644
--- a/trunk/src/app/srs_app_source.hpp
+++ b/trunk/src/app/srs_app_source.hpp
@@ -29,6 +29,7 @@
 #include <map>
 #include <vector>
 #include <string>
+#include <list>
 
 #include <srs_app_st.hpp>
 #include <srs_app_reload.hpp>
@@ -202,8 +203,39 @@ private:
     int mw_min_msgs;
     srs_utime_t mw_duration;
 #endif
+
+#ifdef SRS_LAS
+    //LAS1.0 chapter 4 exeParam @startPts
+    int64_t lasspts;
+    //LAS1.0 chapter 4 exeParam @onlyAudio
+    bool only_audio;
+    //if have send first data msg
+    bool have_first_msg;
+    //if have send first key frame
+    bool have_keyframe;
+    //count of droped frames, (when lasspts > 0, if not have_first_msg, droped frame when pts < lasspts)
+    int64_t frames_dropped_by_lasspts;
+    //count of deroped frames,(when first video is not key frame drop it)
+    int64_t frames_dropped_by_first_keyframe;
+    //cache {metadata,video sequence header, audio header} and send with data timstamp just after them.
+    std::list<SrsSharedPtrMessage*> headers_queue;
+    //LAS1.5 chapter 5.3, not use timeoutPts, but wait mode with a timeout
+    int64_t lasspts_max_wait_time;
+    //comsumer create time is to check if expire lasspts_max_wait_time
+    srs_utime_t create_time;
+#endif
+
 public:
+#ifdef SRS_LAS
+    //@param s consumer source
+    //@param c consumer connection
+    //@param lasspts LAS1.0 chapter 4 exeParam stratPts
+    //@param only_audio LAS1.0 chapter 4 exeParam onlyAudio
+    SrsConsumer(SrsSource* s, SrsConnection* c, int64_t lasspts = 0, bool onlyAudio = false);
+#else
     SrsConsumer(SrsSource* s, SrsConnection* c);
+#endif
+
     virtual ~SrsConsumer();
 public:
     // Set the size of queue.
@@ -292,6 +324,91 @@ public:
     // when no video in gop cache, the stream is pure audio right now.
     virtual bool pure_audio();
 };
+#ifdef SRS_LAS
+#define SRS_NO_DTAT_INDEX -1 
+#define SRS_DEFAULT_LAS_MAX_CACHE_DUR 12000 //LAS1.0 chapter 5.2 @cacheLen  TODO use config, 
+class SrsLasCache {
+private:
+    /**
+     * if disabled the gop cache,
+     * the client will wait for the next keyframe for h264,
+     * and will be black-screen.
+     */
+    bool enable_gop_cache;
+
+    /**
+     * max cached duration.
+     */
+    int max_cache_dur;
+    int max_cache_size;
+
+    //latest headers before cache_vec
+    SrsSharedPtrMessage* cached_video_header;
+    SrsSharedPtrMessage* cached_audio_header;
+    SrsSharedPtrMessage* cached_metadata_header;
+
+    //if have video must start from keyframe frame
+    bool wait_keyframe;
+
+    //cache buffer, cache all msg include (data/headers/metadata)
+    std::vector<SrsSharedPtrMessage*> cache_vec;
+    //key_frame index in cache_vec
+    std::vector<int> cache_idr_index;
+    //header index in cache_vec
+    std::vector<int> cache_header_index;
+
+    int last_video_index;
+    int last_audio_index;
+public:
+    SrsLasCache(int cache_dur = SRS_DEFAULT_LAS_MAX_CACHE_DUR);
+    virtual ~SrsLasCache();
+
+    /**
+     * cleanup when system quit.
+     */
+    virtual void dispose();
+    /**
+     * to enable or disable the gop cache.
+     */
+    virtual void set(bool enabled);
+    virtual bool enabled();
+
+    /**
+     * cache msg to buffer
+     * @param shared_msg, directly ptr, copy it if need to save it. shared_msg may be data/headers/metadata
+     */
+    virtual srs_error_t cache(SrsSharedPtrMessage* shared_msg);
+
+    /**
+     * dump the cached gop to consumer.
+     * @param consumer the consumer we should give buffer to.
+     * @param lasspts: LAS1.0 chapter 5.3, may have the follow value
+     * 0> : give the buffer form pts>=lasspts 
+     * =0 : from the newest gop
+     * <0 : from the newest_pts - |lasspts|
+     */
+    virtual srs_error_t dump(SrsConsumer* consumer, bool dm, bool ds, int64_t lasspts = 0);
+    //@param begin_index the index of first msg to dump in cache_vec
+    virtual srs_error_t do_dump(SrsConsumer* consumer,  bool dm, bool ds, int begin_index);
+    //set LAS1.0 chapter 5.2 @cacheLen
+    virtual void set_max_cache_duration(int duration) {max_cache_dur = duration;};
+    /**
+     * clear all cached msg
+     */
+    virtual void clear();
+    /** 
+     * is cache empty, if have any of data|header|metadata, it return false
+     */
+    virtual bool empty();
+
+private:
+    //dorp old cache if overflow max_cache_dur or max_cache_size
+    virtual void try_drop_old_cache();
+    virtual std::string to_str();
+    //clear cache_vec msg, but keep headers
+    virtual void erase_data(int count);
+};
+#endif
 
 // The handler to handle the event of srs source.
 // For example, the http flv streaming module handle the event and
@@ -538,7 +655,11 @@ private:
     SrsPlayEdge* play_edge;
     SrsPublishEdge* publish_edge;
     // The gop cache for client fast startup.
+#ifdef SRS_LAS
+    SrsLasCache* gop_cache;
+#else
     SrsGopCache* gop_cache;
+#endif
     // The hub for origin server.
     SrsOriginHub* hub;
     // The metadata cache.
@@ -597,6 +718,20 @@ public:
     virtual srs_error_t on_publish();
     virtual void on_unpublish();
 public:
+
+#ifdef SRS_LAS
+    // Create consumer
+    // @param consumer, output the create consumer.
+    // @param lasspts, @startPts in las1.0
+    // @param only_audio, whether comsumer want audio only
+    virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer, int64_t lasspts = 0, bool only_audio = false);
+    // Dumps packets in cache to consumer.
+    // @param ds, whether dumps the sequence header.
+    // @param dm, whether dumps the metadata.
+    // @param dg, whether dumps the gop cache.
+    // @param lasspts, @startPts in las1.0
+    virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true, int64_t lasspts = 0);
+#else
     // Create consumer
     // @param consumer, output the create consumer.
     virtual srs_error_t create_consumer(SrsConnection* conn, SrsConsumer*& consumer);
@@ -605,6 +740,7 @@ public:
     // @param dm, whether dumps the metadata.
     // @param dg, whether dumps the gop cache.
     virtual srs_error_t consumer_dumps(SrsConsumer* consumer, bool ds = true, bool dm = true, bool dg = true);
+#endif
     virtual void on_consumer_destroy(SrsConsumer* consumer);
     virtual void set_cache(bool enabled);
     virtual SrsRtmpJitterAlgorithm jitter();
diff --git a/trunk/src/kernel/srs_kernel_codec.cpp b/trunk/src/kernel/srs_kernel_codec.cpp
index e3d3ca4a3..c1dc946c2 100644
--- a/trunk/src/kernel/srs_kernel_codec.cpp
+++ b/trunk/src/kernel/srs_kernel_codec.cpp
@@ -178,6 +178,22 @@ bool SrsFlvVideo::acceptable(char* data, int size)
     return true;
 }
 
+#ifdef SRS_LAS
+int64_t SrsFlvVideo::cts(char *data, int size)
+{
+    if (!h264(data, size) || size < 6) {
+        return -1;
+    } else {
+        int32_t cts = 0x00;
+        char* pp = (char*)&cts;
+        pp[2] = data[2];
+        pp[1] = data[3];
+        pp[0] = data[4];
+        return cts;
+    }
+}
+#endif
+
 SrsFlvAudio::SrsFlvAudio()
 {
 }
diff --git a/trunk/src/kernel/srs_kernel_codec.hpp b/trunk/src/kernel/srs_kernel_codec.hpp
index a5bb960cb..b851dcfb3 100644
--- a/trunk/src/kernel/srs_kernel_codec.hpp
+++ b/trunk/src/kernel/srs_kernel_codec.hpp
@@ -268,6 +268,13 @@ public:
      * @remark all type of audio is possible, no need to check audio.
      */
     static bool acceptable(char* data, int size);
+
+#ifdef SRS_LAS
+    /**
+     * get cts if is 264, if not 264 return 0
+     */
+     static int64_t cts(char *data, int size);
+#endif
 };
 
 /**
diff --git a/trunk/src/kernel/srs_kernel_flv.cpp b/trunk/src/kernel/srs_kernel_flv.cpp
index edbcb829c..93b2007d5 100644
--- a/trunk/src/kernel/srs_kernel_flv.cpp
+++ b/trunk/src/kernel/srs_kernel_flv.cpp
@@ -220,6 +220,11 @@ SrsSharedPtrMessage::SrsSharedPtrPayload::~SrsSharedPtrPayload()
 SrsSharedPtrMessage::SrsSharedPtrMessage() : timestamp(0), stream_id(0), size(0), payload(NULL)
 {
     ptr = NULL;
+#ifdef SRS_LAS
+    is_keyframe = false;
+    is_header = false;
+    pts = 0;
+#endif 
 }
 
 SrsSharedPtrMessage::~SrsSharedPtrMessage()
@@ -232,6 +237,23 @@ SrsSharedPtrMessage::~SrsSharedPtrMessage()
         }
     }
 }
+#ifdef SRS_LAS
+std::string SrsSharedPtrMessage::to_str() {
+    stringstream ss;
+    ss << "{";
+    ss << "dts=" << timestamp;
+    ss << ",pts=" << pts;
+    ss << ",is_k=" << is_keyframe;
+    ss << ",is_h=" << is_header;
+    if (ptr) {
+        ss << ",type=" << (int)ptr->header.message_type;
+    } else {
+        ss << ",type=unknow";
+    }
+    ss << "}";
+    return ss.str();
+};
+#endif
 
 srs_error_t SrsSharedPtrMessage::create(SrsCommonMessage* msg)
 {
@@ -268,6 +290,19 @@ srs_error_t SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload
         ptr->header.perfer_cid = pheader->perfer_cid;
         this->timestamp = pheader->timestamp;
         this->stream_id = pheader->stream_id;
+#ifdef SRS_LAS
+        if (ptr->header.message_type == RTMP_MSG_VideoMessage) {
+            is_header = SrsFlvVideo::sh(payload, size);
+            is_keyframe =  SrsFlvVideo::keyframe(payload, size) && !is_header;
+            pts = this->timestamp + SrsFlvVideo::cts(payload, size);
+        } else if (ptr->header.message_type == RTMP_MSG_AudioMessage) {
+            is_header = SrsFlvAudio::sh(payload, size);
+            pts=this->timestamp;
+        } else {  // metadate
+            is_header = true;
+            pts=this->timestamp;
+        }
+#endif
     }
     ptr->payload = payload;
     ptr->size = size;
@@ -361,6 +396,12 @@ SrsSharedPtrMessage* SrsSharedPtrMessage::copy()
     copy->payload = ptr->payload;
     copy->size = ptr->size;
 
+#ifdef SRS_LAS
+    copy->pts = pts;
+    copy->is_header = is_header;
+    copy->is_keyframe = is_keyframe;
+#endif
+
     return copy;
 }
 
diff --git a/trunk/src/kernel/srs_kernel_flv.hpp b/trunk/src/kernel/srs_kernel_flv.hpp
index f3be5931b..614b80399 100644
--- a/trunk/src/kernel/srs_kernel_flv.hpp
+++ b/trunk/src/kernel/srs_kernel_flv.hpp
@@ -290,6 +290,17 @@ public:
     //       video/audio packet use raw bytes, no video/audio packet.
     char* payload;
 
+#ifdef SRS_LAS
+    //if is video key frame, not include video sequence header
+    bool is_keyframe;
+    // if is {video sequence header|aac header|metadata}
+    bool is_header;
+    // for video pts = timstamp + cts, others pts = timestamp
+    int64_t pts;
+public:
+ std::string to_str();
+#endif 
+
 private:
     class SrsSharedPtrPayload
     {