Revert "Remove pipeline in favor of layer tree holder (#18285)" (#18427)

This reverts commit 2cdbc7f92793125d9312138877c05af6ad708683.
diff --git a/ci/licenses_golden/licenses_flutter b/ci/licenses_golden/licenses_flutter
index c891c35..8054331 100755
--- a/ci/licenses_golden/licenses_flutter
+++ b/ci/licenses_golden/licenses_flutter
@@ -576,12 +576,12 @@
 FILE: ../../../flutter/shell/common/input_events_unittests.cc
 FILE: ../../../flutter/shell/common/isolate_configuration.cc
 FILE: ../../../flutter/shell/common/isolate_configuration.h
-FILE: ../../../flutter/shell/common/layer_tree_holder.cc
-FILE: ../../../flutter/shell/common/layer_tree_holder.h
-FILE: ../../../flutter/shell/common/layer_tree_holder_unittests.cc
 FILE: ../../../flutter/shell/common/persistent_cache.cc
 FILE: ../../../flutter/shell/common/persistent_cache.h
 FILE: ../../../flutter/shell/common/persistent_cache_unittests.cc
+FILE: ../../../flutter/shell/common/pipeline.cc
+FILE: ../../../flutter/shell/common/pipeline.h
+FILE: ../../../flutter/shell/common/pipeline_unittests.cc
 FILE: ../../../flutter/shell/common/platform_view.cc
 FILE: ../../../flutter/shell/common/platform_view.h
 FILE: ../../../flutter/shell/common/pointer_data_dispatcher.cc
diff --git a/shell/common/BUILD.gn b/shell/common/BUILD.gn
index 9ada102..e9e5965 100644
--- a/shell/common/BUILD.gn
+++ b/shell/common/BUILD.gn
@@ -87,10 +87,10 @@
     "engine.h",
     "isolate_configuration.cc",
     "isolate_configuration.h",
-    "layer_tree_holder.cc",
-    "layer_tree_holder.h",
     "persistent_cache.cc",
     "persistent_cache.h",
+    "pipeline.cc",
+    "pipeline.h",
     "platform_view.cc",
     "platform_view.h",
     "pointer_data_dispatcher.cc",
@@ -191,8 +191,8 @@
       "animator_unittests.cc",
       "canvas_spy_unittests.cc",
       "input_events_unittests.cc",
-      "layer_tree_holder_unittests.cc",
       "persistent_cache_unittests.cc",
+      "pipeline_unittests.cc",
       "renderer_context_manager_unittests.cc",
       "renderer_context_test.cc",
       "renderer_context_test.h",
diff --git a/shell/common/animator.cc b/shell/common/animator.cc
index f8bbe58..7e581c0 100644
--- a/shell/common/animator.cc
+++ b/shell/common/animator.cc
@@ -3,7 +3,6 @@
 // found in the LICENSE file.
 
 #include "flutter/shell/common/animator.h"
-#include <memory>
 
 #include "flutter/fml/trace_event.h"
 #include "third_party/dart/runtime/include/dart_tools_api.h"
@@ -29,7 +28,18 @@
       last_frame_begin_time_(),
       last_frame_target_time_(),
       dart_frame_deadline_(0),
-      layer_tree_holder_(std::make_shared<LayerTreeHolder>()),
+#if FLUTTER_SHELL_ENABLE_METAL
+      layer_tree_pipeline_(fml::MakeRefCounted<LayerTreePipeline>(2)),
+#else   // FLUTTER_SHELL_ENABLE_METAL
+      // TODO(dnfield): We should remove this logic and set the pipeline depth
+      // back to 2 in this case. See
+      // https://github.com/flutter/engine/pull/9132 for discussion.
+      layer_tree_pipeline_(fml::MakeRefCounted<LayerTreePipeline>(
+          task_runners.GetPlatformTaskRunner() ==
+                  task_runners.GetRasterTaskRunner()
+              ? 1
+              : 2)),
+#endif  // FLUTTER_SHELL_ENABLE_METAL
       pending_frame_semaphore_(1),
       frame_number_(1),
       paused_(false),
@@ -37,7 +47,8 @@
       frame_scheduled_(false),
       notify_idle_task_id_(0),
       dimension_change_pending_(false),
-      weak_factory_(this) {}
+      weak_factory_(this) {
+}
 
 Animator::~Animator() = default;
 
@@ -103,6 +114,25 @@
   regenerate_layer_tree_ = false;
   pending_frame_semaphore_.Signal();
 
+  if (!producer_continuation_) {
+    // We may already have a valid pipeline continuation in case a previous
+    // begin frame did not result in an Animation::Render. Simply reuse that
+    // instead of asking the pipeline for a fresh continuation.
+    producer_continuation_ = layer_tree_pipeline_->Produce();
+
+    if (!producer_continuation_) {
+      // If we still don't have valid continuation, the pipeline is currently
+      // full because the consumer is being too slow. Try again at the next
+      // frame interval.
+      RequestFrame();
+      return;
+    }
+  }
+
+  // We have acquired a valid continuation from the pipeline and are ready
+  // to service potential frame.
+  FML_DCHECK(producer_continuation_);
+
   last_frame_begin_time_ = frame_start_time;
   last_frame_target_time_ = frame_target_time;
   dart_frame_deadline_ = FxlToDartOrEarlier(frame_target_time);
@@ -154,8 +184,13 @@
                                 last_frame_target_time_);
   }
 
