Perform resync for cluster node failure
[firefly-linux-kernel-4.4.55.git] / drivers / md / md-cluster.c
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10
11
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "bitmap.h"
17 #include "md-cluster.h"
18
/* Bytes allocated for a lock value block and passed to dlm_new_lockspace(). */
#define LVB_SIZE 64
20
21 struct dlm_lock_resource {
22         dlm_lockspace_t *ls;
23         struct dlm_lksb lksb;
24         char *name; /* lock name. */
25         uint32_t flags; /* flags to pass to dlm_lock() */
26         struct completion completion; /* completion for synchronized locking */
27         void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
28         struct mddev *mddev; /* pointing back to mddev. */
29 };
30
31 struct suspend_info {
32         int slot;
33         sector_t lo;
34         sector_t hi;
35         struct list_head list;
36 };
37
38 struct resync_info {
39         __le64 lo;
40         __le64 hi;
41 };
42
43 struct md_cluster_info {
44         /* dlm lock space and resources for clustered raid. */
45         dlm_lockspace_t *lockspace;
46         int slot_number;
47         struct completion completion;
48         struct dlm_lock_resource *sb_lock;
49         struct mutex sb_mutex;
50         struct dlm_lock_resource *bitmap_lockres;
51         struct list_head suspend_list;
52         spinlock_t suspend_lock;
53         struct md_thread *recovery_thread;
54         unsigned long recovery_map;
55 };
56
57 static void sync_ast(void *arg)
58 {
59         struct dlm_lock_resource *res;
60
61         res = (struct dlm_lock_resource *) arg;
62         complete(&res->completion);
63 }
64
65 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
66 {
67         int ret = 0;
68
69         init_completion(&res->completion);
70         ret = dlm_lock(res->ls, mode, &res->lksb,
71                         res->flags, res->name, strlen(res->name),
72                         0, sync_ast, res, res->bast);
73         if (ret)
74                 return ret;
75         wait_for_completion(&res->completion);
76         return res->lksb.sb_status;
77 }
78
79 static int dlm_unlock_sync(struct dlm_lock_resource *res)
80 {
81         return dlm_lock_sync(res, DLM_LOCK_NL);
82 }
83
84 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
85                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
86 {
87         struct dlm_lock_resource *res = NULL;
88         int ret, namelen;
89         struct md_cluster_info *cinfo = mddev->cluster_info;
90
91         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
92         if (!res)
93                 return NULL;
94         res->ls = cinfo->lockspace;
95         res->mddev = mddev;
96         namelen = strlen(name);
97         res->name = kzalloc(namelen + 1, GFP_KERNEL);
98         if (!res->name) {
99                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
100                 goto out_err;
101         }
102         strlcpy(res->name, name, namelen + 1);
103         if (with_lvb) {
104                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
105                 if (!res->lksb.sb_lvbptr) {
106                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
107                         goto out_err;
108                 }
109                 res->flags = DLM_LKF_VALBLK;
110         }
111
112         if (bastfn)
113                 res->bast = bastfn;
114
115         res->flags |= DLM_LKF_EXPEDITE;
116
117         ret = dlm_lock_sync(res, DLM_LOCK_NL);
118         if (ret) {
119                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
120                 goto out_err;
121         }
122         res->flags &= ~DLM_LKF_EXPEDITE;
123         res->flags |= DLM_LKF_CONVERT;
124
125         return res;
126 out_err:
127         kfree(res->lksb.sb_lvbptr);
128         kfree(res->name);
129         kfree(res);
130         return NULL;
131 }
132
133 static void lockres_free(struct dlm_lock_resource *res)
134 {
135         if (!res)
136                 return;
137
138         init_completion(&res->completion);
139         dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
140         wait_for_completion(&res->completion);
141
142         kfree(res->name);
143         kfree(res->lksb.sb_lvbptr);
144         kfree(res);
145 }
146
/*
 * Format the 16 bytes at @src as lowercase hex in 8-4-4-4-12 grouping
 * (36 characters plus the terminating NUL) into @dest.
 *
 * Returns @dest.  The caller must provide at least 37 bytes.
 */
