From: Aravind Anbudurai Date: Thu, 18 Aug 2016 20:48:15 +0000 (-0700) Subject: Support for EPOLLPRI X-Git-Tag: v2016.08.22.00~13 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=2714e5b653644da6278875d92ab1caca16ee825c;p=folly.git Support for EPOLLPRI Summary: Depends on D3718573 and the other diff to sync tp2 on fbcode after D3718573 lands. This adds the PRI flag for using on EPOLLPRI events. The test makes a server/client and client a MSG_OOB message to make it appear as a priority event. Masked under a preprocessor directive until libevent upstreams the patch that I will send for 2.0 Reviewed By: djwatson Differential Revision: D3722009 fbshipit-source-id: c15233d4739a38092d11c3026c483c7a9c8e5dac --- diff --git a/folly/io/async/EventHandler.h b/folly/io/async/EventHandler.h index af50c36e..b6a147cb 100644 --- a/folly/io/async/EventHandler.h +++ b/folly/io/async/EventHandler.h @@ -44,7 +44,11 @@ class EventHandler : private boost::noncopyable { READ = EV_READ, WRITE = EV_WRITE, READ_WRITE = (READ | WRITE), - PERSIST = EV_PERSIST + PERSIST = EV_PERSIST, +// Temporary flag until EPOLLPRI is upstream on libevent. +#ifdef EV_PRI + PRI = EV_PRI, +#endif }; /** diff --git a/folly/io/async/test/EventHandlerTest.cpp b/folly/io/async/test/EventHandlerTest.cpp index 52ff6408..daef1b72 100644 --- a/folly/io/async/test/EventHandlerTest.cpp +++ b/folly/io/async/test/EventHandlerTest.cpp @@ -14,16 +14,18 @@ * limitations under the License. */ -#include - -#include +#include +#include #include + #include #include #include - -#include +#include +#include #include +#include +#include using namespace std; using namespace folly; @@ -185,3 +187,82 @@ TEST_F(EventHandlerTest, many_concurrent_consumers) { EXPECT_EQ(0, writesRemaining); EXPECT_EQ(0, readsRemaining); } + +#ifdef EV_PRI +TEST(EventHandlerSocketTest, EPOLLPRI) { + std::promise serverReady; + std::thread t([serverReadyFuture = serverReady.get_future()] { + // client + serverReadyFuture.wait(); + LOG(INFO) << "Server is ready"; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + SCOPE_EXIT { + close(sockfd); + }; + struct hostent* he; + struct sockaddr_in server; + + const char hostname[] = "localhost"; + he = gethostbyname(hostname); + PCHECK(he); + + memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length); + server.sin_family = AF_INET; + server.sin_port = htons(12345); + + PCHECK(::connect(sockfd, (struct sockaddr*)&server, sizeof(server)) == 0); + LOG(INFO) << "Server connection available"; + + char buffer[] = "banana"; + int n = send(sockfd, buffer, strlen(buffer) + 1, MSG_OOB); + PCHECK(n > 0); + }); + SCOPE_EXIT { + t.join(); + }; + // make the server. + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + SCOPE_EXIT { + close(sockfd); + }; + PCHECK(sockfd != -1) << "unable to open socket"; + + struct sockaddr_in sin; + sin.sin_port = htons(12345); + sin.sin_addr.s_addr = INADDR_ANY; + sin.sin_family = AF_INET; + + PCHECK(::bind(sockfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0) + << "Can't bind to port"; + listen(sockfd, 5); + serverReady.set_value(); + + socklen_t clilen; + struct sockaddr_in cli_addr; + clilen = sizeof(cli_addr); + int newsockfd = accept(sockfd, (struct sockaddr*)&cli_addr, &clilen); + PCHECK(newsockfd >= 0) << "can't accept"; + SCOPE_EXIT { + close(newsockfd); + }; + + EventBase eb; + struct SockEvent : public EventHandler { + SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {} + + void handlerReady(uint16_t events) noexcept override { + EXPECT_TRUE(EventHandler::EventFlags::PRI & events); + char buffer[256]; + int n = read(fd_, buffer, 255); + EXPECT_EQ(6, n); + EXPECT_EQ("banana", std::string(buffer)); + } + + private: + int fd_; + } sockHandler(&eb, newsockfd); + sockHandler.registerHandler(EventHandler::EventFlags::PRI); + LOG(INFO) << "Registered Handler"; + eb.loop(); +} +#endif