namespace folly {
+typedef folly::wangle::ChannelPipeline<
+ folly::IOBufQueue&, std::unique_ptr<folly::IOBuf>> DefaultPipeline;
+
/*
* ServerBootstrap is a parent class intended to set up a
* high-performance TCP accepting server. It will manage a pool of
*
* @param childHandler - acceptor factory to call for each IO thread
*/
- ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
- acceptorFactory_ = childHandler;
+ ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> h) {
+ acceptorFactory_ = h;
return this;
}
sockets_.push_back(socket);
}
- void bind(folly::SocketAddress address) {
+ void bind(folly::SocketAddress& address) {
bindImpl(-1, address);
}
*/
void bind(int port) {
CHECK(port >= 0);
- bindImpl(port, folly::SocketAddress());
+ folly::SocketAddress address;
+ bindImpl(port, address);
}
- void bindImpl(int port, folly::SocketAddress address) {
+ void bindImpl(int port, folly::SocketAddress& address) {
if (!workerFactory_) {
group(nullptr);
}
std::mutex sock_lock;
std::vector<std::shared_ptr<folly::AsyncServerSocket>> new_sockets;
+ std::exception_ptr exn;
+
auto startupFunc = [&](std::shared_ptr<folly::Baton<>> barrier){
auto socket = folly::AsyncServerSocket::newSocket();
- sock_lock.lock();
- new_sockets.push_back(socket);
- sock_lock.unlock();
socket->setReusePortEnabled(reusePort);
socket->attachEventBase(EventBaseManager::get()->getEventBase());
- if (port >= 0) {
- socket->bind(port);
- } else {
- socket->bind(address);
- port = address.getPort();
+
+ try {
+ if (port >= 0) {
+ socket->bind(port);
+ } else {
+ socket->bind(address);
+ port = address.getPort();
+ }
+
+ socket->listen(socketConfig.acceptBacklog);
+ socket->startAccepting();
+ } catch (...) {
+ exn = std::current_exception();
+ barrier->post();
+
+ return;
}
- socket->listen(socketConfig.acceptBacklog);
- socket->startAccepting();
+
+ sock_lock.lock();
+ new_sockets.push_back(socket);
+ sock_lock.unlock();
if (port == 0) {
- SocketAddress address;
socket->getAddress(&address);
port = address.getPort();
}
barrier->wait();
}
+ if (exn) {
+ std::rethrow_exception(exn);
+ }
+
// Startup all the threads
for(auto socket : new_sockets) {
workerFactory_->forEachWorker([this, socket](Acceptor* worker){
ChannelPipeline<R, W, Handlers...>::finalizeHelper();
back_ = ChannelPipeline<R, W, Handlers...>::back_;
if (!back_) {
- auto is_end = ChannelPipeline<R, W, Handlers...>::is_end;
- CHECK(is_end);
+ auto is_at_end = ChannelPipeline<R, W, Handlers...>::is_end;
+ CHECK(is_at_end);
back_ = dynamic_cast<OutboundChannelHandlerContext<W>*>(&ctx_);
if (!back_) {
throw std::invalid_argument("wrong type for last handler");