namespace folly { namespace wangle {
+// This handler may only be used in a single Pipeline
class AsyncSocketHandler
: public folly::wangle::BytesToBytesHandler,
public AsyncSocket::ReadCallback {
}
void attachPipeline(Context* ctx) override {
- CHECK(!ctx_);
- ctx_ = ctx;
attachReadCallback();
}
}
void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
- const auto readBufferSettings = ctx_->getReadBufferSettings();
+ const auto readBufferSettings = getContext()->getReadBufferSettings();
const auto ret = bufQueue_.preallocate(
readBufferSettings.first,
readBufferSettings.second);
void readDataAvailable(size_t len) noexcept override {
bufQueue_.postallocate(len);
- ctx_->fireRead(bufQueue_);
+ getContext()->fireRead(bufQueue_);
}
void readEOF() noexcept override {
- ctx_->fireReadEOF();
+ getContext()->fireReadEOF();
}
void readErr(const AsyncSocketException& ex)
noexcept override {
- ctx_->fireReadException(make_exception_wrapper<AsyncSocketException>(ex));
+ getContext()->fireReadException(
+ make_exception_wrapper<AsyncSocketException>(ex));
}
private:
folly::Promise<void> promise_;
};
- Context* ctx_{nullptr};
folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
std::shared_ptr<AsyncSocket> socket_{nullptr};
};
namespace folly { namespace wangle {
+template <class Context>
+class HandlerBase {
+ public:
+ virtual ~HandlerBase() {}
+
+ virtual void attachPipeline(Context* ctx) {}
+ virtual void attachTransport(Context* ctx) {}
+
+ virtual void detachPipeline(Context* ctx) {}
+ virtual void detachTransport(Context* ctx) {}
+
+ Context* getContext() {
+ if (attachCount_ != 1) {
+ return nullptr;
+ }
+ CHECK(ctx_);
+ return ctx_;
+ }
+
+ private:
+ friend detail::HandlerContextBase<Context>;
+ uint64_t attachCount_{0};
+ Context* ctx_{nullptr};
+};
+
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
-class Handler {
+class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:
typedef Rin rin;
typedef Rout rout;
return ctx->fireClose();
}
- virtual void attachPipeline(Context* ctx) {}
- virtual void attachTransport(Context* ctx) {}
-
- virtual void detachPipeline(Context* ctx) {}
- virtual void detachTransport(Context* ctx) {}
-
/*
// Other sorts of things we might want, all shamelessly stolen from Netty
// inbound
namespace folly { namespace wangle {
+namespace detail {
+
+template <class HandlerContext>
+class HandlerContextBase {
+ protected:
+ template <class H>
+ void attachContext(H* handler, HandlerContext* ctx) {
+ if (++handler->attachCount_ == 1) {
+ handler->ctx_ = ctx;
+ } else {
+ handler->ctx_ = nullptr;
+ }
+ }
+};
+
+} // detail
+
template <class In, class Out>
-class HandlerContext {
+class HandlerContext
+ : public detail::HandlerContextBase<HandlerContext<In, Out>> {
public:
virtual ~HandlerContext() {}
// PipelineContext overrides
void attachPipeline() override {
if (!attached_) {
+ this->attachContext(handler_.get(), this);
handler_->attachPipeline(this);
attached_ = true;
}
/*
* OutputBufferingHandler buffers writes in order to minimize syscalls. The
* transport will be written to once per event loop instead of on every write.
+ *
+ * This handler may only be used in a single Pipeline.
*/
class OutputBufferingHandler : public BytesToBytesHandler,
protected EventBase::LoopCallback {
if (!queueSends_) {
return ctx->fireWrite(std::move(buf));
} else {
- ctx_ = ctx;
// Delay sends to optimize for fewer syscalls
if (!sends_) {
DCHECK(!isLoopCallbackScheduled());
void runLoopCallback() noexcept override {
MoveWrapper<std::vector<Promise<void>>> promises(std::move(promises_));
- ctx_->fireWrite(std::move(sends_)).then([promises](Try<void> t) mutable {
- for (auto& p : *promises) {
- p.setTry(t);
- }
- });
+ getContext()->fireWrite(std::move(sends_))
+ .then([promises](Try<void> t) mutable {
+ for (auto& p : *promises) {
+ p.setTry(t);
+ }
+ });
}
Future<void> close(Context* ctx) override {
std::vector<Promise<void>> promises_;
std::unique_ptr<IOBuf> sends_{nullptr};
bool queueSends_{true};
- Context* ctx_;
};
}}
}
}
+TEST(PipelineTest, GetContext) {
+ IntHandler handler;
+ EXPECT_CALL(handler, attachPipeline(_));
+ StaticPipeline<int, int, IntHandler> pipeline(&handler);
+ EXPECT_TRUE(handler.getContext());
+ EXPECT_CALL(handler, detachPipeline(_));
+}
+
TEST(PipelineTest, HandlerInMultiplePipelines) {
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_)).Times(2);
StaticPipeline<int, int, IntHandler> pipeline1(&handler);
StaticPipeline<int, int, IntHandler> pipeline2(&handler);
+ EXPECT_FALSE(handler.getContext());
EXPECT_CALL(handler, detachPipeline(_)).Times(2);
}
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_)).Times(2);
StaticPipeline<int, int, IntHandler, IntHandler> pipeline(&handler, &handler);
+ EXPECT_FALSE(handler.getContext());
EXPECT_CALL(handler, detachPipeline(_)).Times(2);
}