-  layer_tree_holder_->PushIfNewer(std::move(layer_tree));
-  delegate_.OnAnimatorDraw(layer_tree_holder_, last_frame_target_time_);
+  // Commit the pending continuation.
+  bool result = producer_continuation_.Complete(std::move(layer_tree));
+  if (!result) {
+    FML_DLOG(INFO) << "No pending continuation to commit";
+  }
+
+  delegate_.OnAnimatorDraw(layer_tree_pipeline_, last_frame_target_time_);
 }
 
 bool Animator::CanReuseLastLayerTree() {
diff --git a/shell/common/animator.h b/shell/common/animator.h
index a2807ae..0bab577 100644
--- a/shell/common/animator.h
+++ b/shell/common/animator.h
@@ -6,14 +6,13 @@
 #define FLUTTER_SHELL_COMMON_ANIMATOR_H_
 
 #include <deque>
-#include <memory>
 
 #include "flutter/common/task_runners.h"
 #include "flutter/fml/memory/ref_ptr.h"
 #include "flutter/fml/memory/weak_ptr.h"
 #include "flutter/fml/synchronization/semaphore.h"
 #include "flutter/fml/time/time_point.h"
-#include "flutter/shell/common/layer_tree_holder.h"
+#include "flutter/shell/common/pipeline.h"
 #include "flutter/shell/common/rasterizer.h"
 #include "flutter/shell/common/vsync_waiter.h"
 
@@ -36,7 +35,7 @@
     virtual void OnAnimatorNotifyIdle(int64_t deadline) = 0;
 
     virtual void OnAnimatorDraw(
-        std::shared_ptr<LayerTreeHolder> layer_tree_holder,
+        fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline,
         fml::TimePoint frame_target_time) = 0;
 
     virtual void OnAnimatorDrawLastLayerTree() = 0;
@@ -82,6 +81,8 @@
   void EnqueueTraceFlowId(uint64_t trace_flow_id);
 
  private:
+  using LayerTreePipeline = Pipeline<flutter::LayerTree>;
+
   void BeginFrame(fml::TimePoint frame_start_time,
                   fml::TimePoint frame_target_time);
 
@@ -99,8 +100,9 @@
   fml::TimePoint last_frame_begin_time_;
   fml::TimePoint last_frame_target_time_;
   int64_t dart_frame_deadline_;
-  std::shared_ptr<LayerTreeHolder> layer_tree_holder_;
+  fml::RefPtr<LayerTreePipeline> layer_tree_pipeline_;
   fml::Semaphore pending_frame_semaphore_;
+  LayerTreePipeline::ProducerContinuation producer_continuation_;
   int64_t frame_number_;
   bool paused_;
   bool regenerate_layer_tree_;
diff --git a/shell/common/engine.h b/shell/common/engine.h
index c7b2d24..0f5f071 100644
--- a/shell/common/engine.h
+++ b/shell/common/engine.h
@@ -406,18 +406,25 @@
   ///             will cause the jank in the Flutter application:
   ///             * The time taken by this method to create a layer-tree exceeds
   ///               on frame interval (for example, 16.66 ms on a 60Hz display).
-  ///             * A new layer-tree produced by this method replaces a stale
-  ///               layer tree in `LayerTreeHolder`. See:
-  ///               `LayerTreeHolder::ReplaceIfNewer`. This could happen if
-  ///               rasterizer takes more than one frame interval to rasterize a
-  ///               layer tree. This would cause some frames to be skipped and
-  ///               could result in perceptible jank.
+  ///             * The time take by this method to generate a new layer-tree
+  ///               causes the current layer-tree pipeline depth to change. To
+  ///               illustrate this point, note that maximum pipeline depth used
+  ///               by layer tree in the engine is 2. If both the UI and GPU
+  ///               task runner tasks finish within one frame interval, the
+  ///               pipeline depth is one. If the UI thread happens to be
+  ///               working on a frame when the raster thread is still not done
+  ///               with the previous frame, the pipeline depth is 2. When the
+  ///               pipeline depth changes from 1 to 2, animations and UI
+  ///               interactions that cause the generation of the new layer tree
+  ///               appropriate for (frame_time + one frame interval) will
+  ///               actually end up at (frame_time + two frame intervals). This
+  ///               is not what code running on the UI thread expected would
+  ///               happen. This causes perceptible jank.
   ///
   /// @param[in]  frame_time  The point at which the current frame interval
   ///                         began. May be used by animation interpolators,
   ///                         physics simulations, etc..
   ///
-  /// @see         `LayerTreeHolder::ReplaceIfNewer`
   void BeginFrame(fml::TimePoint frame_time);
 
   //----------------------------------------------------------------------------
diff --git a/shell/common/layer_tree_holder.cc b/shell/common/layer_tree_holder.cc
deleted file mode 100644
index c329b37..0000000
--- a/shell/common/layer_tree_holder.cc
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2013 The Flutter Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "flutter/shell/common/layer_tree_holder.h"
-
-namespace flutter {
-
-LayerTreeHolder::LayerTreeHolder() = default;
-
-LayerTreeHolder::~LayerTreeHolder() = default;
-
-std::unique_ptr<LayerTree> LayerTreeHolder::Pop() {
-  std::scoped_lock lock(layer_tree_mutex);
-  return std::move(layer_tree_);
-}
-
-void LayerTreeHolder::PushIfNewer(
-    std::unique_ptr<LayerTree> proposed_layer_tree) {
-  std::scoped_lock lock(layer_tree_mutex);
-  if (!layer_tree_ ||
-      layer_tree_->target_time() < proposed_layer_tree->target_time()) {
-    layer_tree_ = std::move(proposed_layer_tree);
-  }
-}
-
-bool LayerTreeHolder::IsEmpty() const {
-  std::scoped_lock lock(layer_tree_mutex);
-  return !layer_tree_;
-}
-
-};  // namespace flutter
diff --git a/shell/common/layer_tree_holder.h b/shell/common/layer_tree_holder.h
deleted file mode 100644
index c1baf9f..0000000
--- a/shell/common/layer_tree_holder.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2013 The Flutter Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef FLUTTER_SHELL_COMMON_LAYER_TREE_HOLDER_H_
-#define FLUTTER_SHELL_COMMON_LAYER_TREE_HOLDER_H_
-
-#include <memory>
-
-#include "flow/layers/layer_tree.h"
-
-namespace flutter {
-
-/**
- * @brief Holds the next `flutter::LayerTree` that needs to be rasterized. The
- * accesses to `LayerTreeHolder` are thread safe. This is important as this
- * component is accessed from both the UI and the Raster threads.
- *
- * A typical flow of events through this component would be:
- *  1. `flutter::Animator` pushed a layer tree to be rendered during each
- * `Animator::Render` call.
- *  2. `flutter::Rasterizer::Draw` consumes the pushed layer tree via `Pop`.
- *
- * It is important to note that if a layer tree held by this class is yet to be
- * consumed, it can be overriden by a newer layer tree produced by the
- * `Animator`. The newness of the layer tree is determined by the target time.
- */
-class LayerTreeHolder {
- public:
-  LayerTreeHolder();
-
-  ~LayerTreeHolder();
-
-  /**
-   * @brief Checks if a layer tree is currently held.
-   *
-   * @return true is no layer tree is held.
-   * @return false if there is a layer tree waiting to be consumed.
-   */
-  bool IsEmpty() const;
-
-  [[nodiscard]] std::unique_ptr<LayerTree> Pop();
-
-  void PushIfNewer(std::unique_ptr<LayerTree> proposed_layer_tree);
-
- private:
-  mutable std::mutex layer_tree_mutex;
-  std::unique_ptr<LayerTree> layer_tree_;
-
-  FML_DISALLOW_COPY_AND_ASSIGN(LayerTreeHolder);
-};
-
-};  // namespace flutter
-
-#endif  // FLUTTER_SHELL_COMMON_LAYER_TREE_HOLDER_H_
diff --git a/shell/common/layer_tree_holder_unittests.cc b/shell/common/layer_tree_holder_unittests.cc
deleted file mode 100644
index ebf129b..0000000
--- a/shell/common/layer_tree_holder_unittests.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2013 The Flutter Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#define FML_USED_ON_EMBEDDER
-
-#include <functional>
-#include <future>
-#include <memory>
-
-#include "flutter/shell/common/layer_tree_holder.h"
-#include "gtest/gtest.h"
-
-namespace flutter {
-namespace testing {
-
-TEST(LayerTreeHolder, EmptyOnInit) {
-  const LayerTreeHolder layer_tree_holder;
-  ASSERT_TRUE(layer_tree_holder.IsEmpty());
-}
-
-TEST(LayerTreeHolder, PutOneAndGet) {
-  LayerTreeHolder layer_tree_holder;
-  const auto frame_size = SkISize::Make(64, 64);
-  auto layer_tree = std::make_unique<LayerTree>(frame_size, 100.0f, 1.0f);
-  layer_tree_holder.PushIfNewer(std::move(layer_tree));
-  ASSERT_FALSE(layer_tree_holder.IsEmpty());
-  const auto stored = layer_tree_holder.Pop();
-  ASSERT_EQ(stored->frame_size(), frame_size);
-  ASSERT_TRUE(layer_tree_holder.IsEmpty());
-}
-
-TEST(LayerTreeHolder, PutMultiGetsLatest) {
-  const auto build_begin = fml::TimePoint::Now();
-  const auto target_time_1 = build_begin + fml::TimeDelta::FromSeconds(2);
-  const auto target_time_2 = build_begin + fml::TimeDelta::FromSeconds(5);
-
-  LayerTreeHolder layer_tree_holder;
-  const auto frame_size_1 = SkISize::Make(64, 64);
-  auto layer_tree_1 = std::make_unique<LayerTree>(frame_size_1, 100.0f, 1.0f);
-  layer_tree_1->RecordBuildTime(build_begin, target_time_1);
-  layer_tree_holder.PushIfNewer(std::move(layer_tree_1));
-
-  const auto frame_size_2 = SkISize::Make(128, 128);
-  auto layer_tree_2 = std::make_unique<LayerTree>(frame_size_2, 100.0f, 1.0f);
-  layer_tree_2->RecordBuildTime(build_begin, target_time_2);
-  layer_tree_holder.PushIfNewer(std::move(layer_tree_2));
-
-  const auto stored = layer_tree_holder.Pop();
-  ASSERT_EQ(stored->frame_size(), frame_size_2);
-  ASSERT_TRUE(layer_tree_holder.IsEmpty());
-}
-
-TEST(LayerTreeHolder, RetainsOlderIfNewerFrameHasEarlierTargetTime) {
-  const auto build_begin = fml::TimePoint::Now();
-  const auto target_time_1 = build_begin + fml::TimeDelta::FromSeconds(5);
-  const auto target_time_2 = build_begin + fml::TimeDelta::FromSeconds(2);
-
-  LayerTreeHolder layer_tree_holder;
-  const auto frame_size_1 = SkISize::Make(64, 64);
-  auto layer_tree_1 = std::make_unique<LayerTree>(frame_size_1, 100.0f, 1.0f);
-  layer_tree_1->RecordBuildTime(build_begin, target_time_1);
-  layer_tree_holder.PushIfNewer(std::move(layer_tree_1));
-
-  const auto frame_size_2 = SkISize::Make(128, 128);
-  auto layer_tree_2 = std::make_unique<LayerTree>(frame_size_2, 100.0f, 1.0f);
-  layer_tree_2->RecordBuildTime(build_begin, target_time_2);
-  layer_tree_holder.PushIfNewer(std::move(layer_tree_2));
-
-  const auto stored = layer_tree_holder.Pop();
-  ASSERT_EQ(stored->frame_size(), frame_size_1);
-  ASSERT_TRUE(layer_tree_holder.IsEmpty());
-}
-
-}  // namespace testing
-}  // namespace flutter
diff --git a/shell/common/pipeline.cc b/shell/common/pipeline.cc
new file mode 100644
index 0000000..fb80c18
--- /dev/null
+++ b/shell/common/pipeline.cc
@@ -0,0 +1,14 @@
+// Copyright 2013 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "flutter/shell/common/pipeline.h"
+
+namespace flutter {
+
+size_t GetNextPipelineTraceID() {
+  static std::atomic_size_t PipelineLastTraceID = {0};
+  return ++PipelineLastTraceID;
+}
+
+}  // namespace flutter
diff --git a/shell/common/pipeline.h b/shell/common/pipeline.h
new file mode 100644
index 0000000..c225d9a
--- /dev/null
+++ b/shell/common/pipeline.h
@@ -0,0 +1,215 @@
+// Copyright 2013 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef FLUTTER_SHELL_COMMON_PIPELINE_H_
+#define FLUTTER_SHELL_COMMON_PIPELINE_H_
+
+#include "flutter/fml/macros.h"
+#include "flutter/fml/memory/ref_counted.h"
+#include "flutter/fml/synchronization/semaphore.h"
+#include "flutter/fml/trace_event.h"
+
+#include <deque>
+#include <memory>
+#include <mutex>
+
+namespace flutter {
+
+enum class PipelineConsumeResult {
+  NoneAvailable,
+  Done,
+  MoreAvailable,
+};
+
+size_t GetNextPipelineTraceID();
+
+/// A thread-safe queue of resources for a single consumer and a single
+/// producer.
+template <class R>
+class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> {
+ public:
+  using Resource = R;
+  using ResourcePtr = std::unique_ptr<Resource>;
+
+  /// Denotes a spot in the pipeline reserved for the producer to finish
+  /// preparing a completed pipeline resource.
+  class ProducerContinuation {
+   public:
+    ProducerContinuation() : trace_id_(0) {}
+
+    ProducerContinuation(ProducerContinuation&& other)
+        : continuation_(other.continuation_), trace_id_(other.trace_id_) {
+      other.continuation_ = nullptr;
+      other.trace_id_ = 0;
+    }
+
+    ProducerContinuation& operator=(ProducerContinuation&& other) {
+      std::swap(continuation_, other.continuation_);
+      std::swap(trace_id_, other.trace_id_);
+      return *this;
+    }
+
+    ~ProducerContinuation() {
+      if (continuation_) {
+        continuation_(nullptr, trace_id_);
+        TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
+        // The continuation is being dropped on the floor. End the flow.
+        TRACE_FLOW_END("flutter", "PipelineItem", trace_id_);
+        TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id_);
+      }
+    }
+
+    [[nodiscard]] bool Complete(ResourcePtr resource) {
+      bool result = false;
+      if (continuation_) {
+        result = continuation_(std::move(resource), trace_id_);
+        continuation_ = nullptr;
+        TRACE_EVENT_ASYNC_END0("flutter", "PipelineProduce", trace_id_);
+        TRACE_FLOW_STEP("flutter", "PipelineItem", trace_id_);
+      }
+      return result;
+    }
+
+    operator bool() const { return continuation_ != nullptr; }
+
+   private:
+    friend class Pipeline;
+    using Continuation = std::function<bool(ResourcePtr, size_t)>;
+
+    Continuation continuation_;
+    size_t trace_id_;
+
+    ProducerContinuation(const Continuation& continuation, size_t trace_id)
+        : continuation_(continuation), trace_id_(trace_id) {
+      TRACE_FLOW_BEGIN("flutter", "PipelineItem", trace_id_);
+      TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineItem", trace_id_);
+      TRACE_EVENT_ASYNC_BEGIN0("flutter", "PipelineProduce", trace_id_);
+    }
+
+    FML_DISALLOW_COPY_AND_ASSIGN(ProducerContinuation);
+  };
+
+  explicit Pipeline(uint32_t depth)
+      : depth_(depth), empty_(depth), available_(0), inflight_(0) {}
+
+  ~Pipeline() = default;
+
+  bool IsValid() const { return empty_.IsValid() && available_.IsValid(); }
+
+  ProducerContinuation Produce() {
+    if (!empty_.TryWait()) {
+      return {};
+    }
+    ++inflight_;
+    FML_TRACE_COUNTER("flutter", "Pipeline Depth",
+                      reinterpret_cast<int64_t>(this),      //
+                      "frames in flight", inflight_.load()  //
+    );
+
+    return ProducerContinuation{
+        std::bind(&Pipeline::ProducerCommit, this, std::placeholders::_1,
+                  std::placeholders::_2),  // continuation
+        GetNextPipelineTraceID()};         // trace id
+  }
+
+  // Create a `ProducerContinuation` that will only push the task if the queue
+  // is empty.
+  // Prefer using |Produce|. ProducerContinuation returned by this method
+  // doesn't guarantee that the frame will be rendered.
+  ProducerContinuation ProduceIfEmpty() {
+    if (!empty_.TryWait()) {
+      return {};
+    }
+    ++inflight_;
+    FML_TRACE_COUNTER("flutter", "Pipeline Depth",
+                      reinterpret_cast<int64_t>(this),      //
+                      "frames in flight", inflight_.load()  //
+    );
+
+    return ProducerContinuation{
+        std::bind(&Pipeline::ProducerCommitIfEmpty, this, std::placeholders::_1,
+                  std::placeholders::_2),  // continuation
+        GetNextPipelineTraceID()};         // trace id
+  }
+
+  using Consumer = std::function<void(ResourcePtr)>;
+
+  /// @note Procedure doesn't copy all closures.
+  [[nodiscard]] PipelineConsumeResult Consume(const Consumer& consumer) {
+    if (consumer == nullptr) {
+      return PipelineConsumeResult::NoneAvailable;
+    }
+
+    if (!available_.TryWait()) {
+      return PipelineConsumeResult::NoneAvailable;
+    }
+
+    ResourcePtr resource;
+    size_t trace_id = 0;
+    size_t items_count = 0;
+
+    {
+      std::scoped_lock lock(queue_mutex_);
+      std::tie(resource, trace_id) = std::move(queue_.front());
+      queue_.pop_front();
+      items_count = queue_.size();
+    }
+
+    {
+      TRACE_EVENT0("flutter", "PipelineConsume");
+      consumer(std::move(resource));
+    }
+
+    empty_.Signal();
+    --inflight_;
+
+    TRACE_FLOW_END("flutter", "PipelineItem", trace_id);
+    TRACE_EVENT_ASYNC_END0("flutter", "PipelineItem", trace_id);
+
+    return items_count > 0 ? PipelineConsumeResult::MoreAvailable
+                           : PipelineConsumeResult::Done;
+  }
+
+ private:
+  const uint32_t depth_;
+  fml::Semaphore empty_;
+  fml::Semaphore available_;
+  std::atomic<int> inflight_;
+  std::mutex queue_mutex_;
+  std::deque<std::pair<ResourcePtr, size_t>> queue_;
+
+  bool ProducerCommit(ResourcePtr resource, size_t trace_id) {
+    {
+      std::scoped_lock lock(queue_mutex_);
+      queue_.emplace_back(std::move(resource), trace_id);
+    }
+
+    // Ensure the queue mutex is not held as that would be a pessimization.
+    available_.Signal();
+    return true;
+  }
+
+  bool ProducerCommitIfEmpty(ResourcePtr resource, size_t trace_id) {
+    {
+      std::scoped_lock lock(queue_mutex_);
+      if (!queue_.empty()) {
+        // Bail if the queue is not empty, opens up spaces to produce other
+        // frames.
+        empty_.Signal();
+        return false;
+      }
+      queue_.emplace_back(std::move(resource), trace_id);
+    }
+
+    // Ensure the queue mutex is not held as that would be a pessimization.
+    available_.Signal();
+    return true;
+  }
+
+  FML_DISALLOW_COPY_AND_ASSIGN(Pipeline);
+};
+
+}  // namespace flutter
+
+#endif  // FLUTTER_SHELL_COMMON_PIPELINE_H_
diff --git a/shell/common/pipeline_unittests.cc b/shell/common/pipeline_unittests.cc
new file mode 100644
index 0000000..d9cce5a
--- /dev/null
+++ b/shell/common/pipeline_unittests.cc
@@ -0,0 +1,134 @@
+// Copyright 2013 The Flutter Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#define FML_USED_ON_EMBEDDER
+
+#include <functional>
+#include <future>
+#include <memory>
+
+#include "flutter/shell/common/pipeline.h"
+#include "gtest/gtest.h"
+
+namespace flutter {
+namespace testing {
+
+using IntPipeline = Pipeline<int>;
+using Continuation = IntPipeline::ProducerContinuation;
+
+TEST(PipelineTest, ConsumeOneVal) {
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(2);
+
+  Continuation continuation = pipeline->Produce();
+
+  const int test_val = 1;
+  bool result = continuation.Complete(std::make_unique<int>(test_val));
+  ASSERT_EQ(result, true);
+
+  PipelineConsumeResult consume_result = pipeline->Consume(
+      [&test_val](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val); });
+
+  ASSERT_EQ(consume_result, PipelineConsumeResult::Done);
+}
+
+TEST(PipelineTest, ContinuationCanOnlyBeUsedOnce) {
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(2);
+
+  Continuation continuation = pipeline->Produce();
+
+  const int test_val = 1;
+  bool result = continuation.Complete(std::make_unique<int>(test_val));
+  ASSERT_EQ(result, true);
+
+  PipelineConsumeResult consume_result_1 = pipeline->Consume(
+      [&test_val](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val); });
+
+  result = continuation.Complete(std::make_unique<int>(test_val));
+  ASSERT_EQ(result, false);
+  ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
+
+  PipelineConsumeResult consume_result_2 =
+      pipeline->Consume([](std::unique_ptr<int> v) { FAIL(); });
+
+  result = continuation.Complete(std::make_unique<int>(test_val));
+  ASSERT_EQ(result, false);
+  ASSERT_EQ(consume_result_2, PipelineConsumeResult::NoneAvailable);
+}
+
+TEST(PipelineTest, PushingMoreThanDepthCompletesFirstSubmission) {
+  const int depth = 1;
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
+
+  Continuation continuation_1 = pipeline->Produce();
+  Continuation continuation_2 = pipeline->Produce();
+
+  const int test_val_1 = 1, test_val_2 = 2;
+  bool result = continuation_1.Complete(std::make_unique<int>(test_val_1));
+  ASSERT_EQ(result, true);
+  result = continuation_2.Complete(std::make_unique<int>(test_val_2));
+  ASSERT_EQ(result, false);
+
+  PipelineConsumeResult consume_result_1 = pipeline->Consume(
+      [&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
+
+  ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
+}
+
+TEST(PipelineTest, PushingMultiProcessesInOrder) {
+  const int depth = 2;
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
+
+  Continuation continuation_1 = pipeline->Produce();
+  Continuation continuation_2 = pipeline->Produce();
+
+  const int test_val_1 = 1, test_val_2 = 2;
+  bool result = continuation_1.Complete(std::make_unique<int>(test_val_1));
+  ASSERT_EQ(result, true);
+  result = continuation_2.Complete(std::make_unique<int>(test_val_2));
+  ASSERT_EQ(result, true);
+
+  PipelineConsumeResult consume_result_1 = pipeline->Consume(
+      [&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
+  ASSERT_EQ(consume_result_1, PipelineConsumeResult::MoreAvailable);
+
+  PipelineConsumeResult consume_result_2 = pipeline->Consume(
+      [&test_val_2](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_2); });
+  ASSERT_EQ(consume_result_2, PipelineConsumeResult::Done);
+}
+
+TEST(PipelineTest, ProduceIfEmptyDoesNotConsumeWhenQueueIsNotEmpty) {
+  const int depth = 2;
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
+
+  Continuation continuation_1 = pipeline->Produce();
+  Continuation continuation_2 = pipeline->ProduceIfEmpty();
+
+  const int test_val_1 = 1, test_val_2 = 2;
+  bool result = continuation_1.Complete(std::make_unique<int>(test_val_1));
+  ASSERT_EQ(result, true);
+  result = continuation_2.Complete(std::make_unique<int>(test_val_2));
+  ASSERT_EQ(result, false);
+
+  PipelineConsumeResult consume_result_1 = pipeline->Consume(
+      [&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
+  ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
+}
+
+TEST(PipelineTest, ProduceIfEmptySuccessfulIfQueueIsEmpty) {
+  const int depth = 1;
+  fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
+
+  Continuation continuation_1 = pipeline->ProduceIfEmpty();
+
+  const int test_val_1 = 1;
+  bool result = continuation_1.Complete(std::make_unique<int>(test_val_1));
+  ASSERT_EQ(result, true);
+
+  PipelineConsumeResult consume_result_1 = pipeline->Consume(
+      [&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
+  ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
+}
+
+}  // namespace testing
+}  // namespace flutter
diff --git a/shell/common/rasterizer.cc b/shell/common/rasterizer.cc
index aa15c66..74329c9 100644
--- a/shell/common/rasterizer.cc
+++ b/shell/common/rasterizer.cc
@@ -4,11 +4,12 @@
 
 #include "flutter/shell/common/rasterizer.h"
 
+#include "flutter/shell/common/persistent_cache.h"
+
 #include <utility>
 
 #include "flutter/fml/time/time_delta.h"
 #include "flutter/fml/time/time_point.h"
-#include "flutter/shell/common/persistent_cache.h"
 #include "third_party/skia/include/core/SkEncodedImageFormat.h"
 #include "third_party/skia/include/core/SkImageEncoder.h"
 #include "third_party/skia/include/core/SkPictureRecorder.h"
@@ -110,7 +111,7 @@
   DrawToSurface(*last_layer_tree_);
 }
 
-void Rasterizer::Draw(std::shared_ptr<LayerTreeHolder> layer_tree_holder) {
+void Rasterizer::Draw(fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline) {
   TRACE_EVENT0("flutter", "GPURasterizer::Draw");
   if (raster_thread_merger_ &&
       !raster_thread_merger_->IsOnRasterizingThread()) {
@@ -119,32 +120,50 @@
   }
   FML_DCHECK(task_runners_.GetRasterTaskRunner()->RunsTasksOnCurrentThread());
 
-  std::unique_ptr<LayerTree> layer_tree = layer_tree_holder->Pop();
-  RasterStatus raster_status =
-      layer_tree ? DoDraw(std::move(layer_tree)) : RasterStatus::kFailed;
+  RasterStatus raster_status = RasterStatus::kFailed;
+  Pipeline<flutter::LayerTree>::Consumer consumer =
+      [&](std::unique_ptr<LayerTree> layer_tree) {
+        raster_status = DoDraw(std::move(layer_tree));
+      };
+
+  PipelineConsumeResult consume_result = pipeline->Consume(consumer);
+  // if the raster status is to resubmit the frame, we push the frame to the
+  // front of the queue and also change the consume status to more available.
+  if (raster_status == RasterStatus::kResubmit) {
+    auto front_continuation = pipeline->ProduceIfEmpty();
+    bool result =
+        front_continuation.Complete(std::move(resubmitted_layer_tree_));
+    if (result) {
+      consume_result = PipelineConsumeResult::MoreAvailable;
+    }
+  } else if (raster_status == RasterStatus::kEnqueuePipeline) {
+    consume_result = PipelineConsumeResult::MoreAvailable;
+  }
 
   // Merging the thread as we know the next `Draw` should be run on the platform
   // thread.
   if (raster_status == RasterStatus::kResubmit) {
-    layer_tree_holder->PushIfNewer(std::move(resubmitted_layer_tree_));
     auto* external_view_embedder = surface_->GetExternalViewEmbedder();
-    FML_DCHECK(external_view_embedder != nullptr)
-        << "kResubmit is an invalid raster status without external view "
-           "embedder.";
+    // We know only the `external_view_embedder` can
+    // causes|RasterStatus::kResubmit|. Check to make sure.
+    FML_DCHECK(external_view_embedder != nullptr);
     external_view_embedder->EndFrame(raster_thread_merger_);
   }
 
-  // Consume as many layer trees as possible. But yield the event loop
+  // Consume as many pipeline items as possible. But yield the event loop
   // between successive tries.
-  // Note: This behaviour is left as-is to be inline with the pipeline
-  // semantics. TODO(kaushikiska): explore removing this block.
-  if (!layer_tree_holder->IsEmpty()) {
-    task_runners_.GetRasterTaskRunner()->PostTask(
-        [weak_this = weak_factory_.GetWeakPtr(), layer_tree_holder]() {
-          if (weak_this) {
-            weak_this->Draw(layer_tree_holder);
-          }
-        });
+  switch (consume_result) {
+    case PipelineConsumeResult::MoreAvailable: {
+      task_runners_.GetRasterTaskRunner()->PostTask(
+          [weak_this = weak_factory_.GetTaskRunnerAffineWeakPtr(), pipeline]() {
+            if (weak_this) {
+              weak_this->Draw(pipeline);
+            }
+          });
+      break;
+    }
+    default:
+      break;
   }
 }
 
diff --git a/shell/common/rasterizer.h b/shell/common/rasterizer.h
index 1a4a3cf..6de1a84 100644
--- a/shell/common/rasterizer.h
+++ b/shell/common/rasterizer.h
@@ -19,7 +19,7 @@
 #include "flutter/fml/time/time_delta.h"
 #include "flutter/fml/time/time_point.h"
 #include "flutter/lib/ui/snapshot_delegate.h"
-#include "flutter/shell/common/layer_tree_holder.h"
+#include "flutter/shell/common/pipeline.h"
 #include "flutter/shell/common/surface.h"
 
 namespace flutter {
@@ -230,27 +230,35 @@
   flutter::TextureRegistry* GetTextureRegistry();
 
   //----------------------------------------------------------------------------
-  /// @brief      Takes the latest item from the layer tree holder and executes
-  ///             the raster thread frame workload for that item to render a
-  ///             frame on the on-screen surface.
+  /// @brief      Takes the next item from the layer tree pipeline and executes
+  ///             the raster thread frame workload for that pipeline item to
+  ///             render a frame on the on-screen surface.
   ///
-  ///             Why does the draw call take a layer tree holder and not the
+  ///             Why does the draw call take a layer tree pipeline and not the
   ///             layer tree directly?
   ///
-  ///             The layer tree holder is a thread safe way to produce frame
-  ///             workloads from the UI thread and rasterize them on the raster
-  ///             thread. To account for scenarious where the UI thread
-  ///             continues to produce the frames while a raster task is queued,
-  ///             `Rasterizer::DoDraw` that gets executed on the raster thread
-  ///             must pick up the newest layer tree produced by the UI thread.
-  ///             If we were to pass the layer tree as opposed to the holder, it
-  ///             would result in stale frames being rendered.
+  ///             The pipeline is the way book-keeping of frame workloads
+  ///             distributed across the multiple threads is managed. The
+  ///             rasterizer deals with the pipelines directly (instead of layer
+  ///             trees which is what it actually renders) because the pipeline
+  ///             consumer's workload must be accounted for within the pipeline
+  ///             itself. If the rasterizer took the layer tree directly, it
+  ///             would have to be taken out of the pipeline. That would signal
+  ///             the end of the frame workload and the pipeline would be ready
+  ///             for new frames. But the last frame has not been rendered by
+  ///             the frame yet! On the other hand, the pipeline must own the
+  ///             layer tree it renders because it keeps a reference to the last
+  ///             layer tree around till a new frame is rendered. So a simple
+  ///             reference wont work either. The `Rasterizer::DoDraw` method
+  ///             actually performs the GPU operations within the layer tree
+  ///             pipeline.
   ///
   /// @see        `Rasterizer::DoDraw`
   ///
-  /// @param[in]  layer_tree_holder  The layer tree holder to take the latest
-  ///                                layer tree to render from.
-  void Draw(std::shared_ptr<LayerTreeHolder> layer_tree_holder);
+  /// @param[in]  pipeline  The layer tree pipeline to take the next layer tree
+  ///                       to render from.
+  ///
+  void Draw(fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline);
 
   //----------------------------------------------------------------------------
   /// @brief      The type of the screenshot to obtain of the previously
@@ -417,8 +425,7 @@
   std::unique_ptr<flutter::LayerTree> last_layer_tree_;
   // Set when we need attempt to rasterize the layer tree again. This layer_tree
   // has not successfully rasterized. This can happen due to the change in the
-  // thread configuration. This layer tree could be rasterized again if there
-  // are no newer ones.
+  // thread configuration. This will be inserted to the front of the pipeline.
   std::unique_ptr<flutter::LayerTree> resubmitted_layer_tree_;
   fml::closure next_frame_callback_;
   bool user_override_resource_cache_bytes_;
diff --git a/shell/common/shell.cc b/shell/common/shell.cc
index 6fff42e..d83fadb 100644
--- a/shell/common/shell.cc
+++ b/shell/common/shell.cc
@@ -952,7 +952,7 @@
 }
 
 // |Animator::Delegate|
-void Shell::OnAnimatorDraw(std::shared_ptr<LayerTreeHolder> layer_tree_holder,
+void Shell::OnAnimatorDraw(fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline,
                            fml::TimePoint frame_target_time) {
   FML_DCHECK(is_setup_);
 
@@ -970,9 +970,10 @@
       [&waiting_for_first_frame = waiting_for_first_frame_,
        &waiting_for_first_frame_condition = waiting_for_first_frame_condition_,
        rasterizer = rasterizer_->GetWeakPtr(),
-       layer_tree_holder = std::move(layer_tree_holder)]() {
+       pipeline = std::move(pipeline)]() {
         if (rasterizer) {
-          rasterizer->Draw(std::move(layer_tree_holder));
+          rasterizer->Draw(pipeline);
+
           if (waiting_for_first_frame.load()) {
             waiting_for_first_frame.store(false);
             waiting_for_first_frame_condition.notify_all();
diff --git a/shell/common/shell.h b/shell/common/shell.h
index 83eb396..a39884a 100644
--- a/shell/common/shell.h
+++ b/shell/common/shell.h
@@ -30,7 +30,6 @@
 #include "flutter/runtime/service_protocol.h"
 #include "flutter/shell/common/animator.h"
 #include "flutter/shell/common/engine.h"
-#include "flutter/shell/common/layer_tree_holder.h"
 #include "flutter/shell/common/platform_view.h"
 #include "flutter/shell/common/rasterizer.h"
 #include "flutter/shell/common/shell_io_manager.h"
@@ -490,7 +489,7 @@
   void OnAnimatorNotifyIdle(int64_t deadline) override;
 
   // |Animator::Delegate|
-  void OnAnimatorDraw(std::shared_ptr<LayerTreeHolder> layer_tree_holder,
+  void OnAnimatorDraw(fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline,
                       fml::TimePoint frame_target_time) override;
 
   // |Animator::Delegate|
diff --git a/shell/common/shell_test.cc b/shell/common/shell_test.cc
index a24c9eb..8ef045c 100644
--- a/shell/common/shell_test.cc
+++ b/shell/common/shell_test.cc
@@ -136,7 +136,7 @@
                              flutter::ViewportMetrics viewport_metrics,
                              LayerTreeBuilder builder) {
   // Set viewport to nonempty, and call Animator::BeginFrame to make the layer
-  // tree holder nonempty. Without either of this, the layer tree below
+  // tree pipeline nonempty. Without either of this, the layer tree below
   // won't be rasterized.
   fml::AutoResetWaitableEvent latch;
   shell->GetTaskRunners().GetUITaskRunner()->PostTask(