Merge tag 'for-3.4' of git://openrisc.net/jonas/linux
[firefly-linux-kernel-4.4.55.git] / drivers / staging / ramster / r2net.c
1 /*
2  * r2net.c
3  *
4  * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
5  *
6  * Ramster_r2net provides an interface between zcache and r2net.
7  *
8  * FIXME: support more than two nodes
9  */
10
11 #include <linux/list.h>
12 #include "cluster/tcp.h"
13 #include "cluster/nodemanager.h"
14 #include "tmem.h"
15 #include "zcache.h"
16 #include "ramster.h"
17
/* Compiles in the RAMSTER_TESTING-guarded diagnostics found in this file. */
#define RAMSTER_TESTING

/* Key passed to r2net together with every ramster message type. */
#define RMSTR_KEY	0x77347734

/*
 * Message types ramster exchanges over r2net.  The values are written out
 * explicitly because they are carried in the msg_type field of messages
 * sent between nodes.
 */
enum {
	RMSTR_TMEM_PUT_EPH			= 100,
	RMSTR_TMEM_PUT_PERS			= 101,
	RMSTR_TMEM_ASYNC_GET_REQUEST		= 102,
	RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST	= 103,
	RMSTR_TMEM_ASYNC_GET_REPLY		= 104,
	RMSTR_TMEM_FLUSH			= 105,
	RMSTR_TMEM_FLOBJ			= 106,
	RMSTR_TMEM_DESTROY_POOL			= 107,
};

/* Payload bytes left once a struct tmem_xhandle has been accounted for. */
#define RMSTR_R2NET_MAX_LEN \
		(R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35
36 #include "cluster/tcp_internal.h"
37
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40
41 int r2net_remote_target_node_set(int node_num)
42 {
43         int ret = -1;
44
45         r2net_target_node = r2nm_get_node_by_num(node_num);
46         if (r2net_target_node != NULL) {
47                 r2net_target_nodenum = node_num;
48                 r2nm_node_put(r2net_target_node);
49                 ret = 0;
50         }
51         return ret;
52 }
53
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56
57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58                                 u32 len, void *data, void **ret_data)
59 {
60         char *pdata;
61         struct tmem_xhandle xh;
62         int found;
63         size_t size = RMSTR_R2NET_MAX_LEN;
64         u16 msgtype = be16_to_cpu(msg->msg_type);
65         bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66         unsigned long flags;
67
68         xh = *(struct tmem_xhandle *)msg->buf;
69         if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70                 BUG();
71         pdata = ramster_async_get_buf;
72         *(struct tmem_xhandle *)pdata = xh;
73         pdata += sizeof(struct tmem_xhandle);
74         local_irq_save(flags);
75         found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76                                 pdata, &size, 1, get_and_free ? 1 : -1);
77         local_irq_restore(flags);
78         if (found < 0) {
79                 /* a zero size indicates the get failed */
80                 size = 0;
81         }
82         if (size > RMSTR_R2NET_MAX_LEN)
83                 BUG();
84         *ret_data = pdata - sizeof(struct tmem_xhandle);
85         /* now make caller (r2net_process_message) handle specially */
86         r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87         return size + sizeof(struct tmem_xhandle);
88 }
89
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91                                 u32 len, void *data, void **ret_data)
92 {
93         char *in = (char *)msg->buf;
94         int datalen = len - sizeof(struct r2net_msg);
95         int ret = -1;
96         struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97
98         in += sizeof(struct tmem_xhandle);
99         datalen -= sizeof(struct tmem_xhandle);
100         BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101         ret = zcache_localify(xh->pool_id, &xh->oid, xh->index,
102                                 in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
104         if (ret == -EEXIST)
105                 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106 #endif
107         return ret;
108 }
109
110 int ramster_remote_put_handler(struct r2net_msg *msg,
111                                 u32 len, void *data, void **ret_data)
112 {
113         struct tmem_xhandle *xh;
114         char *p = (char *)msg->buf;
115         int datalen = len - sizeof(struct r2net_msg) -
116                                 sizeof(struct tmem_xhandle);
117         u16 msgtype = be16_to_cpu(msg->msg_type);
118         bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119         unsigned long flags;
120         int ret;
121
122         xh = (struct tmem_xhandle *)p;
123         p += sizeof(struct tmem_xhandle);
124         zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125         local_irq_save(flags);
126         ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127                                 p, datalen, 1, ephemeral ? 1 : -1);
128         local_irq_restore(flags);
129         return ret;
130 }
131
132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133                                 u32 len, void *data, void **ret_data)
134 {
135         struct tmem_xhandle *xh;
136         char *p = (char *)msg->buf;
137
138         xh = (struct tmem_xhandle *)p;
139         p += sizeof(struct tmem_xhandle);
140         (void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index);
141         return 0;
142 }
143
144 int ramster_remote_flobj_handler(struct r2net_msg *msg,
145                                 u32 len, void *data, void **ret_data)
146 {
147         struct tmem_xhandle *xh;
148         char *p = (char *)msg->buf;
149
150         xh = (struct tmem_xhandle *)p;
151         p += sizeof(struct tmem_xhandle);
152         (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
153         return 0;
154 }
155
156 int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
157                                 size_t expect_size, uint8_t expect_cksum,
158                                 void *extra)
159 {
160         int ret = -1, status;
161         struct r2nm_node *node = NULL;
162         struct kvec vec[1];
163         size_t veclen = 1;
164         u32 msg_type;
165
166         node = r2nm_get_node_by_num(remotenode);
167         if (node == NULL)
168                 goto out;
169         xh->client_id = r2nm_this_node(); /* which node is getting */
170         xh->xh_data_cksum = expect_cksum;
171         xh->xh_data_size = expect_size;
172         xh->extra = extra;
173         vec[0].iov_len = sizeof(*xh);
174         vec[0].iov_base = xh;
175         if (free)
176                 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
177         else
178                 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
179         ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
180                                         vec, veclen, remotenode, &status);
181         r2nm_node_put(node);
182         if (ret < 0) {
183                 /* FIXME handle bad message possibilities here? */
184                 pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
185         }
186         ret = status;
187 out:
188         return ret;
189 }
190
#ifdef RAMSTER_TESTING
/*
 * Track a running maximum: when @cur exceeds *@peak, store it there.
 * Returns true only if a new maximum was stored and (cur & (cur - 1)) is
 * zero, which limits how often the caller logs.
 */