static char *pretty_uuid(char *dest, char *src)
{
        char *out = dest;
        int idx = 0;

        while (idx < 16) {
                switch (idx) {
                case 4:
                case 6:
                case 8:
                case 10:
                        /* group separator; the sprintf below re-terminates */
                        *out++ = '-';
                        break;
                default:
                        break;
                }
                out += sprintf(out, "%02x", (unsigned char)src[idx]);
                idx++;
        }
        return dest;
}
158
159 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
160                 sector_t lo, sector_t hi)
161 {
162         struct resync_info *ri;
163
164         ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
165         ri->lo = cpu_to_le64(lo);
166         ri->hi = cpu_to_le64(hi);
167 }
168
169 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
170 {
171         struct resync_info ri;
172         struct suspend_info *s = NULL;
173         sector_t hi = 0;
174
175         dlm_lock_sync(lockres, DLM_LOCK_CR);
176         memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
177         hi = le64_to_cpu(ri.hi);
178         if (ri.hi > 0) {
179                 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
180                 if (!s)
181                         goto out;
182                 s->hi = hi;
183                 s->lo = le64_to_cpu(ri.lo);
184         }
185         dlm_unlock_sync(lockres);
186 out:
187         return s;
188 }
189
190 void recover_bitmaps(struct md_thread *thread)
191 {
192         struct mddev *mddev = thread->mddev;
193         struct md_cluster_info *cinfo = mddev->cluster_info;
194         struct dlm_lock_resource *bm_lockres;
195         char str[64];
196         int slot, ret;
197         struct suspend_info *s, *tmp;
198         sector_t lo, hi;
199
200         while (cinfo->recovery_map) {
201                 slot = fls64((u64)cinfo->recovery_map) - 1;
202
203                 /* Clear suspend_area associated with the bitmap */
204                 spin_lock_irq(&cinfo->suspend_lock);
205                 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
206                         if (slot == s->slot) {
207                                 list_del(&s->list);
208                                 kfree(s);
209                         }
210                 spin_unlock_irq(&cinfo->suspend_lock);
211
212                 snprintf(str, 64, "bitmap%04d", slot);
213                 bm_lockres = lockres_init(mddev, str, NULL, 1);
214                 if (!bm_lockres) {
215                         pr_err("md-cluster: Cannot initialize bitmaps\n");
216                         goto clear_bit;
217                 }
218
219                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
220                 if (ret) {
221                         pr_err("md-cluster: Could not DLM lock %s: %d\n",
222                                         str, ret);
223                         goto clear_bit;
224                 }
225                 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
226                 if (ret) {
227                         pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
228                         goto dlm_unlock;
229                 }
230                 if (hi > 0) {
231                         /* TODO:Wait for current resync to get over */
232                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
233                         if (lo < mddev->recovery_cp)
234                                 mddev->recovery_cp = lo;
235                         md_check_recovery(mddev);
236                 }
237 dlm_unlock:
238                 dlm_unlock_sync(bm_lockres);
239 clear_bit:
240                 clear_bit(slot, &cinfo->recovery_map);
241         }
242 }
243
/*
 * .recover_prep hook of md_ls_ops.  Intentionally does nothing.
 */
