}
typedef size_t Handle;
- typedef std::function<Future<void>()> FutureFunc;
+ typedef std::function<Future<Unit>()> FutureFunc;
Handle add(FutureFunc func, Executor* executor = nullptr) {
nodes.emplace_back(std::move(func), executor);
nodes[a].hasDependents = true;
}
- Future<void> go() {
+ Future<Unit> go() {
if (hasCycle()) {
- return makeFuture<void>(std::runtime_error("Cycle in FutureDAG graph"));
+ return makeFuture<Unit>(std::runtime_error("Cycle in FutureDAG graph"));
}
std::vector<Handle> rootNodes;
std::vector<Handle> leafNodes;
}
}
- auto sinkHandle = add([] { return Future<void>(); });
+ auto sinkHandle = add([] { return Future<Unit>(); });
for (auto handle : leafNodes) {
dependency(handle, sinkHandle);
}
}
for (Handle handle = 0; handle < nodes.size() - 1; handle++) {
- std::vector<Future<void>> dependencies;
+ std::vector<Future<Unit>> dependencies;
for (auto depHandle : nodes[handle].dependencies) {
dependencies.push_back(nodes[depHandle].promise.getFuture());
}
.via(nodes[handle].executor)
.then([this, handle] {
nodes[handle].func()
- .then([this, handle] (Try<void>&& t) {
+ .then([this, handle] (Try<Unit>&& t) {
nodes[handle].promise.setTry(std::move(t));
});
})
FutureFunc func{nullptr};
Executor* executor{nullptr};
- SharedPromise<void> promise;
+ SharedPromise<Unit> promise;
std::vector<Handle> dependencies;
bool hasDependents{false};
bool visited{false};
explicit TestNode(FutureDAGTest* test) {
func = [this, test] {
test->order.push_back(handle);
- return Future<void>();
+ return Future<Unit>();
};
handle = test->dag->add(func);
}
};
FutureDAG::FutureFunc throwFunc = []{
- return makeFuture<void>(std::runtime_error("oops"));
+ return makeFuture<Unit>(std::runtime_error("oops"));
};
TEST_F(FutureDAGTest, ThrowBegin) {
TEST_F(FutureDAGTest, DestroyBeforeComplete) {
auto barrier = std::make_shared<boost::barrier>(2);
- Future<void> f;
+ Future<Unit> f;
{
auto dag = FutureDAG::create();
auto h1 = dag->add([barrier] {
- auto p = std::make_shared<Promise<void>>();
+ auto p = std::make_shared<Promise<Unit>>();
std::thread t([p, barrier]{
barrier->wait();
p->setValue();
detachReadCallback();
}
- folly::Future<void> write(
+ folly::Future<Unit> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
if (UNLIKELY(!buf)) {
if (!socket_->good()) {
VLOG(5) << "socket is closed in write()";
- return folly::makeFuture<void>(AsyncSocketException(
+ return folly::makeFuture<Unit>(AsyncSocketException(
AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
"socket is closed in write()"));
}
return future;
};
- folly::Future<void> close(Context* ctx) override {
+ folly::Future<Unit> close(Context* ctx) override {
if (socket_) {
detachReadCallback();
socket_->closeNow();
private:
friend class AsyncSocketHandler;
- folly::Promise<void> promise_;
+ folly::Promise<Unit> promise_;
};
folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
class EventBaseHandler : public OutboundBytesToBytesHandler {
public:
- folly::Future<void> write(
+ folly::Future<Unit> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
- folly::Future<void> retval;
+ folly::Future<Unit> retval;
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
return retval;
}
- Future<void> close(Context* ctx) override {
+ Future<Unit> close(Context* ctx) override {
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
- Future<void> retval;
+ Future<Unit> retval;
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
retval = ctx->fireClose();
});
FileRegion(int fd, off_t offset, size_t count)
: fd_(fd), offset_(offset), count_(count) {}
- Future<void> transferTo(std::shared_ptr<AsyncTransport> transport) {
+ Future<Unit> transferTo(std::shared_ptr<AsyncTransport> transport) {
auto socket = std::dynamic_pointer_cast<AsyncSocket>(
transport);
CHECK(socket);
}
friend class FileRegion;
- folly::Promise<void> promise_;
+ folly::Promise<Unit> promise_;
};
const int fd_;
ctx->fireTransportInactive();
}
- virtual Future<void> write(Context* ctx, Win msg) = 0;
- virtual Future<void> close(Context* ctx) {
+ virtual Future<Unit> write(Context* ctx, Win msg) = 0;
+ virtual Future<Unit> close(Context* ctx) {
return ctx->fireClose();
}
virtual void channelWritabilityChanged(HandlerContext* ctx) {}
// outbound
- virtual Future<void> bind(
+ virtual Future<Unit> bind(
HandlerContext* ctx,
SocketAddress localAddress) {}
- virtual Future<void> connect(
+ virtual Future<Unit> connect(
HandlerContext* ctx,
SocketAddress remoteAddress, SocketAddress localAddress) {}
- virtual Future<void> disconnect(HandlerContext* ctx) {}
- virtual Future<void> deregister(HandlerContext* ctx) {}
- virtual Future<void> read(HandlerContext* ctx) {}
+ virtual Future<Unit> disconnect(HandlerContext* ctx) {}
+ virtual Future<Unit> deregister(HandlerContext* ctx) {}
+ virtual Future<Unit> read(HandlerContext* ctx) {}
virtual void flush(HandlerContext* ctx) {}
*/
};
typedef OutboundHandlerContext<Wout> Context;
virtual ~OutboundHandler() = default;
- virtual Future<void> write(Context* ctx, Win msg) = 0;
- virtual Future<void> close(Context* ctx) {
+ virtual Future<Unit> write(Context* ctx, Win msg) = 0;
+ virtual Future<Unit> close(Context* ctx) {
return ctx->fireClose();
}
};
ctx->fireRead(std::forward<R>(msg));
}
- Future<void> write(Context* ctx, W msg) override {
+ Future<Unit> write(Context* ctx, W msg) override {
return ctx->fireWrite(std::forward<W>(msg));
}
};
class OutboundLink {
public:
virtual ~OutboundLink() = default;
- virtual Future<void> write(Out msg) = 0;
- virtual Future<void> close() = 0;
+ virtual Future<Unit> write(Out msg) = 0;
+ virtual Future<Unit> close() = 0;
};
template <class H, class Context>
}
}
- Future<void> fireWrite(Wout msg) override {
+ Future<Unit> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
}
}
- Future<void> fireClose() override {
+ Future<Unit> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
}
// OutboundLink overrides
- Future<void> write(Win msg) override {
+ Future<Unit> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
- Future<void> close() override {
+ Future<Unit> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
~OutboundContextImpl() = default;
// OutboundHandlerContext overrides
- Future<void> fireWrite(Wout msg) override {
+ Future<Unit> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
}
}
- Future<void> fireClose() override {
+ Future<Unit> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
}
// OutboundLink overrides
- Future<void> write(Win msg) override {
+ Future<Unit> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
- Future<void> close() override {
+ Future<Unit> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
virtual void fireTransportActive() = 0;
virtual void fireTransportInactive() = 0;
- virtual Future<void> fireWrite(Out msg) = 0;
- virtual Future<void> fireClose() = 0;
+ virtual Future<Unit> fireWrite(Out msg) = 0;
+ virtual Future<Unit> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
std::shared_ptr<AsyncTransport> getTransport() {
public:
virtual ~OutboundHandlerContext() = default;
- virtual Future<void> fireWrite(Out msg) = 0;
- virtual Future<void> fireClose() = 0;
+ virtual Future<Unit> fireWrite(Out msg) = 0;
+ virtual Future<Unit> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
std::shared_ptr<AsyncTransport> getTransport() {
class OutputBufferingHandler : public OutboundBytesToBytesHandler,
protected EventBase::LoopCallback {
public:
- Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
+ Future<Unit> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
CHECK(buf);
if (!queueSends_) {
return ctx->fireWrite(std::move(buf));
}
void runLoopCallback() noexcept override {
- MoveWrapper<SharedPromise<void>> sharedPromise;
+ MoveWrapper<SharedPromise<Unit>> sharedPromise;
std::swap(*sharedPromise, sharedPromise_);
getContext()->fireWrite(std::move(sends_))
- .then([sharedPromise](Try<void> t) mutable {
+ .then([sharedPromise](Try<Unit> t) mutable {
sharedPromise->setTry(std::move(t));
});
}
- Future<void> close(Context* ctx) override {
+ Future<Unit> close(Context* ctx) override {
if (isLoopCallbackScheduled()) {
cancelLoopCallback();
}
folly::make_exception_wrapper<std::runtime_error>(
"close() called while sends still pending"));
sends_.reset();
- sharedPromise_ = SharedPromise<void>();
+ sharedPromise_ = SharedPromise<Unit>();
return ctx->fireClose();
}
- SharedPromise<void> sharedPromise_;
+ SharedPromise<Unit> sharedPromise_;
std::unique_ptr<IOBuf> sends_{nullptr};
bool queueSends_{true};
};
template <class R, class W>
template <class T>
-typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
Pipeline<R, W>::write(W msg) {
if (!back_) {
throw std::invalid_argument("write(): no outbound handler in Pipeline");
template <class R, class W>
template <class T>
-typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
Pipeline<R, W>::close() {
if (!back_) {
throw std::invalid_argument("close(): no outbound handler in Pipeline");
transportInactive();
template <class T = W>
- typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+ typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
write(W msg);
template <class T = W>
- typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
+ typename std::enable_if<!std::is_same<T, Unit>::value, Future<Unit>>::type
close();
void finalize() override;
int sendCount = 1000;
FileRegion fileRegion(fd, 0, count);
- std::vector<Future<void>> fs;
+ std::vector<Future<Unit>> fs;
for (int i = 0; i < sendCount; i++) {
fs.push_back(fileRegion.transferTo(socket));
}
read_(ctx, msg);
}
- Future<void> write(Context* ctx, Win msg) override {
+ Future<Unit> write(Context* ctx, Win msg) override {
return makeFutureWith([&](){
write_(ctx, msg);
});
}
- Future<void> close(Context* ctx) override {
+ Future<Unit> close(Context* ctx) override {
return makeFutureWith([&](){
close_(ctx);
});
typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
public:
void read(Context* ctx, Rin msg) override {}
- Future<void> write(Context* ctx, Win msg) override { return makeFuture(); }
+ Future<Unit> write(Context* ctx, Win msg) override { return makeFuture(); }
};
typedef HandlerAdapter<std::string, std::string> StringHandler;
class BytesReflector
: public BytesToBytesHandler {
public:
- Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
+ Future<Unit> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
IOBufQueue q_(IOBufQueue::cacheChainLength());
q_.append(std::move(buf));
ctx->fireRead(q_);
lengthFieldLength == 8 );
}
-Future<void> LengthFieldPrepender::write(
+Future<Unit> LengthFieldPrepender::write(
Context* ctx, std::unique_ptr<IOBuf> buf) {
int length = lengthAdjustment_ + buf->computeChainDataLength();
if (lengthIncludesLengthField_) {
bool lengthIncludesLengthField = false,
bool networkByteOrder = true);
- Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf);
+ Future<Unit> write(Context* ctx, std::unique_ptr<IOBuf> buf);
private:
int lengthFieldLength_;
ctx->fireRead(data);
}
- Future<void> write(Context* ctx, std::string msg) override {
+ Future<Unit> write(Context* ctx, std::string msg) override {
auto buf = IOBuf::copyBuffer(msg.data(), msg.length());
return ctx->fireWrite(std::move(buf));
}
*/
template <typename F>
typename std::enable_if<!isFuture<typename std::result_of<F()>::type>::value,
- Future<typename std::result_of<F()>::type>>::type
+ Future<typename Unit::Lift<typename std::result_of<F()>::type>::type>>::type
addFuture(F func) {
- typedef typename std::result_of<F()>::type T;
+ using T = typename Unit::Lift<typename std::result_of<F()>::type>::type;
Promise<T> promise;
auto future = promise.getFuture();
auto movePromise = folly::makeMoveWrapper(std::move(promise));
EXPECT_EQ(100, t.value());
});
fe.addFuture([] () { return makeFuture(); }).then(
- [&] (Try<void>&& t) {
+ [&] (Try<Unit>&& t) {
c++;
EXPECT_NO_THROW(t.value());
});
fe.addFuture([] () { return; }).then(
- [&] (Try<void>&& t) {
+ [&] (Try<Unit>&& t) {
c++;
EXPECT_NO_THROW(t.value());
});
fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
- [&] (Try<void>&& t) {
+ [&] (Try<Unit>&& t) {
c++;
EXPECT_THROW(t.value(), std::runtime_error);
});
return f;
}
- virtual Future<void> close() override {
+ virtual Future<Unit> close() override {
return HandlerAdapter<Req, Resp>::close(nullptr);
}
- virtual Future<void> close(Context* ctx) override {
+ virtual Future<Unit> close(Context* ctx) override {
return HandlerAdapter<Req, Resp>::close(ctx);
}
public:
virtual Future<Resp> operator()(Req request) = 0;
virtual ~Service() = default;
- virtual Future<void> close() {
+ virtual Future<Unit> close() {
return makeFuture();
}
virtual bool isAvailable() {
: service_(service) {}
virtual ~ServiceFilter() = default;
- virtual Future<void> close() override {
+ virtual Future<Unit> close() override {
return service_->close();
}