static bool ramster_new_peak_worth_logging(int cur, int *peak)
{
	if (cur <= *peak)
		return false;
	*peak = cur;
	return (cur & (cur - 1)) == 0;
}

/* leave me here to see if it catches a weird crash */
static void ramster_check_irq_counts(void)
{
	/* highest values seen so far; they persist across calls */
	static int peak_hardirq, peak_softirq, peak_preempt;

	if (ramster_new_peak_worth_logging(hardirq_count() >> HARDIRQ_SHIFT,
						&peak_hardirq))
		pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
			peak_hardirq);

	if (ramster_new_peak_worth_logging(softirq_count() >> SOFTIRQ_SHIFT,
						&peak_softirq))
		pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
			peak_softirq);

	if (ramster_new_peak_worth_logging(preempt_count() & PREEMPT_MASK,
						&peak_preempt))
		pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
			peak_preempt);
}
#endif
221
222 int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
223                                 bool ephemeral, int *remotenode)
224 {
225         int nodenum, ret = -1, status;
226         struct r2nm_node *node = NULL;
227         struct kvec vec[2];
228         size_t veclen = 2;
229         u32 msg_type;
230 #ifdef RAMSTER_TESTING
231         struct r2net_node *nn;
232 #endif
233
234         BUG_ON(size > RMSTR_R2NET_MAX_LEN);
235         xh->client_id = r2nm_this_node(); /* which node is putting */
236         vec[0].iov_len = sizeof(*xh);
237         vec[0].iov_base = xh;
238         vec[1].iov_len = size;
239         vec[1].iov_base = data;
240         node = r2net_target_node;
241         if (!node)
242                 goto out;
243
244         nodenum = r2net_target_nodenum;
245
246         r2nm_node_get(node);
247
248 #ifdef RAMSTER_TESTING
249         nn = r2net_nn_from_num(nodenum);
250         WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid);
251 #endif
252
253         if (ephemeral)
254                 msg_type = RMSTR_TMEM_PUT_EPH;
255         else
256                 msg_type = RMSTR_TMEM_PUT_PERS;
257 #ifdef RAMSTER_TESTING
258         /* leave me here to see if it catches a weird crash */
259         ramster_check_irq_counts();
260 #endif
261
262         ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
263                                                 nodenum, &status);
264 #ifdef RAMSTER_TESTING
265         if (ret != 0) {
266                 static unsigned long cnt;
267                 cnt++;
268                 if (!(cnt&(cnt-1)))
269                         pr_err("ramster_remote_put: message failed, "
270                                 "ret=%d, cnt=%lu\n", ret, cnt);
271                 ret = -1;
272         }
273 #endif
274         if (ret < 0)
275                 ret = -1;
276         else {
277                 ret = status;
278                 *remotenode = nodenum;
279         }
280
281         r2nm_node_put(node);
282 out:
283         return ret;
284 }
285
286 int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode)
287 {
288         int ret = -1, status;
289         struct r2nm_node *node = NULL;
290         struct kvec vec[1];
291         size_t veclen = 1;
292
293         node = r2nm_get_node_by_num(remotenode);
294         BUG_ON(node == NULL);
295         xh->client_id = r2nm_this_node(); /* which node is flushing */
296         vec[0].iov_len = sizeof(*xh);
297         vec[0].iov_base = xh;
298         BUG_ON(irqs_disabled());
299         BUG_ON(in_softirq());
300         ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
301                                         vec, veclen, remotenode, &status);
302         r2nm_node_put(node);
303         return ret;
304 }
305
306 int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
307 {
308         int ret = -1, status;
309         struct r2nm_node *node = NULL;
310         struct kvec vec[1];
311         size_t veclen = 1;
312
313         node = r2nm_get_node_by_num(remotenode);
314         BUG_ON(node == NULL);
315         xh->client_id = r2nm_this_node(); /* which node is flobjing */
316         vec[0].iov_len = sizeof(*xh);
317         vec[0].iov_base = xh;
318         ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
319                                         vec, veclen, remotenode, &status);
320         r2nm_node_put(node);
321         return ret;
322 }
323
324 /*
325  * Handler registration
326  */
327
328 static LIST_HEAD(r2net_unreg_list);
329
330 static void r2net_unregister_handlers(void)
331 {
332         r2net_unregister_handler_list(&r2net_unreg_list);
333 }
334
335 int r2net_register_handlers(void)
336 {
337         int status;
338
339         status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
340                                 RMSTR_R2NET_MAX_LEN,
341                                 ramster_remote_put_handler,
342                                 NULL, NULL, &r2net_unreg_list);
343         if (status)
344                 goto bail;
345
346         status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
347                                 RMSTR_R2NET_MAX_LEN,
348                                 ramster_remote_put_handler,
349                                 NULL, NULL, &r2net_unreg_list);
350         if (status)
351                 goto bail;
352
353         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
354                                 RMSTR_R2NET_MAX_LEN,
355                                 ramster_remote_async_get_request_handler,
356                                 NULL, NULL,
357                                 &r2net_unreg_list);
358         if (status)
359                 goto bail;
360
361         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
362                                 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
363                                 ramster_remote_async_get_request_handler,
364                                 NULL, NULL,
365                                 &r2net_unreg_list);
366         if (status)
367                 goto bail;
368
369         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
370                                 RMSTR_R2NET_MAX_LEN,
371                                 ramster_remote_async_get_reply_handler,
372                                 NULL, NULL,
373                                 &r2net_unreg_list);
374         if (status)
375                 goto bail;
376
377         status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
378                                 RMSTR_R2NET_MAX_LEN,
379                                 ramster_remote_flush_handler,
380                                 NULL, NULL,
381                                 &r2net_unreg_list);
382         if (status)
383                 goto bail;
384
385         status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
386                                 RMSTR_R2NET_MAX_LEN,
387                                 ramster_remote_flobj_handler,
388                                 NULL, NULL,
389                                 &r2net_unreg_list);
390         if (status)
391                 goto bail;
392
393         pr_info("ramster: r2net handlers registered\n");
394
395 bail:
396         if (status) {
397                 r2net_unregister_handlers();
398                 pr_err("ramster: couldn't register r2net handlers\n");
399         }
400         return status;
401 }