From: James Sedgwick Date: Wed, 13 May 2015 01:42:39 +0000 (-0700) Subject: SharedPromise in OutputBufferingHandler X-Git-Tag: v0.39.0~21 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=86d0e6a9d4745170e4638ae38154c10b323a2a59;p=folly.git SharedPromise in OutputBufferingHandler Summary: as above. I'm torn on whether to sugar "*this = SharedPromise" as SharedPromise::reset() If I see another use case I'll probably do it Test Plan: unit Reviewed By: hans@fb.com Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2064449 Signature: t1:2064449:1431476780:7113366b11feaf9e8a4ea1dc60fbafb36dd46ac5 --- diff --git a/folly/futures/SharedPromise.h b/folly/futures/SharedPromise.h index 5041a38a..c681a3f0 100644 --- a/folly/futures/SharedPromise.h +++ b/folly/futures/SharedPromise.h @@ -83,14 +83,14 @@ public: template typename std::enable_if::value, void>::type setValue() { - set(Try()); + setTry(Try()); } /// Sugar to fulfill this SharedPromise template typename std::enable_if::value, void>::type setValue() { - set(Try(T())); + setTry(Try(T())); } /** Set the value (use perfect forwarding for both move and copy) */ diff --git a/folly/wangle/channel/OutputBufferingHandler.h b/folly/wangle/channel/OutputBufferingHandler.h index 31abbdb0..d712b8a0 100644 --- a/folly/wangle/channel/OutputBufferingHandler.h +++ b/folly/wangle/channel/OutputBufferingHandler.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -48,20 +49,16 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler, DCHECK(isLoopCallbackScheduled()); sends_->prependChain(std::move(buf)); } - Promise p; - auto f = p.getFuture(); - promises_.push_back(std::move(p)); - return f; + return sharedPromise_.getFuture(); } } void runLoopCallback() noexcept override { - MoveWrapper>> promises(std::move(promises_)); + MoveWrapper> sharedPromise; + std::swap(*sharedPromise, sharedPromise_); getContext()->fireWrite(std::move(sends_)) - .then([promises](Try t) mutable { - for (auto& p : *promises) { - p.setTry(Try(t)); - } + .then([sharedPromise](Try t) mutable { + sharedPromise->setTry(std::move(t)); }); } @@ -71,17 +68,15 @@ class OutputBufferingHandler : public OutboundBytesToBytesHandler, } // If there are sends queued, cancel them - for (auto& promise : promises_) { - promise.setException( - folly::make_exception_wrapper( - "close() called while sends still pending")); - } + sharedPromise_.setException( + folly::make_exception_wrapper( + "close() called while sends still pending")); sends_.reset(); - promises_.clear(); + sharedPromise_ = SharedPromise(); return ctx->fireClose(); } - std::vector> promises_; + SharedPromise sharedPromise_; std::unique_ptr sends_{nullptr}; bool queueSends_{true}; }; diff --git a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp index 51f67275..a0279666 100644 --- a/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp +++ b/folly/wangle/channel/test/OutputBufferingHandlerTest.cpp @@ -56,4 +56,11 @@ TEST(OutputBufferingHandlerTest, Basic) { EXPECT_TRUE(f1.isReady()); EXPECT_TRUE(f2.isReady()); EXPECT_CALL(mockHandler, detachPipeline(_)); + + // Make sure the SharedPromise resets correctly + auto f = pipeline.write(IOBuf::copyBuffer("foo")); + EXPECT_FALSE(f.isReady()); + EXPECT_CALL(mockHandler, write_(_, IOBufContains("foo"))); + eb.loopOnce(); + EXPECT_TRUE(f.isReady()); }