md-cluster: remove capabilities
drivers/md/md-cluster.c
/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */


#include <linux/module.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "bitmap.h"
#include "md-cluster.h"

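/*
 * LVB_SIZE: size of the DLM lock value block used to carry resync ranges
 * and cluster messages between nodes.
 * NEW_DEV_TIMEOUT: how long (in jiffies) to wait for the local node to
 * acknowledge a NEWDISK message before giving up.
 */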
#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT 5000

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	struct completion completion; /* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
};

struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define		MD_CLUSTER_WAITING_FOR_NEWDISK		1


struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct dlm_lock_resource *sb_lock;
	struct mutex sb_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct list_head suspend_list;
	spinlock_t suspend_lock;
	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	unsigned long state;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
};

struct cluster_msg {
	int type;
	int slot;
	/* TODO: Unionize this for smaller footprint */
	sector_t low;
	sector_t high;
	char uuid[16];
	int raid_slot;
};

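/*
 * AST (asynchronous trap) callback passed to dlm_lock(): the DLM calls
 * this when a lock request issued by dlm_lock_sync() has completed, and
 * we wake up the waiter blocked on the resource's completion.
 */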
static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = (struct dlm_lock_resource *) arg;
	complete(&res->completion);
}

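/*
 * Acquire (or convert) a DLM lock synchronously: submit the request and
 * wait for sync_ast() to signal completion, then return the lock status
 * from the lksb.
 */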
static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	init_completion(&res->completion);
	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_for_completion(&res->completion);
	return res->lksb.sb_status;
}

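/*
 * "Unlock" by converting the lock down to the NULL mode; the resource
 * (and its LVB) stays valid so it can be converted up again later.
 */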
static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}

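/*
 * Allocate and initialize a lock resource in this mddev's lockspace,
 * optionally with a lock value block (LVB). The resource is taken once
 * in NL mode (DLM_LKF_EXPEDITE) so that all later dlm_lock_sync() calls
 * are conversions (DLM_LKF_CONVERT) of the same lock.
 */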
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}

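/*
 * Drop the lock (waiting for the unlock AST) and free the resource,
 * its name and its LVB.
 */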
static void lockres_free(struct dlm_lock_resource *res)
{
	if (!res)
		return;

	init_completion(&res->completion);
	dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
	wait_for_completion(&res->completion);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}

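/* Format the raw 16-byte md uuid as the usual hyphenated hex string. */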
static char *pretty_uuid(char *dest, char *src)
{
	int i, len = 0;

	for (i = 0; i < 16; i++) {
		if (i == 4 || i == 6 || i == 8 || i == 10)
			len += sprintf(dest + len, "-");
		len += sprintf(dest + len, "%02x", (__u8)src[i]);
	}
	return dest;
}

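/* Store the currently-resyncing range in the lock resource's LVB. */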
static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
		sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

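/*
 * Read another node's resync range from its bitmap lock's LVB and, if a
 * resync is in progress (hi != 0), return a freshly allocated
 * suspend_info describing it. Returns NULL otherwise.
 */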
static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct suspend_info *s = NULL;
	sector_t hi = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	hi = le64_to_cpu(ri.hi);
	if (hi > 0) {
		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
		if (!s)
			goto out;
		s->hi = hi;
		s->lo = le64_to_cpu(ri.lo);
	}
	dlm_unlock_sync(lockres);
out:
	return s;
}

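/*
 * Recovery thread body: for every slot flagged in cinfo->recovery_map
 * (set by recover_slot() when a node dies), drop any stale suspend_info
 * for that slot, take the slot's bitmap lock, merge that node's bitmap
 * into ours and, if there was dirty data, trigger a resync.
 */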
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	struct suspend_info *s, *tmp;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
			if (slot == s->slot) {
				list_del(&s->list);
				kfree(s);
			}
		spin_unlock_irq(&cinfo->suspend_lock);

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto dlm_unlock;
		}
		if (hi > 0) {
			/* TODO: Wait for current resync to get over */
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}
dlm_unlock:
		dlm_unlock_sync(bm_lockres);
clear_bit:
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
}

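/*
 * DLM slot-recovery callback: another node has left the lockspace.
 * Mark its slot in recovery_map and kick the recovery thread so its
 * write-intent bitmap gets folded into ours (see recover_bitmaps()).
 */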
static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	set_bit(slot->slot - 1, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	complete(&cinfo->completion);
}

static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX)
		md_wakeup_thread(cinfo->recv_thread);
}

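/*
 * Remove the suspend_info entry for @slot from the suspend list.
 * Caller must hold suspend_lock; remove_suspend_info() is the locked
 * wrapper.
 */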
static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	struct suspend_info *s, *tmp;

	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
		if (slot == s->slot) {
			pr_info("%s:%d Deleting suspend_info: %d\n",
					__func__, __LINE__, slot);
			list_del(&s->list);
			kfree(s);
			break;
		}
}

static void remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
}


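/*
 * Track the resync range another node announced via a RESYNCING message.
 * A zero hi means that node's resync is done, so its entry is removed;
 * otherwise the range is (re)added so area_resyncing() can hold off
 * overlapping local I/O.
 */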
static void process_suspend_info(struct md_cluster_info *cinfo,
		int slot, sector_t lo, sector_t hi)
{
	struct suspend_info *s;

	if (!hi) {
		remove_suspend_info(cinfo, slot);
		return;
	}
	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
}

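/*
 * Handle a NEWDISK message from another node: raise a udev change event
 * carrying the device uuid and raid slot, then wait (bounded by
 * NEW_DEV_TIMEOUT) for userspace to validate the device and call back
 * through new_disk_ack().
 */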
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	pretty_uuid(disk_uuid + len, cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}


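/*
 * Handle a METADATA_UPDATED message: re-read the superblock written by
 * the sending node, then re-take CR on no-new-dev (it may have been
 * dropped in new_disk_ack() while a device addition was pending).
 */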
static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	md_reload_sb(mddev);
	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);

	if (rdev)
		md_kick_rdev_from_array(rdev);
	else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot);
}

static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	switch (msg->type) {
	case METADATA_UPDATED:
		pr_info("%s: %d Received message: METADATA_UPDATED from %d\n",
			__func__, __LINE__, msg->slot);
		process_metadata_update(mddev, msg);
		break;
	case RESYNCING:
		pr_info("%s: %d Received message: RESYNCING from %d\n",
			__func__, __LINE__, msg->slot);
		process_suspend_info(mddev->cluster_info, msg->slot,
				msg->low, msg->high);
		break;
	case NEWDISK:
		pr_info("%s: %d Received message: NEWDISK from %d\n",
			__func__, __LINE__, msg->slot);
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		pr_info("%s: %d Received REMOVE from %d\n",
			__func__, __LINE__, msg->slot);
		process_remove_disk(mddev, msg);
		break;
	default:
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, msg->slot);
	}
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;

	/*get CR on Message*/
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md-cluster: failed to get CR on MESSAGE\n");
		return;
	}

	/* read lvb and process the message */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	process_recvd_msg(thread->mddev, &msg);

	/*release CR on ack_lockres*/
	dlm_unlock_sync(ack_lockres);
	/*up-convert to EX on message_lockres*/
	dlm_lock_sync(message_lockres, DLM_LOCK_EX);
	/*get CR on ack_lockres again*/
	dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	/*release CR on message_lockres*/
	dlm_unlock_sync(message_lockres);
}

/* lock_comm()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_comm(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error)
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	return error;
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
	dlm_unlock_sync(cinfo->token_lockres);
}

/* __sendmsg()
 * This function performs the actual sending of the message. It is
 * normally called with the TOKEN lock held (see lock_comm()), after the
 * encompassing operation has been performed.
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CR
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconverts ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/*get EX on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/*down-convert EX to CR on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
				error);
		goto failed_message;
	}

	/*up-convert CR to EX on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/*down-convert EX to CR on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	dlm_unlock_sync(cinfo->message_lockres);
failed_message:
	return error;
}

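/*
 * sendmsg(): convenience wrapper that takes the TOKEN lock, sends the
 * message and releases the lock again.
 */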
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	lock_comm(cinfo);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}

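/*
 * Called at join time: probe every other node's bitmap lock. If the
 * lock cannot be taken (-EAGAIN) that node is alive and possibly
 * resyncing, so record its resync range from the LVB in the suspend
 * list.
 */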
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	struct suspend_info *s;
	char str[64];


	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1))
			continue;

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
			s = read_resync_info(mddev, bm_lockres);
			if (s) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
						(unsigned long long) s->lo,
						(unsigned long long) s->hi, i);
				spin_lock_irq(&cinfo->suspend_lock);
				s->slot = i;
				list_add(&s->list, &cinfo->suspend_list);
				spin_unlock_irq(&cinfo->suspend_lock);
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret)
			goto out;
		/* TODO: Read the disk bitmap sb and check if it needs recovery */
		dlm_unlock_sync(bm_lockres);
		lockres_free(bm_lockres);
	}
out:
	return ret;
}

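/*
 * join(): create/join the DLM lockspace named after the array uuid,
 * set up the communication lock resources and the receive thread, take
 * PW on our own bitmap lock, and gather resync state from other nodes.
 */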
static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	if (!try_module_get(THIS_MODULE))
		return -ENOENT;

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo) {
		module_put(THIS_MODULE);
		return -ENOMEM;
	}

	init_completion(&cinfo->completion);

	mutex_init(&cinfo->sb_mutex);
	mddev->cluster_info = cinfo;

	memset(str, 0, 64);
	pretty_uuid(str, mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).\n",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	cinfo->sb_lock = lockres_init(mddev, "cmd-super",
					NULL, 0);
	if (!cinfo->sb_lock) {
		ret = -ENOMEM;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);


	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres)
		goto err;
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);

	ret = gather_all_resync_info(mddev, nodes);
	if (ret)
		goto err;

	return 0;
err:
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->bitmap_lockres);
	lockres_free(cinfo->sb_lock);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	module_put(THIS_MODULE);
	return ret;
}

static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->sb_lock);
	lockres_free(cinfo->bitmap_lockres);
	dlm_release_lockspace(cinfo->lockspace, 2);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we subtract one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

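/*
 * Publish our current resync range: write it into the bitmap lock's LVB
 * and re-take the PW lock so the new LVB contents are propagated.
 */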
static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
}

static int metadata_update_start(struct mddev *mddev)
{
	return lock_comm(mddev->cluster_info);
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	ret = __sendmsg(cinfo, &cmsg);
	unlock_comm(cinfo);
	return ret;
}

static int metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return dlm_unlock_sync(cinfo->token_lockres);
}

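/*
 * Record the resync range locally (so it ends up in our LVB) and
 * broadcast it to the other nodes in a RESYNCING message.
 */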
static int resync_send(struct mddev *mddev, enum msg_type type,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int slot = cinfo->slot_number - 1;

	pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__,
			(unsigned long long)lo,
			(unsigned long long)hi);
	resync_info_update(mddev, lo, hi);
	cmsg.type = cpu_to_le32(type);
	cmsg.slot = cpu_to_le32(slot);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);
	return sendmsg(cinfo, &cmsg);
}

static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi)
{
	pr_info("%s:%d\n", __func__, __LINE__);
	return resync_send(mddev, RESYNCING, lo, hi);
}

static void resync_finish(struct mddev *mddev)
{
	pr_info("%s:%d\n", __func__, __LINE__);
	resync_send(mddev, RESYNCING, 0, 0);
}

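/*
 * Return 1 if [lo, hi) overlaps a range some other node is currently
 * resyncing (per the suspend list), 0 otherwise. Callers use this to
 * hold off I/O that would race with the remote resync.
 */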
static int area_resyncing(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;
	struct suspend_info *s;

	spin_lock_irq(&cinfo->suspend_lock);
	if (list_empty(&cinfo->suspend_list))
		goto out;
	list_for_each_entry(s, &cinfo->suspend_list, list)
		if (hi > s->lo && lo < s->hi) {
			ret = 1;
			break;
		}
out:
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

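/*
 * First half of adding a disk across the cluster: take the TOKEN lock,
 * broadcast a NEWDISK message, then try to get EX on no-new-dev with
 * NOQUEUE. -EAGAIN means some node still holds CR because it could not
 * validate the device, so the add is rejected with -ENOENT.
 * add_new_disk_finish() writes the superblock and drops the TOKEN lock.
 */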
static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = rdev->desc_nr;
	lock_comm(cinfo);
	ret = __sendmsg(cinfo, &cmsg);
	if (ret)
		return ret;
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	else
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	return ret;
}

static int add_new_disk_finish(struct mddev *mddev)
{
	struct cluster_msg cmsg;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/* Write sb and inform others */
	md_update_sb(mddev, 1);
	cmsg.type = METADATA_UPDATED;
	ret = __sendmsg(cinfo, &cmsg);
	unlock_comm(cinfo);
	return ret;
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

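/*
 * Tell the other nodes to drop the device in the given raid slot (they
 * handle it in process_remove_disk()).
 */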
static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = REMOVE;
	cmsg.raid_slot = rdev->desc_nr;
	return __sendmsg(cinfo, &cmsg);
}

static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_info_update = resync_info_update,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk_start = add_new_disk_start,
	.add_new_disk_finish = add_new_disk_finish,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");