4 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
6 * Ramster_r2net provides an interface between zcache and r2net.
8 * FIXME: support more than two nodes
11 #include <linux/list.h>
12 #include "cluster/tcp.h"
13 #include "cluster/nodemanager.h"
18 #define RAMSTER_TESTING
20 #define RMSTR_KEY 0x77347734
23 RMSTR_TMEM_PUT_EPH = 100,
25 RMSTR_TMEM_ASYNC_GET_REQUEST,
26 RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
27 RMSTR_TMEM_ASYNC_GET_REPLY,
30 RMSTR_TMEM_DESTROY_POOL,
33 #define RMSTR_R2NET_MAX_LEN \
34 (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
36 #include "cluster/tcp_internal.h"
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
41 int r2net_remote_target_node_set(int node_num)
45 r2net_target_node = r2nm_get_node_by_num(node_num);
46 if (r2net_target_node != NULL) {
47 r2net_target_nodenum = node_num;
48 r2nm_node_put(r2net_target_node);
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 u32 len, void *data, void **ret_data)
61 struct tmem_xhandle xh;
63 size_t size = RMSTR_R2NET_MAX_LEN;
64 u16 msgtype = be16_to_cpu(msg->msg_type);
65 bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
68 xh = *(struct tmem_xhandle *)msg->buf;
69 if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
71 pdata = ramster_async_get_buf;
72 *(struct tmem_xhandle *)pdata = xh;
73 pdata += sizeof(struct tmem_xhandle);
74 local_irq_save(flags);
75 found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 pdata, &size, 1, get_and_free ? 1 : -1);
77 local_irq_restore(flags);
79 /* a zero size indicates the get failed */
82 if (size > RMSTR_R2NET_MAX_LEN)
84 *ret_data = pdata - sizeof(struct tmem_xhandle);
85 /* now make caller (r2net_process_message) handle specially */
86 r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 return size + sizeof(struct tmem_xhandle);
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91 u32 len, void *data, void **ret_data)
93 char *in = (char *)msg->buf;
94 int datalen = len - sizeof(struct r2net_msg);
96 struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
98 in += sizeof(struct tmem_xhandle);
99 datalen -= sizeof(struct tmem_xhandle);
100 BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101 ret = zcache_localify(xh->pool_id, &xh->oid, xh->index,
102 in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
105 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
110 int ramster_remote_put_handler(struct r2net_msg *msg,
111 u32 len, void *data, void **ret_data)
113 struct tmem_xhandle *xh;
114 char *p = (char *)msg->buf;
115 int datalen = len - sizeof(struct r2net_msg) -
116 sizeof(struct tmem_xhandle);
117 u16 msgtype = be16_to_cpu(msg->msg_type);
118 bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
122 xh = (struct tmem_xhandle *)p;
123 p += sizeof(struct tmem_xhandle);
124 zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125 local_irq_save(flags);
126 ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127 p, datalen, 1, ephemeral ? 1 : -1);
128 local_irq_restore(flags);
132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133 u32 len, void *data, void **ret_data)
135 struct tmem_xhandle *xh;
136 char *p = (char *)msg->buf;
138 xh = (struct tmem_xhandle *)p;
139 p += sizeof(struct tmem_xhandle);
140 (void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index);
144 int ramster_remote_flobj_handler(struct r2net_msg *msg,
145 u32 len, void *data, void **ret_data)
147 struct tmem_xhandle *xh;
148 char *p = (char *)msg->buf;
150 xh = (struct tmem_xhandle *)p;
151 p += sizeof(struct tmem_xhandle);
152 (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
156 int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
157 size_t expect_size, uint8_t expect_cksum,
160 int ret = -1, status;
161 struct r2nm_node *node = NULL;
166 node = r2nm_get_node_by_num(remotenode);
169 xh->client_id = r2nm_this_node(); /* which node is getting */
170 xh->xh_data_cksum = expect_cksum;
171 xh->xh_data_size = expect_size;
173 vec[0].iov_len = sizeof(*xh);
174 vec[0].iov_base = xh;
176 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
178 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
179 ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
180 vec, veclen, remotenode, &status);
183 /* FIXME handle bad message possibilities here? */
184 pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
191 #ifdef RAMSTER_TESTING
192 /* leave me here to see if it catches a weird crash */
193 static void ramster_check_irq_counts(void)
195 static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
196 int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
198 cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
199 if (cur_hardirq_cnt > last_hardirq_cnt) {
200 last_hardirq_cnt = cur_hardirq_cnt;
201 if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
202 pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
205 cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
206 if (cur_softirq_cnt > last_softirq_cnt) {
207 last_softirq_cnt = cur_softirq_cnt;
208 if (!(last_softirq_cnt&(last_softirq_cnt-1)))
209 pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
212 cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
213 if (cur_preempt_cnt > last_preempt_cnt) {
214 last_preempt_cnt = cur_preempt_cnt;
215 if (!(last_preempt_cnt&(last_preempt_cnt-1)))
216 pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
222 int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
223 bool ephemeral, int *remotenode)
225 int nodenum, ret = -1, status;
226 struct r2nm_node *node = NULL;
230 #ifdef RAMSTER_TESTING
231 struct r2net_node *nn;
234 BUG_ON(size > RMSTR_R2NET_MAX_LEN);
235 xh->client_id = r2nm_this_node(); /* which node is putting */
236 vec[0].iov_len = sizeof(*xh);
237 vec[0].iov_base = xh;
238 vec[1].iov_len = size;
239 vec[1].iov_base = data;
240 node = r2net_target_node;
244 nodenum = r2net_target_nodenum;
248 #ifdef RAMSTER_TESTING
249 nn = r2net_nn_from_num(nodenum);
250 WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid);
254 msg_type = RMSTR_TMEM_PUT_EPH;
256 msg_type = RMSTR_TMEM_PUT_PERS;
257 #ifdef RAMSTER_TESTING
258 /* leave me here to see if it catches a weird crash */
259 ramster_check_irq_counts();
262 ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
264 #ifdef RAMSTER_TESTING
266 static unsigned long cnt;
269 pr_err("ramster_remote_put: message failed, "
270 "ret=%d, cnt=%lu\n", ret, cnt);
278 *remotenode = nodenum;
286 int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode)
288 int ret = -1, status;
289 struct r2nm_node *node = NULL;
293 node = r2nm_get_node_by_num(remotenode);
294 BUG_ON(node == NULL);
295 xh->client_id = r2nm_this_node(); /* which node is flushing */
296 vec[0].iov_len = sizeof(*xh);
297 vec[0].iov_base = xh;
298 BUG_ON(irqs_disabled());
299 BUG_ON(in_softirq());
300 ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
301 vec, veclen, remotenode, &status);
306 int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
308 int ret = -1, status;
309 struct r2nm_node *node = NULL;
313 node = r2nm_get_node_by_num(remotenode);
314 BUG_ON(node == NULL);
315 xh->client_id = r2nm_this_node(); /* which node is flobjing */
316 vec[0].iov_len = sizeof(*xh);
317 vec[0].iov_base = xh;
318 ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
319 vec, veclen, remotenode, &status);
325 * Handler registration
328 static LIST_HEAD(r2net_unreg_list);
330 static void r2net_unregister_handlers(void)
332 r2net_unregister_handler_list(&r2net_unreg_list);
335 int r2net_register_handlers(void)
339 status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
341 ramster_remote_put_handler,
342 NULL, NULL, &r2net_unreg_list);
346 status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
348 ramster_remote_put_handler,
349 NULL, NULL, &r2net_unreg_list);
353 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
355 ramster_remote_async_get_request_handler,
361 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
362 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
363 ramster_remote_async_get_request_handler,
369 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
371 ramster_remote_async_get_reply_handler,
377 status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
379 ramster_remote_flush_handler,
385 status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
387 ramster_remote_flobj_handler,
393 pr_info("ramster: r2net handlers registered\n");
397 r2net_unregister_handlers();
398 pr_err("ramster: couldn't register r2net handlers\n");