benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / mtclient.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2013 President and Fellows of Harvard College
4  * Copyright (c) 2012-2013 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef KVC_HH
17 #define KVC_HH 1
18 #include "kvproto.hh"
19 #include "kvrow.hh"
20 #include "json.hh"
21 #include "msgpack.hh"
22 #include <sys/socket.h>
23 #include <netdb.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <netinet/tcp.h>
27 #include <string>
28 #include <queue>
29 #include <vector>
30
31 class KVConn {
32   public:
33     KVConn(const char *server, int port, int target_core = -1)
34         : inbuf_(new char[inbufsz]), inbufpos_(0), inbuflen_(0),
35           j_(Json::make_array()) {
36         struct hostent *ent = gethostbyname(server);
37         always_assert(ent);
38         int fd = socket(AF_INET, SOCK_STREAM, 0);
39         always_assert(fd > 0);
40         fdtoclose_ = fd;
41         int yes = 1;
42         always_assert(fd >= 0);
43         setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
44
45         struct sockaddr_in sin;
46         memset(&sin, 0, sizeof(sin));
47         sin.sin_family = AF_INET;
48         sin.sin_port = htons(port);
49         memcpy(&sin.sin_addr.s_addr, ent->h_addr, ent->h_length);
50         int r = connect(fd, (const struct sockaddr *)&sin, sizeof(sin));
51         if (r) {
52             perror("connect");
53             exit(EXIT_FAILURE);
54         }
55
56         infd_ = fd;
57         out_ = new_kvout(fd, 64*1024);
58         handshake(target_core);
59     }
60     KVConn(int fd, bool tcp)
61         : inbuf_(new char[inbufsz]), inbufpos_(0), inbuflen_(0), infd_(fd),
62           j_(Json::make_array()) {
63         out_ = new_kvout(fd, 64*1024);
64         fdtoclose_ = -1;
65         if (tcp)
66             handshake(-1);
67     }
68     ~KVConn() {
69         if (fdtoclose_ >= 0)
70             close(fdtoclose_);
71         free_kvout(out_);
72         delete[] inbuf_;
73         for (auto x : oldinbuf_)
74             delete[] x;
75     }
76     void sendgetwhole(Str key, unsigned seq) {
77         j_.resize(3);
78         j_[0] = seq;
79         j_[1] = Cmd_Get;
80         j_[2] = String::make_stable(key);
81         send();
82     }
83     void sendgetcol(Str key, int col, unsigned seq) {
84         j_.resize(4);
85         j_[0] = seq;
86         j_[1] = Cmd_Get;
87         j_[2] = String::make_stable(key);
88         j_[3] = col;
89         send();
90     }
91     void sendget(Str key, const std::vector<unsigned>& f, unsigned seq) {
92         j_.resize(4);
93         j_[0] = seq;
94         j_[1] = Cmd_Get;
95         j_[2] = String::make_stable(key);
96         j_[3] = Json(f.begin(), f.end());
97         send();
98     }
99
100     void sendputcol(Str key, int col, Str val, unsigned seq) {
101         j_.resize(5);
102         j_[0] = seq;
103         j_[1] = Cmd_Put;
104         j_[2] = String::make_stable(key);
105         j_[3] = col;
106         j_[4] = String::make_stable(val);
107         send();
108     }
109     void sendputwhole(Str key, Str val, unsigned seq) {
110         j_.resize(3);
111         j_[0] = seq;
112         j_[1] = Cmd_Replace;
113         j_[2] = String::make_stable(key);
114         j_[3] = String::make_stable(val);
115         send();
116     }
117     void sendremove(Str key, unsigned seq) {
118         j_.resize(3);
119         j_[0] = seq;
120         j_[1] = Cmd_Remove;
121         j_[2] = String::make_stable(key);
122         send();
123     }
124
125     void sendscanwhole(Str firstkey, int numpairs, unsigned seq) {
126         j_.resize(4);
127         j_[0] = seq;
128         j_[1] = Cmd_Scan;
129         j_[2] = String::make_stable(firstkey);
130         j_[3] = numpairs;
131         send();
132     }
133     void sendscan(Str firstkey, const std::vector<unsigned>& f,
134                   int numpairs, unsigned seq) {
135         j_.resize(5);
136         j_[0] = seq;
137         j_[1] = Cmd_Scan;
138         j_[2] = String::make_stable(firstkey);
139         j_[3] = numpairs;
140         j_[4] = Json(f.begin(), f.end());
141         send();
142     }
143
144     void checkpoint(int childno) {
145         always_assert(childno == 0);
146         fprintf(stderr, "asking for a checkpoint\n");
147         j_.resize(2);
148         j_[0] = 0;
149         j_[1] = Cmd_Checkpoint;
150         send();
151         flush();
152
153         printf("sent\n");
154         (void) receive();
155     }
156
157     void flush() {
158         kvflush(out_);
159     }
160
161     int check(int tryhard) {
162         if (inbufpos_ == inbuflen_ && tryhard)
163             hard_check(tryhard);
164         return inbuflen_ - inbufpos_;
165     }
166
167     const Json& receive() {
168         while (!parser_.done() && check(2))
169             inbufpos_ += parser_.consume(inbuf_ + inbufpos_,
170                                          inbuflen_ - inbufpos_,
171                                          String::make_stable(inbuf_, inbufsz));
172         if (parser_.success() && parser_.result().is_a())
173             parser_.reset();
174         else
175             parser_.result() = Json();
176         return parser_.result();
177     }
178
179   private:
180     enum { inbufsz = 64 * 1024, inbufrefill = 56 * 1024 };
181     char* inbuf_;
182     int inbufpos_;
183     int inbuflen_;
184     std::vector<char*> oldinbuf_;
185     int infd_;
186
187     struct kvout *out_;
188
189     Json j_;
190     msgpack::streaming_parser parser_;
191
192     int fdtoclose_;
193     int partition_;
194
195     void handshake(int target_core) {
196         j_.resize(3);
197         j_[0] = 0;
198         j_[1] = Cmd_Handshake;
199         j_[2] = Json::make_object().set("core", target_core)
200             .set("maxkeylen", MASSTREE_MAXKEYLEN);
201         send();
202         kvflush(out_);
203
204         const Json& result = receive();
205         if (!result.is_a()
206             || result[1] != Cmd_Handshake + 1
207             || !result[2]) {
208             fprintf(stderr, "Incompatible kvdb protocol\n");
209             exit(EXIT_FAILURE);
210         }
211         partition_ = result[3].as_i();
212     }
213     inline void send() {
214         msgpack::unparse(*out_, j_);
215     }
216     void hard_check(int tryhard);
217 };
218
219 #endif