Lock bitmap while joining the cluster
[firefly-linux-kernel-4.4.55.git] / drivers / md / md-cluster.c
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10
11
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "md-cluster.h"
17
18 #define LVB_SIZE        64
19
20 struct dlm_lock_resource {
21         dlm_lockspace_t *ls;
22         struct dlm_lksb lksb;
23         char *name; /* lock name. */
24         uint32_t flags; /* flags to pass to dlm_lock() */
25         struct completion completion; /* completion for synchronized locking */
26         void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
27         struct mddev *mddev; /* pointing back to mddev. */
28 };
29
30 struct md_cluster_info {
31         /* dlm lock space and resources for clustered raid. */
32         dlm_lockspace_t *lockspace;
33         int slot_number;
34         struct completion completion;
35         struct dlm_lock_resource *sb_lock;
36         struct mutex sb_mutex;
37         struct dlm_lock_resource *bitmap_lockres;
38 };
39
40 static void sync_ast(void *arg)
41 {
42         struct dlm_lock_resource *res;
43
44         res = (struct dlm_lock_resource *) arg;
45         complete(&res->completion);
46 }
47
48 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
49 {
50         int ret = 0;
51
52         init_completion(&res->completion);
53         ret = dlm_lock(res->ls, mode, &res->lksb,
54                         res->flags, res->name, strlen(res->name),
55                         0, sync_ast, res, res->bast);
56         if (ret)
57                 return ret;
58         wait_for_completion(&res->completion);
59         return res->lksb.sb_status;
60 }
61
62 static int dlm_unlock_sync(struct dlm_lock_resource *res)
63 {
64         return dlm_lock_sync(res, DLM_LOCK_NL);
65 }
66
67 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
68                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
69 {
70         struct dlm_lock_resource *res = NULL;
71         int ret, namelen;
72         struct md_cluster_info *cinfo = mddev->cluster_info;
73
74         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
75         if (!res)
76                 return NULL;
77         res->ls = cinfo->lockspace;
78         res->mddev = mddev;
79         namelen = strlen(name);
80         res->name = kzalloc(namelen + 1, GFP_KERNEL);
81         if (!res->name) {
82                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
83                 goto out_err;
84         }
85         strlcpy(res->name, name, namelen + 1);
86         if (with_lvb) {
87                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
88                 if (!res->lksb.sb_lvbptr) {
89                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
90                         goto out_err;
91                 }
92                 res->flags = DLM_LKF_VALBLK;
93         }
94
95         if (bastfn)
96                 res->bast = bastfn;
97
98         res->flags |= DLM_LKF_EXPEDITE;
99
100         ret = dlm_lock_sync(res, DLM_LOCK_NL);
101         if (ret) {
102                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
103                 goto out_err;
104         }
105         res->flags &= ~DLM_LKF_EXPEDITE;
106         res->flags |= DLM_LKF_CONVERT;
107
108         return res;
109 out_err:
110         kfree(res->lksb.sb_lvbptr);
111         kfree(res->name);
112         kfree(res);
113         return NULL;
114 }
115
116 static void lockres_free(struct dlm_lock_resource *res)
117 {
118         if (!res)
119                 return;
120
121         init_completion(&res->completion);
122         dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
123         wait_for_completion(&res->completion);
124
125         kfree(res->name);
126         kfree(res->lksb.sb_lvbptr);
127         kfree(res);
128 }
129
130 static char *pretty_uuid(char *dest, char *src)
131 {
132         int i, len = 0;
133
134         for (i = 0; i < 16; i++) {
135                 if (i == 4 || i == 6 || i == 8 || i == 10)
136                         len += sprintf(dest + len, "-");
137                 len += sprintf(dest + len, "%02x", (__u8)src[i]);
138         }
139         return dest;
140 }
141
142 static void recover_prep(void *arg)
143 {
144 }
145
146 static void recover_slot(void *arg, struct dlm_slot *slot)
147 {
148         struct mddev *mddev = arg;
149         struct md_cluster_info *cinfo = mddev->cluster_info;
150
151         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
152                         mddev->bitmap_info.cluster_name,
153                         slot->nodeid, slot->slot,
154                         cinfo->slot_number);
155 }
156
157 static void recover_done(void *arg, struct dlm_slot *slots,
158                 int num_slots, int our_slot,
159                 uint32_t generation)
160 {
161         struct mddev *mddev = arg;
162         struct md_cluster_info *cinfo = mddev->cluster_info;
163
164         cinfo->slot_number = our_slot;
165         complete(&cinfo->completion);
166 }
167
168 static const struct dlm_lockspace_ops md_ls_ops = {
169         .recover_prep = recover_prep,
170         .recover_slot = recover_slot,
171         .recover_done = recover_done,
172 };
173
174 static int join(struct mddev *mddev, int nodes)
175 {
176         struct md_cluster_info *cinfo;
177         int ret, ops_rv;
178         char str[64];
179
180         if (!try_module_get(THIS_MODULE))
181                 return -ENOENT;
182
183         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
184         if (!cinfo)
185                 return -ENOMEM;
186
187         init_completion(&cinfo->completion);
188
189         mutex_init(&cinfo->sb_mutex);
190         mddev->cluster_info = cinfo;
191
192         memset(str, 0, 64);
193         pretty_uuid(str, mddev->uuid);
194         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
195                                 DLM_LSFL_FS, LVB_SIZE,
196                                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
197         if (ret)
198                 goto err;
199         wait_for_completion(&cinfo->completion);
200         if (nodes <= cinfo->slot_number) {
201                 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
202                         nodes);
203                 ret = -ERANGE;
204                 goto err;
205         }
206         cinfo->sb_lock = lockres_init(mddev, "cmd-super",
207                                         NULL, 0);
208         if (!cinfo->sb_lock) {
209                 ret = -ENOMEM;
210                 goto err;
211         }
212
213         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
214         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
215         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
216         if (!cinfo->bitmap_lockres)
217                 goto err;
218         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
219                 pr_err("Failed to get bitmap lock\n");
220                 ret = -EINVAL;
221                 goto err;
222         }
223
224         return 0;
225 err:
226         if (cinfo->lockspace)
227                 dlm_release_lockspace(cinfo->lockspace, 2);
228         mddev->cluster_info = NULL;
229         kfree(cinfo);
230         module_put(THIS_MODULE);
231         return ret;
232 }
233
234 static int leave(struct mddev *mddev)
235 {
236         struct md_cluster_info *cinfo = mddev->cluster_info;
237
238         if (!cinfo)
239                 return 0;
240         lockres_free(cinfo->sb_lock);
241         lockres_free(cinfo->bitmap_lockres);
242         dlm_release_lockspace(cinfo->lockspace, 2);
243         return 0;
244 }
245
246 /* slot_number(): Returns the MD slot number to use
247  * DLM starts the slot numbers from 1, wheras cluster-md
248  * wants the number to be from zero, so we deduct one
249  */
250 static int slot_number(struct mddev *mddev)
251 {
252         struct md_cluster_info *cinfo = mddev->cluster_info;
253
254         return cinfo->slot_number - 1;
255 }
256
257 static struct md_cluster_operations cluster_ops = {
258         .join   = join,
259         .leave  = leave,
260         .slot_number = slot_number,
261 };
262
263 static int __init cluster_init(void)
264 {
265         pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
266         pr_info("Registering Cluster MD functions\n");
267         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
268         return 0;
269 }
270
271 static void cluster_exit(void)
272 {
273         unregister_md_cluster_operations();
274 }
275
276 module_init(cluster_init);
277 module_exit(cluster_exit);
278 MODULE_LICENSE("GPL");
279 MODULE_DESCRIPTION("Clustering support for MD");