+
+TEST(Observer, SubscribeCallback) {
+ static auto mainThreadId = std::this_thread::get_id();
+ static std::function<void()> updatesCob;
+ static bool slowGet = false;
+ static std::atomic<size_t> getCallsStart{0};
+ static std::atomic<size_t> getCallsFinish{0};
+
+ struct Observable {
+ ~Observable() {
+ EXPECT_EQ(mainThreadId, std::this_thread::get_id());
+ }
+ };
+ struct Traits {
+ using element_type = int;
+ static std::shared_ptr<const int> get(Observable&) {
+ ++getCallsStart;
+ if (slowGet) {
+ /* sleep override */ std::this_thread::sleep_for(
+ std::chrono::seconds{2});
+ }
+ ++getCallsFinish;
+ return std::make_shared<const int>(42);
+ }
+
+ static void subscribe(Observable&, std::function<void()> cob) {
+ updatesCob = std::move(cob);
+ }
+
+ static void unsubscribe(Observable&) {}
+ };
+
+ std::thread cobThread;
+ {
+ auto observer =
+ folly::observer::ObserverCreator<Observable, Traits>().getObserver();
+
+ EXPECT_TRUE(updatesCob);
+ EXPECT_EQ(2, getCallsStart);
+ EXPECT_EQ(2, getCallsFinish);
+
+ updatesCob();
+ EXPECT_EQ(3, getCallsStart);
+ EXPECT_EQ(3, getCallsFinish);
+
+ slowGet = true;
+ cobThread = std::thread([] { updatesCob(); });
+ /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
+ EXPECT_EQ(4, getCallsStart);
+ EXPECT_EQ(3, getCallsFinish);
+
+ // Observer is destroyed here
+ }
+
+ // Make sure that destroying the observer actually joined the updates callback
+ EXPECT_EQ(4, getCallsStart);
+ EXPECT_EQ(4, getCallsFinish);
+ cobThread.join();
+}