Summary: similar to https://github.com/netty/netty/tree/master/example/src/main/java/io/netty/example/telnet
Test Plan:
fbconfig folly/wangle/example/telnet; fbmake dbg
_bin/folly/wangle/example/telnet_server --port=8080
telnet localhost 8080
Still a couple sharp edges:
* No easy way to wait for ServerBootstrap termination.
* Pipelines always have to call attachReadCallback
* a bunch of missing methods in pipeline still, like channelActive
Reviewed By: hans@fb.com
Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant
FB internal diff:
D1959172
Signature: t1:
1959172:
1427993978:
463f237036996451187e3ef3983cf2b4e89685ef
class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor> newPipeline(
+ std::shared_ptr<AsyncSocket> sock) {
+
- return new BytesPipeline();
+ return std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>(
+ new BytesPipeline());
}
std::atomic<int> pipelines{0};
};
}
std::atomic<int> pipelines{0};
};
class TestHandlerPipelineFactory
: public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
public:
class TestHandlerPipelineFactory
: public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
public:
- ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
+ std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+ folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) {
+
+ std::unique_ptr<ServerBootstrap<BytesPipeline>::AcceptPipeline,
+ folly::DelayedDestruction::Destructor> pipeline(
+ new ServerBootstrap<BytesPipeline>::AcceptPipeline);
pipeline->addBack(HandlerPipeline());
return pipeline;
}
pipeline->addBack(HandlerPipeline());
return pipeline;
}
}
ClientBootstrap* connect(SocketAddress address) {
DCHECK(pipelineFactory_);
}
ClientBootstrap* connect(SocketAddress address) {
DCHECK(pipelineFactory_);
pipelineFactory_->newPipeline(
pipelineFactory_->newPipeline(
- AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
- ));
+ AsyncSocket::newSocket(
+ EventBaseManager::get()->getEventBase(), address));
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
- class ServerConnection : public wangle::ManagedConnection {
+ class ServerConnection : public wangle::ManagedConnection,
+ public wangle::PipelineManager {
public:
explicit ServerConnection(PipelinePtr pipeline)
public:
explicit ServerConnection(PipelinePtr pipeline)
- : pipeline_(std::move(pipeline)) {}
-
- ~ServerConnection() {
+ : pipeline_(std::move(pipeline)) {
+ pipeline_->setPipelineManager(this);
- void timeoutExpired() noexcept {
+ ~ServerConnection() {}
+
+ void timeoutExpired() noexcept override {
- void describe(std::ostream& os) const {}
- bool isBusy() const {
+ void describe(std::ostream& os) const override {}
+ bool isBusy() const override {
- void notifyPendingShutdown() {}
- void closeWhenIdle() {}
- void dropConnection() {
+ void notifyPendingShutdown() override {}
+ void closeWhenIdle() override {}
+ void dropConnection() override {
- void dumpConnectionState(uint8_t loglevel) {}
+ void dumpConnectionState(uint8_t loglevel) override {}
+
+ void deletePipeline(wangle::PipelineBase* p) override {
+ CHECK(p == pipeline_.get());
+ delete this;
+ }
+
private:
PipelinePtr pipeline_;
};
private:
PipelinePtr pipeline_;
};
typedef wangle::Pipeline<void*> AcceptPipeline;
public:
typedef wangle::Pipeline<void*> AcceptPipeline;
public:
- AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
- return new AcceptPipeline;
+ std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) {
+
+ return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
+ (new AcceptPipeline);
+ if (!stopped_) {
+ stopped_ = true;
+ // stopBaton_ may be null if ServerBootstrap has been std::move'd
+ if (stopBaton_) {
+ stopBaton_->post();
+ }
+ }
+ void waitForStop() {
+ if (!stopped_) {
+ CHECK(stopBaton_);
+ stopBaton_->wait();
+ }
+ }
+
/*
* Get the list of listening sockets
*/
/*
* Get the list of listening sockets
*/
std::make_shared<DefaultAcceptPipelineFactory>()};
std::shared_ptr<ServerSocketFactory> socketFactory_{
std::make_shared<AsyncServerSocketFactory>()};
std::make_shared<DefaultAcceptPipelineFactory>()};
std::shared_ptr<ServerSocketFactory> socketFactory_{
std::make_shared<AsyncServerSocketFactory>()};
+
+ std::unique_ptr<folly::Baton<>> stopBaton_{
+ folly::make_unique<folly::Baton<>>()};
+ bool stopped_{false};
detachReadCallback();
socket_->closeNow();
}
detachReadCallback();
socket_->closeNow();
}
+ ctx->getPipeline()->deletePipeline();
return folly::makeFuture();
}
return folly::makeFuture();
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
+ PipelineBase* getPipeline() override {
+ return this->pipeline_;
+ }
+
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
namespace folly { namespace wangle {
namespace folly { namespace wangle {
template <class In, class Out>
class HandlerContext {
public:
template <class In, class Out>
class HandlerContext {
public:
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
+ virtual PipelineBase* getPipeline() = 0;
+
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
};
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
};
namespace folly { namespace wangle {
namespace folly { namespace wangle {
+class PipelineManager {
+ public:
+ virtual ~PipelineManager() {}
+ virtual void deletePipeline(PipelineBase* pipeline) = 0;
+};
+
+class PipelineBase {
+ public:
+ void setPipelineManager(PipelineManager* manager) {
+ manager_ = manager;
+ }
+
+ void deletePipeline() {
+ if (manager_) {
+ manager_->deletePipeline(this);
+ }
+ }
+
+ private:
+ PipelineManager* manager_{nullptr};
+};
+
* If W is Nothing, write() and close() will be disabled.
*/
template <class R, class W = Nothing>
* If W is Nothing, write() and close() will be disabled.
*/
template <class R, class W = Nothing>
-class Pipeline : public DelayedDestruction {
+class Pipeline : public PipelineBase, public DelayedDestruction {
public:
Pipeline();
~Pipeline();
public:
Pipeline();
~Pipeline();
template <typename Pipeline>
class PipelineFactory {
public:
template <typename Pipeline>
class PipelineFactory {
public:
- virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+ virtual std::unique_ptr<Pipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+
virtual ~PipelineFactory() {}
};
virtual ~PipelineFactory() {}
};
/*
* StringCodec converts a pipeline from IOBufs to std::strings.
*/
/*
* StringCodec converts a pipeline from IOBufs to std::strings.
*/
-class StringCodec : public Handler<IOBufQueue&, std::string,
+class StringCodec : public Handler<std::unique_ptr<IOBuf>, std::string,
std::string, std::unique_ptr<IOBuf>> {
public:
typedef typename Handler<
std::string, std::unique_ptr<IOBuf>> {
public:
typedef typename Handler<
- IOBufQueue&, std::string,
+ std::unique_ptr<IOBuf>, std::string,
std::string, std::unique_ptr<IOBuf>>::Context Context;
std::string, std::unique_ptr<IOBuf>>::Context Context;
- void read(Context* ctx, IOBufQueue& q) override {
- auto buf = q.pop_front();
+ void read(Context* ctx, std::unique_ptr<IOBuf> buf) override {
buf->coalesce();
std::string data((const char*)buf->data(), buf->length());
buf->coalesce();
std::string data((const char*)buf->data(), buf->length());
#include <gtest/gtest.h>
#include <folly/wangle/codec/StringCodec.h>
#include <gtest/gtest.h>
#include <folly/wangle/codec/StringCodec.h>
+#include <folly/wangle/codec/ByteToMessageCodec.h>
#include <folly/wangle/service/ClientDispatcher.h>
#include <folly/wangle/service/ServerDispatcher.h>
#include <folly/wangle/service/Service.h>
#include <folly/wangle/service/ClientDispatcher.h>
#include <folly/wangle/service/ServerDispatcher.h>
#include <folly/wangle/service/Service.h>
typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
typedef Pipeline<IOBufQueue&, std::string> ServicePipeline;
+class SimpleDecode : public ByteToMessageCodec {
+ public:
+ virtual std::unique_ptr<IOBuf> decode(
+ Context* ctx, IOBufQueue& buf, size_t&) {
+ return buf.move();
+ }
+};
+
class EchoService : public Service<std::string, std::string> {
public:
virtual Future<std::string> operator()(std::string req) override {
class EchoService : public Service<std::string, std::string> {
public:
virtual Future<std::string> operator()(std::string req) override {
: public PipelineFactory<ServicePipeline> {
public:
: public PipelineFactory<ServicePipeline> {
public:
- ServicePipeline* newPipeline(
- std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new ServicePipeline();
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+ new ServicePipeline());
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(SimpleDecode());
pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
pipeline->finalize();
pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
pipeline->finalize();
class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
public:
class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
public:
- ServicePipeline* newPipeline(
- std::shared_ptr<AsyncSocket> socket) override {
- auto pipeline = new ServicePipeline();
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor>
+ newPipeline(std::shared_ptr<AsyncSocket> socket) override {
+ std::unique_ptr<ServicePipeline, folly::DelayedDestruction::Destructor> pipeline(
+ new ServicePipeline());
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(AsyncSocketHandler(socket));
+ pipeline->addBack(SimpleDecode());
pipeline->addBack(StringCodec());
pipeline->addBack(StringCodec());