class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
+ std::shared_ptr<AsyncSocket> sock) {
+
pipelines++;
- return new BytesPipeline();
+ return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
+ new BytesPipeline());
}
std::atomic<int> pipelines{0};
};
class TestHandlerPipelineFactory
: public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
public:
- ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
+ std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+ folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) {
+
+ std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+ folly::DelayedDestruction::Destructor> pipeline(
+ new ServerBootstrap<BytesPipeline>::AcceptPipeline);
pipeline->addBack(HandlerPipeline());
return pipeline;
}
}
ClientBootstrap* connect(SocketAddress address) {
DCHECK(pipelineFactory_);
- pipeline_.reset(
+ pipeline_=
pipelineFactory_->newPipeline(
- AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
- ));
+ AsyncSocket::newSocket(
+ EventBaseManager::get()->getEventBase(), address));
return this;
}
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
- class ServerConnection : public wangle::ManagedConnection {
+ class ServerConnection : public wangle::ManagedConnection,
+ public wangle::PipelineManager {
public:
explicit ServerConnection(PipelinePtr pipeline)
- : pipeline_(std::move(pipeline)) {}
-
- ~ServerConnection() {
+ : pipeline_(std::move(pipeline)) {
+ pipeline_->setPipelineManager(this);
}
- void timeoutExpired() noexcept {
+ ~ServerConnection() {}
+
+ void timeoutExpired() noexcept override {
}
- void describe(std::ostream& os) const {}
- bool isBusy() const {
+ void describe(std::ostream& os) const override {}
+ bool isBusy() const override {
return false;
}
- void notifyPendingShutdown() {}
- void closeWhenIdle() {}
- void dropConnection() {
+ void notifyPendingShutdown() override {}
+ void closeWhenIdle() override {}
+ void dropConnection() override {
delete this;
}
- void dumpConnectionState(uint8_t loglevel) {}
+ void dumpConnectionState(uint8_t loglevel) override {}
+
+ void deletePipeline(wangle::PipelineBase* p) override {
+ CHECK(p == pipeline_.get());
+ delete this;
+ }
+
private:
PipelinePtr pipeline_;
};
typedef wangle::Pipeline<void*> AcceptPipeline;
public:
- AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- return new AcceptPipeline;
+ std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) {
+
+ return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+ (new AcceptPipeline);
}
};
}
sockets_->clear();
}
+ if (!stopped_) {
+ stopped_ = true;
+ // stopBaton_ may be null if ServerBootstrap has been std::move'd
+ if (stopBaton_) {
+ stopBaton_->post();
+ }
+ }
}
void join() {
}
}
+ void waitForStop() {
+ if (!stopped_) {
+ CHECK(stopBaton_);
+ stopBaton_->wait();
+ }
+ }
+
/*
* Get the list of listening sockets
*/
std::make_shared<DefaultAcceptPipelineFactory>()};
std::shared_ptr<ServerSocketFactory> socketFactory_{
std::make_shared<AsyncServerSocketFactory>()};
+
+ std::unique_ptr<folly::Baton<>> stopBaton_{
+ folly::make_unique<folly::Baton<>>()};
+ bool stopped_{false};
};
} // namespace
detachReadCallback();
socket_->closeNow();
}
+ ctx->getPipeline()->deletePipeline();
return folly::makeFuture();
}
}
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
}
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
}
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
namespace folly { namespace wangle {
+class PipelineBase;
+
template <class In, class Out>
class HandlerContext {
public:
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
};
namespace folly { namespace wangle {
+class PipelineManager {
+ public:
+ virtual ~PipelineManager() {}
+ virtual void deletePipeline(PipelineBase* pipeline) = 0;
+};
+
+class PipelineBase {
+ public:
+ void setPipelineManager(PipelineManager* manager) {
+ manager_ = manager;
+ }
+
+ void deletePipeline() {
+ if (manager_) {
+ manager_->deletePipeline(this);
+ }
+ }
+
+ private:
+ PipelineManager* manager_{nullptr};
+};
+
struct Nothing{};
/*
* If W is Nothing, write() and close() will be disabled.
*/
template <class R, class W = Nothing>
-class Pipeline : public DelayedDestruction {
+class Pipeline : public PipelineBase, public DelayedDestruction {
public:
Pipeline();
~Pipeline();
template <typename Pipeline>
class PipelineFactory {
public:
- virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+ virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+
virtual ~PipelineFactory() {}
};
/*
* StringCodec converts a pipeline from IOBufs to std::strings.
*/
-class StringCodec : public Handler<IOBufQueue&, std::string,
+class StringCodec : public Handler<std::unique_ptr<IOBuf>, std::string,
std::string, std::unique_ptr<IOBuf>> {
public:
typedef typename Handler<
- IOBufQueue&, std::string,
+ std::unique_ptr<IOBuf>, std::string,
std::string, std::unique_ptr<IOBuf>>::Context Context;
- void read(Context* ctx, IOBufQueue& q) override {
- auto buf = q.pop_front();
+ void read(Context* ctx, std::unique_ptr<IOBuf> buf) override {
buf->coalesce();
std::string data((const char*)buf->data(), buf->length());
#include <gtest/gtest.h>
#include <folly/wangle/codec/StringCodec.h>
+#include <folly/wangle/codec/ByteToMessageCodec.h>
#include <folly/wangle/service/ClientDispatcher.h>
#include <folly/wangle/service/ServerDispatcher.h>
#include <folly/wangle/service/Service.h>
typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
+class SimpleDecode : public ByteToMessageCodec {
+ public:
+ virtual std::unique_ptr<IOBuf> decode(
+ Context* ctx, IOBufQueue& buf, size_t&) {
+ return buf.move();
+ }
+};
+
class EchoService : public Service<std::string, std::string> {
public:
virtual Future<std::string> operator()(std::string req) override {
: public PipelineFactory<ServicePipeline> {
public:
- ServicePipeline* newPipeline(
- std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new ServicePipeline();
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+ new ServicePipeline());
pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(SimpleDecode());
pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
pipeline->finalize();
class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
public:
- ServicePipeline* newPipeline(
- std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new ServicePipeline();
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+ new ServicePipeline());
pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(SimpleDecode());
pipeline->addBack(StringCodec());
+ pipeline->finalize();
return pipeline;
}
};