2 * Eddie Kohler, Yandong Mao, Robert Morris
3 * Copyright (c) 2012-2013 President and Fellows of Harvard College
4 * Copyright (c) 2012-2013 Massachusetts Institute of Technology
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, subject to the conditions
9 * listed in the Masstree LICENSE file. These conditions include: you must
10 * preserve this copyright notice, and you cannot mention the copyright
11 * holders in advertising related to the Software without their permission.
12 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13 * notice is a summary of the Masstree LICENSE file; the license in that file
22 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <netinet/tcp.h>
33 KVConn(const char *server, int port, int target_core = -1)
34 : inbuf_(new char[inbufsz]), inbufpos_(0), inbuflen_(0),
35 j_(Json::make_array()) {
36 struct hostent *ent = gethostbyname(server);
38 int fd = socket(AF_INET, SOCK_STREAM, 0);
39 always_assert(fd > 0);
42 always_assert(fd >= 0);
43 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
45 struct sockaddr_in sin;
46 memset(&sin, 0, sizeof(sin));
47 sin.sin_family = AF_INET;
48 sin.sin_port = htons(port);
49 memcpy(&sin.sin_addr.s_addr, ent->h_addr, ent->h_length);
50 int r = connect(fd, (const struct sockaddr *)&sin, sizeof(sin));
57 out_ = new_kvout(fd, 64*1024);
58 handshake(target_core);
60 KVConn(int fd, bool tcp)
61 : inbuf_(new char[inbufsz]), inbufpos_(0), inbuflen_(0), infd_(fd),
62 j_(Json::make_array()) {
63 out_ = new_kvout(fd, 64*1024);
73 for (auto x : oldinbuf_)
76 void sendgetwhole(Str key, unsigned seq) {
80 j_[2] = String::make_stable(key);
83 void sendgetcol(Str key, int col, unsigned seq) {
87 j_[2] = String::make_stable(key);
91 void sendget(Str key, const std::vector<unsigned>& f, unsigned seq) {
95 j_[2] = String::make_stable(key);
96 j_[3] = Json(f.begin(), f.end());
100 void sendputcol(Str key, int col, Str val, unsigned seq) {
104 j_[2] = String::make_stable(key);
106 j_[4] = String::make_stable(val);
109 void sendputwhole(Str key, Str val, unsigned seq) {
113 j_[2] = String::make_stable(key);
114 j_[3] = String::make_stable(val);
117 void sendremove(Str key, unsigned seq) {
121 j_[2] = String::make_stable(key);
125 void sendscanwhole(Str firstkey, int numpairs, unsigned seq) {
129 j_[2] = String::make_stable(firstkey);
133 void sendscan(Str firstkey, const std::vector<unsigned>& f,
134 int numpairs, unsigned seq) {
138 j_[2] = String::make_stable(firstkey);
140 j_[4] = Json(f.begin(), f.end());
144 void checkpoint(int childno) {
145 always_assert(childno == 0);
146 fprintf(stderr, "asking for a checkpoint\n");
149 j_[1] = Cmd_Checkpoint;
161 int check(int tryhard) {
162 if (inbufpos_ == inbuflen_ && tryhard)
164 return inbuflen_ - inbufpos_;
167 const Json& receive() {
168 while (!parser_.done() && check(2))
169 inbufpos_ += parser_.consume(inbuf_ + inbufpos_,
170 inbuflen_ - inbufpos_,
171 String::make_stable(inbuf_, inbufsz));
172 if (parser_.success() && parser_.result().is_a())
175 parser_.result() = Json();
176 return parser_.result();
180 enum { inbufsz = 64 * 1024, inbufrefill = 56 * 1024 };
184 std::vector<char*> oldinbuf_;
190 msgpack::streaming_parser parser_;
195 void handshake(int target_core) {
198 j_[1] = Cmd_Handshake;
199 j_[2] = Json::make_object().set("core", target_core)
200 .set("maxkeylen", MASSTREE_MAXKEYLEN);
204 const Json& result = receive();
206 || result[1] != Cmd_Handshake + 1
208 fprintf(stderr, "Incompatible kvdb protocol\n");
211 partition_ = result[3].as_i();
214 msgpack::unparse(*out_, j_);
216 void hard_check(int tryhard);