static void recover_prep(void *arg)
{
        /* no preparation required */
}
247
248 static void recover_slot(void *arg, struct dlm_slot *slot)
249 {
250         struct mddev *mddev = arg;
251         struct md_cluster_info *cinfo = mddev->cluster_info;
252
253         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
254                         mddev->bitmap_info.cluster_name,
255                         slot->nodeid, slot->slot,
256                         cinfo->slot_number);
257         set_bit(slot->slot - 1, &cinfo->recovery_map);
258         if (!cinfo->recovery_thread) {
259                 cinfo->recovery_thread = md_register_thread(recover_bitmaps,
260                                 mddev, "recover");
261                 if (!cinfo->recovery_thread) {
262                         pr_warn("md-cluster: Could not create recovery thread\n");
263                         return;
264                 }
265         }
266         md_wakeup_thread(cinfo->recovery_thread);
267 }
268
269 static void recover_done(void *arg, struct dlm_slot *slots,
270                 int num_slots, int our_slot,
271                 uint32_t generation)
272 {
273         struct mddev *mddev = arg;
274         struct md_cluster_info *cinfo = mddev->cluster_info;
275
276         cinfo->slot_number = our_slot;
277         complete(&cinfo->completion);
278 }
279
280 static const struct dlm_lockspace_ops md_ls_ops = {
281         .recover_prep = recover_prep,
282         .recover_slot = recover_slot,
283         .recover_done = recover_done,
284 };
285
286 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
287 {
288         struct md_cluster_info *cinfo = mddev->cluster_info;
289         int i, ret = 0;
290         struct dlm_lock_resource *bm_lockres;
291         struct suspend_info *s;
292         char str[64];
293
294
295         for (i = 0; i < total_slots; i++) {
296                 memset(str, '\0', 64);
297                 snprintf(str, 64, "bitmap%04d", i);
298                 bm_lockres = lockres_init(mddev, str, NULL, 1);
299                 if (!bm_lockres)
300                         return -ENOMEM;
301                 if (i == (cinfo->slot_number - 1))
302                         continue;
303
304                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
305                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
306                 if (ret == -EAGAIN) {
307                         memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
308                         s = read_resync_info(mddev, bm_lockres);
309                         if (s) {
310                                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
311                                                 __func__, __LINE__,
312                                                 (unsigned long long) s->lo,
313                                                 (unsigned long long) s->hi, i);
314                                 spin_lock_irq(&cinfo->suspend_lock);
315                                 s->slot = i;
316                                 list_add(&s->list, &cinfo->suspend_list);
317                                 spin_unlock_irq(&cinfo->suspend_lock);
318                         }
319                         ret = 0;
320                         lockres_free(bm_lockres);
321                         continue;
322                 }
323                 if (ret)
324                         goto out;
325                 /* TODO: Read the disk bitmap sb and check if it needs recovery */
326                 dlm_unlock_sync(bm_lockres);
327                 lockres_free(bm_lockres);
328         }
329 out:
330         return ret;
331 }
332
333 static int join(struct mddev *mddev, int nodes)
334 {
335         struct md_cluster_info *cinfo;
336         int ret, ops_rv;
337         char str[64];
338
339         if (!try_module_get(THIS_MODULE))
340                 return -ENOENT;
341
342         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
343         if (!cinfo)
344                 return -ENOMEM;
345
346         init_completion(&cinfo->completion);
347
348         mutex_init(&cinfo->sb_mutex);
349         mddev->cluster_info = cinfo;
350
351         memset(str, 0, 64);
352         pretty_uuid(str, mddev->uuid);
353         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
354                                 DLM_LSFL_FS, LVB_SIZE,
355                                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
356         if (ret)
357                 goto err;
358         wait_for_completion(&cinfo->completion);
359         if (nodes <= cinfo->slot_number) {
360                 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
361                         nodes);
362                 ret = -ERANGE;
363                 goto err;
364         }
365         cinfo->sb_lock = lockres_init(mddev, "cmd-super",
366                                         NULL, 0);
367         if (!cinfo->sb_lock) {
368                 ret = -ENOMEM;
369                 goto err;
370         }
371
372         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
373         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
374         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
375         if (!cinfo->bitmap_lockres)
376                 goto err;
377         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
378                 pr_err("Failed to get bitmap lock\n");
379                 ret = -EINVAL;
380                 goto err;
381         }
382
383         INIT_LIST_HEAD(&cinfo->suspend_list);
384         spin_lock_init(&cinfo->suspend_lock);
385
386         ret = gather_all_resync_info(mddev, nodes);
387         if (ret)
388                 goto err;
389
390         return 0;
391 err:
392         lockres_free(cinfo->bitmap_lockres);
393         lockres_free(cinfo->sb_lock);
394         if (cinfo->lockspace)
395                 dlm_release_lockspace(cinfo->lockspace, 2);
396         mddev->cluster_info = NULL;
397         kfree(cinfo);
398         module_put(THIS_MODULE);
399         return ret;
400 }
401
402 static int leave(struct mddev *mddev)
403 {
404         struct md_cluster_info *cinfo = mddev->cluster_info;
405
406         if (!cinfo)
407                 return 0;
408         md_unregister_thread(&cinfo->recovery_thread);
409         lockres_free(cinfo->sb_lock);
410         lockres_free(cinfo->bitmap_lockres);
411         dlm_release_lockspace(cinfo->lockspace, 2);
412         return 0;
413 }
414
415 /* slot_number(): Returns the MD slot number to use
416  * DLM starts the slot numbers from 1, wheras cluster-md
417  * wants the number to be from zero, so we deduct one
418  */
419 static int slot_number(struct mddev *mddev)
420 {
421         struct md_cluster_info *cinfo = mddev->cluster_info;
422
423         return cinfo->slot_number - 1;
424 }
425
426 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
427 {
428         struct md_cluster_info *cinfo = mddev->cluster_info;
429
430         add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
431         /* Re-acquire the lock to refresh LVB */
432         dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
433 }
434
435 static struct md_cluster_operations cluster_ops = {
436         .join   = join,
437         .leave  = leave,
438         .slot_number = slot_number,
439         .resync_info_update = resync_info_update,
440 };
441
442 static int __init cluster_init(void)
443 {
444         pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
445         pr_info("Registering Cluster MD functions\n");
446         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
447         return 0;
448 }
449
450 static void cluster_exit(void)
451 {
452         unregister_md_cluster_operations();
453 }
454
455 module_init(cluster_init);
456 module_exit(cluster_exit);
457 MODULE_LICENSE("GPL");
458 MODULE_DESCRIPTION("Clustering support for MD");