9bd3c83d8868d36a115077a4df6ed8cc00a2177a
[folly.git] / folly / experimental / observer / Observable-inl.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 namespace folly {
19 namespace observer {
20
21 template <typename Observable, typename Traits>
22 class ObserverCreator<Observable, Traits>::Context {
23  public:
24   template <typename... Args>
25   Context(Args&&... args) : observable_(std::forward<Args>(args)...) {}
26
27   ~Context() {
28     if (value_.copy()) {
29       Traits::unsubscribe(observable_);
30     }
31   }
32
33   void setCore(observer_detail::Core::WeakPtr coreWeak) {
34     coreWeak_ = std::move(coreWeak);
35   }
36
37   std::shared_ptr<const T> get() {
38     updateRequested_ = false;
39     return value_.copy();
40   }
41
42   void update() {
43     {
44       auto newValue = Traits::get(observable_);
45       if (!newValue) {
46         throw std::logic_error("Observable returned nullptr.");
47       }
48       value_.swap(newValue);
49     }
50
51     bool expected = false;
52     if (updateRequested_.compare_exchange_strong(expected, true)) {
53       if (auto core = coreWeak_.lock()) {
54         observer_detail::ObserverManager::scheduleRefreshNewVersion(
55             std::move(core));
56       }
57     }
58   }
59
60   template <typename F>
61   void subscribe(F&& callback) {
62     Traits::subscribe(observable_, std::forward<F>(callback));
63   }
64
65  private:
66   folly::Synchronized<std::shared_ptr<const T>> value_;
67   std::atomic<bool> updateRequested_{false};
68
69   observer_detail::Core::WeakPtr coreWeak_;
70
71   Observable observable_;
72 };
73
74 template <typename Observable, typename Traits>
75 template <typename... Args>
76 ObserverCreator<Observable, Traits>::ObserverCreator(Args&&... args)
77     : context_(std::make_shared<Context>(std::forward<Args>(args)...)) {}
78
79 template <typename Observable, typename Traits>
80 Observer<typename ObserverCreator<Observable, Traits>::T>
81 ObserverCreator<Observable, Traits>::getObserver()&& {
82   auto core = observer_detail::Core::create([context = context_]() {
83     return context->get();
84   });
85
86   context_->setCore(core);
87
88   context_->subscribe([contextWeak = std::weak_ptr<Context>(context_)] {
89     if (auto context = contextWeak.lock()) {
90       context->update();
91     }
92   });
93
94   context_->update();
95   context_.reset();
96
97   DCHECK(core->getVersion() > 0);
98
99   return Observer<T>(std::move(core));
100 }
101 }
102 }