d4d3b3165c6c4636d3eb64b783b9368fff219bec
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78 {
79         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80 }
81
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83 {
84         int val = simple_strtoul(buf, NULL, 0);
85         if (val == 1)
86                 set_bit(LSFL_NODIR, &ls->ls_flags);
87         return len;
88 }
89
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91 {
92         uint32_t status = dlm_recover_status(ls);
93         return snprintf(buf, PAGE_SIZE, "%x\n", status);
94 }
95
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97 {
98         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99 }
100
101 struct dlm_attr {
102         struct attribute attr;
103         ssize_t (*show)(struct dlm_ls *, char *);
104         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105 };
106
107 static struct dlm_attr dlm_attr_control = {
108         .attr  = {.name = "control", .mode = S_IWUSR},
109         .store = dlm_control_store
110 };
111
112 static struct dlm_attr dlm_attr_event = {
113         .attr  = {.name = "event_done", .mode = S_IWUSR},
114         .store = dlm_event_store
115 };
116
117 static struct dlm_attr dlm_attr_id = {
118         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119         .show  = dlm_id_show,
120         .store = dlm_id_store
121 };
122
123 static struct dlm_attr dlm_attr_nodir = {
124         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125         .show  = dlm_nodir_show,
126         .store = dlm_nodir_store
127 };
128
129 static struct dlm_attr dlm_attr_recover_status = {
130         .attr  = {.name = "recover_status", .mode = S_IRUGO},
131         .show  = dlm_recover_status_show
132 };
133
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136         .show  = dlm_recover_nodeid_show
137 };
138
139 static struct attribute *dlm_attrs[] = {
140         &dlm_attr_control.attr,
141         &dlm_attr_event.attr,
142         &dlm_attr_id.attr,
143         &dlm_attr_nodir.attr,
144         &dlm_attr_recover_status.attr,
145         &dlm_attr_recover_nodeid.attr,
146         NULL,
147 };
148
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150                              char *buf)
151 {
152         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154         return a->show ? a->show(ls, buf) : 0;
155 }
156
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158                               const char *buf, size_t len)
159 {
160         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162         return a->store ? a->store(ls, buf, len) : len;
163 }
164
165 static void lockspace_kobj_release(struct kobject *k)
166 {
167         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168         kfree(ls);
169 }
170
171 static const struct sysfs_ops dlm_attr_ops = {
172         .show  = dlm_attr_show,
173         .store = dlm_attr_store,
174 };
175
176 static struct kobj_type dlm_ktype = {
177         .default_attrs = dlm_attrs,
178         .sysfs_ops     = &dlm_attr_ops,
179         .release       = lockspace_kobj_release,
180 };
181
182 static struct kset *dlm_kset;
183
184 static int do_uevent(struct dlm_ls *ls, int in)
185 {
186         int error;
187
188         if (in)
189                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190         else
191                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195         /* dlm_controld will see the uevent, do the necessary group management
196            and then write to sysfs to wake us */
197
198         error = wait_event_interruptible(ls->ls_uevent_wait,
199                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203         if (error)
204                 goto out;
205
206         error = ls->ls_uevent_result;
207  out:
208         if (error)
209                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210                           error, ls->ls_uevent_result);
211         return error;
212 }
213
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215                       struct kobj_uevent_env *env)
216 {
217         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220         return 0;
221 }
222
223 static struct kset_uevent_ops dlm_uevent_ops = {
224         .uevent = dlm_uevent,
225 };
226
227 int __init dlm_lockspace_init(void)
228 {
229         ls_count = 0;
230         mutex_init(&ls_lock);
231         INIT_LIST_HEAD(&lslist);
232         spin_lock_init(&lslist_lock);
233
234         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235         if (!dlm_kset) {
236                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237                 return -ENOMEM;
238         }
239         return 0;
240 }
241
242 void dlm_lockspace_exit(void)
243 {
244         kset_unregister(dlm_kset);
245 }
246
247 static struct dlm_ls *find_ls_to_scan(void)
248 {
249         struct dlm_ls *ls;
250
251         spin_lock(&lslist_lock);
252         list_for_each_entry(ls, &lslist, ls_list) {
253                 if (time_after_eq(jiffies, ls->ls_scan_time +
254                                             dlm_config.ci_scan_secs * HZ)) {
255                         spin_unlock(&lslist_lock);
256                         return ls;
257                 }
258         }
259         spin_unlock(&lslist_lock);
260         return NULL;
261 }
262
263 static int dlm_scand(void *data)
264 {
265         struct dlm_ls *ls;
266
267         while (!kthread_should_stop()) {
268                 ls = find_ls_to_scan();
269                 if (ls) {
270                         if (dlm_lock_recovery_try(ls)) {
271                                 ls->ls_scan_time = jiffies;
272                                 dlm_scan_rsbs(ls);
273                                 dlm_scan_timeout(ls);
274                                 dlm_scan_waiters(ls);
275                                 dlm_unlock_recovery(ls);
276                         } else {
277                                 ls->ls_scan_time += HZ;
278                         }
279                         continue;
280                 }
281                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282         }
283         return 0;
284 }
285
286 static int dlm_scand_start(void)
287 {
288         struct task_struct *p;
289         int error = 0;
290
291         p = kthread_run(dlm_scand, NULL, "dlm_scand");
292         if (IS_ERR(p))
293                 error = PTR_ERR(p);
294         else
295                 scand_task = p;
296         return error;
297 }
298
299 static void dlm_scand_stop(void)
300 {
301         kthread_stop(scand_task);
302 }
303
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305 {
306         struct dlm_ls *ls;
307
308         spin_lock(&lslist_lock);
309
310         list_for_each_entry(ls, &lslist, ls_list) {
311                 if (ls->ls_global_id == id) {
312                         ls->ls_count++;
313                         goto out;
314                 }
315         }
316         ls = NULL;
317  out:
318         spin_unlock(&lslist_lock);
319         return ls;
320 }
321
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323 {
324         struct dlm_ls *ls;
325
326         spin_lock(&lslist_lock);
327         list_for_each_entry(ls, &lslist, ls_list) {
328                 if (ls->ls_local_handle == lockspace) {
329                         ls->ls_count++;
330                         goto out;
331                 }
332         }
333         ls = NULL;
334  out:
335         spin_unlock(&lslist_lock);
336         return ls;
337 }
338
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
340 {
341         struct dlm_ls *ls;
342
343         spin_lock(&lslist_lock);
344         list_for_each_entry(ls, &lslist, ls_list) {
345                 if (ls->ls_device.minor == minor) {
346                         ls->ls_count++;
347                         goto out;
348                 }
349         }
350         ls = NULL;
351  out:
352         spin_unlock(&lslist_lock);
353         return ls;
354 }
355
356 void dlm_put_lockspace(struct dlm_ls *ls)
357 {
358         spin_lock(&lslist_lock);
359         ls->ls_count--;
360         spin_unlock(&lslist_lock);
361 }
362
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365         for (;;) {
366                 spin_lock(&lslist_lock);
367                 if (ls->ls_count == 0) {
368                         WARN_ON(ls->ls_create_count != 0);
369                         list_del(&ls->ls_list);
370                         spin_unlock(&lslist_lock);
371                         return;
372                 }
373                 spin_unlock(&lslist_lock);
374                 ssleep(1);
375         }
376 }
377
378 static int threads_start(void)
379 {
380         int error;
381
382         error = dlm_scand_start();
383         if (error) {
384                 log_print("cannot start dlm_scand thread %d", error);
385                 goto fail;
386         }
387
388         /* Thread for sending/receiving messages for all lockspace's */
389         error = dlm_lowcomms_start();
390         if (error) {
391                 log_print("cannot start dlm lowcomms %d", error);
392                 goto scand_fail;
393         }
394
395         return 0;
396
397  scand_fail:
398         dlm_scand_stop();
399  fail:
400         return error;
401 }
402
403 static void threads_stop(void)
404 {
405         dlm_scand_stop();
406         dlm_lowcomms_stop();
407 }
408
409 static int new_lockspace(const char *name, const char *cluster,
410                          uint32_t flags, int lvblen,
411                          const struct dlm_lockspace_ops *ops, void *ops_arg,
412                          int *ops_result, dlm_lockspace_t **lockspace)
413 {
414         struct dlm_ls *ls;
415         int i, size, error;
416         int do_unreg = 0;
417         int namelen = strlen(name);
418
419         if (namelen > DLM_LOCKSPACE_LEN)
420                 return -EINVAL;
421
422         if (!lvblen || (lvblen % 8))
423                 return -EINVAL;
424
425         if (!try_module_get(THIS_MODULE))
426                 return -EINVAL;
427
428         if (!dlm_user_daemon_available()) {
429                 log_print("dlm user daemon not available");
430                 error = -EUNATCH;
431                 goto out;
432         }
433
434         if (ops && ops_result) {
435                 if (!dlm_config.ci_recover_callbacks)
436                         *ops_result = -EOPNOTSUPP;
437                 else
438                         *ops_result = 0;
439         }
440
441         if (dlm_config.ci_recover_callbacks && cluster &&
442             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443                 log_print("dlm cluster name %s mismatch %s",
444                           dlm_config.ci_cluster_name, cluster);
445                 error = -EBADR;
446                 goto out;
447         }
448
449         error = 0;
450
451         spin_lock(&lslist_lock);
452         list_for_each_entry(ls, &lslist, ls_list) {
453                 WARN_ON(ls->ls_create_count <= 0);
454                 if (ls->ls_namelen != namelen)
455                         continue;
456                 if (memcmp(ls->ls_name, name, namelen))
457                         continue;
458                 if (flags & DLM_LSFL_NEWEXCL) {
459                         error = -EEXIST;
460                         break;
461                 }
462                 ls->ls_create_count++;
463                 *lockspace = ls;
464                 error = 1;
465                 break;
466         }
467         spin_unlock(&lslist_lock);
468
469         if (error)
470                 goto out;
471
472         error = -ENOMEM;
473
474         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475         if (!ls)
476                 goto out;
477         memcpy(ls->ls_name, name, namelen);
478         ls->ls_namelen = namelen;
479         ls->ls_lvblen = lvblen;
480         ls->ls_count = 0;
481         ls->ls_flags = 0;
482         ls->ls_scan_time = jiffies;
483
484         if (ops && dlm_config.ci_recover_callbacks) {
485                 ls->ls_ops = ops;
486                 ls->ls_ops_arg = ops_arg;
487         }
488
489         if (flags & DLM_LSFL_TIMEWARN)
490                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492         /* ls_exflags are forced to match among nodes, and we don't
493            need to require all nodes to have some flags set */
494         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495                                     DLM_LSFL_NEWEXCL));
496
497         size = dlm_config.ci_rsbtbl_size;
498         ls->ls_rsbtbl_size = size;
499
500         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501         if (!ls->ls_rsbtbl)
502                 goto out_lsfree;
503         for (i = 0; i < size; i++) {
504                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507         }
508
509         idr_init(&ls->ls_lkbidr);
510         spin_lock_init(&ls->ls_lkbidr_spin);
511
512         INIT_LIST_HEAD(&ls->ls_waiters);
513         mutex_init(&ls->ls_waiters_mutex);
514         INIT_LIST_HEAD(&ls->ls_orphans);
515         mutex_init(&ls->ls_orphans_mutex);
516         INIT_LIST_HEAD(&ls->ls_timeout);
517         mutex_init(&ls->ls_timeout_mutex);
518
519         INIT_LIST_HEAD(&ls->ls_new_rsb);
520         spin_lock_init(&ls->ls_new_rsb_spin);
521
522         INIT_LIST_HEAD(&ls->ls_nodes);
523         INIT_LIST_HEAD(&ls->ls_nodes_gone);
524         ls->ls_num_nodes = 0;
525         ls->ls_low_nodeid = 0;
526         ls->ls_total_weight = 0;
527         ls->ls_node_array = NULL;
528
529         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
530         ls->ls_stub_rsb.res_ls = ls;
531
532         ls->ls_debug_rsb_dentry = NULL;
533         ls->ls_debug_waiters_dentry = NULL;
534
535         init_waitqueue_head(&ls->ls_uevent_wait);
536         ls->ls_uevent_result = 0;
537         init_completion(&ls->ls_members_done);
538         ls->ls_members_result = -1;
539
540         mutex_init(&ls->ls_cb_mutex);
541         INIT_LIST_HEAD(&ls->ls_cb_delay);
542
543         ls->ls_recoverd_task = NULL;
544         mutex_init(&ls->ls_recoverd_active);
545         spin_lock_init(&ls->ls_recover_lock);
546         spin_lock_init(&ls->ls_rcom_spin);
547         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
548         ls->ls_recover_status = 0;
549         ls->ls_recover_seq = 0;
550         ls->ls_recover_args = NULL;
551         init_rwsem(&ls->ls_in_recovery);
552         init_rwsem(&ls->ls_recv_active);
553         INIT_LIST_HEAD(&ls->ls_requestqueue);
554         mutex_init(&ls->ls_requestqueue_mutex);
555         mutex_init(&ls->ls_clear_proc_locks);
556
557         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
558         if (!ls->ls_recover_buf)
559                 goto out_lkbfree;
560
561         ls->ls_slot = 0;
562         ls->ls_num_slots = 0;
563         ls->ls_slots_size = 0;
564         ls->ls_slots = NULL;
565
566         INIT_LIST_HEAD(&ls->ls_recover_list);
567         spin_lock_init(&ls->ls_recover_list_lock);
568         idr_init(&ls->ls_recover_idr);
569         spin_lock_init(&ls->ls_recover_idr_lock);
570         ls->ls_recover_list_count = 0;
571         ls->ls_local_handle = ls;
572         init_waitqueue_head(&ls->ls_wait_general);
573         INIT_LIST_HEAD(&ls->ls_root_list);
574         init_rwsem(&ls->ls_root_sem);
575
576         down_write(&ls->ls_in_recovery);
577
578         spin_lock(&lslist_lock);
579         ls->ls_create_count = 1;
580         list_add(&ls->ls_list, &lslist);
581         spin_unlock(&lslist_lock);
582
583         if (flags & DLM_LSFL_FS) {
584                 error = dlm_callback_start(ls);
585                 if (error) {
586                         log_error(ls, "can't start dlm_callback %d", error);
587                         goto out_delist;
588                 }
589         }
590
591         /* needs to find ls in lslist */
592         error = dlm_recoverd_start(ls);
593         if (error) {
594                 log_error(ls, "can't start dlm_recoverd %d", error);
595                 goto out_callback;
596         }
597
598         ls->ls_kobj.kset = dlm_kset;
599         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
600                                      "%s", ls->ls_name);
601         if (error)
602                 goto out_recoverd;
603         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
604
605         /* let kobject handle freeing of ls if there's an error */
606         do_unreg = 1;
607
608         /* This uevent triggers dlm_controld in userspace to add us to the
609            group of nodes that are members of this lockspace (managed by the
610            cluster infrastructure.)  Once it's done that, it tells us who the
611            current lockspace members are (via configfs) and then tells the
612            lockspace to start running (via sysfs) in dlm_ls_start(). */
613
614         error = do_uevent(ls, 1);
615         if (error)
616                 goto out_recoverd;
617
618         wait_for_completion(&ls->ls_members_done);
619         error = ls->ls_members_result;
620         if (error)
621                 goto out_members;
622
623         dlm_create_debug_file(ls);
624
625         log_debug(ls, "join complete");
626         *lockspace = ls;
627         return 0;
628
629  out_members:
630         do_uevent(ls, 0);
631         dlm_clear_members(ls);
632         kfree(ls->ls_node_array);
633  out_recoverd:
634         dlm_recoverd_stop(ls);
635  out_callback:
636         dlm_callback_stop(ls);
637  out_delist:
638         spin_lock(&lslist_lock);
639         list_del(&ls->ls_list);
640         spin_unlock(&lslist_lock);
641         idr_destroy(&ls->ls_recover_idr);
642         kfree(ls->ls_recover_buf);
643  out_lkbfree:
644         idr_destroy(&ls->ls_lkbidr);
645         vfree(ls->ls_rsbtbl);
646  out_lsfree:
647         if (do_unreg)
648                 kobject_put(&ls->ls_kobj);
649         else
650                 kfree(ls);
651  out:
652         module_put(THIS_MODULE);
653         return error;
654 }
655
656 int dlm_new_lockspace(const char *name, const char *cluster,
657                       uint32_t flags, int lvblen,
658                       const struct dlm_lockspace_ops *ops, void *ops_arg,
659                       int *ops_result, dlm_lockspace_t **lockspace)
660 {
661         int error = 0;
662
663         mutex_lock(&ls_lock);
664         if (!ls_count)
665                 error = threads_start();
666         if (error)
667                 goto out;
668
669         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
670                               ops_result, lockspace);
671         if (!error)
672                 ls_count++;
673         if (error > 0)
674                 error = 0;
675         if (!ls_count)
676                 threads_stop();
677  out:
678         mutex_unlock(&ls_lock);
679         return error;
680 }
681
682 static int lkb_idr_is_local(int id, void *p, void *data)
683 {
684         struct dlm_lkb *lkb = p;
685
686         if (!lkb->lkb_nodeid)
687                 return 1;
688         return 0;
689 }
690
691 static int lkb_idr_is_any(int id, void *p, void *data)
692 {
693         return 1;
694 }
695
696 static int lkb_idr_free(int id, void *p, void *data)
697 {
698         struct dlm_lkb *lkb = p;
699
700         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
701                 dlm_free_lvb(lkb->lkb_lvbptr);
702
703         dlm_free_lkb(lkb);
704         return 0;
705 }
706
707 /* NOTE: We check the lkbidr here rather than the resource table.
708    This is because there may be LKBs queued as ASTs that have been unlinked
709    from their RSBs and are pending deletion once the AST has been delivered */
710
711 static int lockspace_busy(struct dlm_ls *ls, int force)
712 {
713         int rv;
714
715         spin_lock(&ls->ls_lkbidr_spin);
716         if (force == 0) {
717                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
718         } else if (force == 1) {
719                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
720         } else {
721                 rv = 0;
722         }
723         spin_unlock(&ls->ls_lkbidr_spin);
724         return rv;
725 }
726
727 static int release_lockspace(struct dlm_ls *ls, int force)
728 {
729         struct dlm_rsb *rsb;
730         struct rb_node *n;
731         int i, busy, rv;
732
733         busy = lockspace_busy(ls, force);
734
735         spin_lock(&lslist_lock);
736         if (ls->ls_create_count == 1) {
737                 if (busy) {
738                         rv = -EBUSY;
739                 } else {
740                         /* remove_lockspace takes ls off lslist */
741                         ls->ls_create_count = 0;
742                         rv = 0;
743                 }
744         } else if (ls->ls_create_count > 1) {
745                 rv = --ls->ls_create_count;
746         } else {
747                 rv = -EINVAL;
748         }
749         spin_unlock(&lslist_lock);
750
751         if (rv) {
752                 log_debug(ls, "release_lockspace no remove %d", rv);
753                 return rv;
754         }
755
756         dlm_device_deregister(ls);
757
758         if (force < 3 && dlm_user_daemon_available())
759                 do_uevent(ls, 0);
760
761         dlm_recoverd_stop(ls);
762
763         dlm_callback_stop(ls);
764
765         remove_lockspace(ls);
766
767         dlm_delete_debug_file(ls);
768
769         kfree(ls->ls_recover_buf);
770
771         /*
772          * Free all lkb's in idr
773          */
774
775         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
776         idr_remove_all(&ls->ls_lkbidr);
777         idr_destroy(&ls->ls_lkbidr);
778
779         /*
780          * Free all rsb's on rsbtbl[] lists
781          */
782
783         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
784                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
785                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
786                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
787                         dlm_free_rsb(rsb);
788                 }
789
790                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
791                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
792                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
793                         dlm_free_rsb(rsb);
794                 }
795         }
796
797         vfree(ls->ls_rsbtbl);
798
799         while (!list_empty(&ls->ls_new_rsb)) {
800                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
801                                        res_hashchain);
802                 list_del(&rsb->res_hashchain);
803                 dlm_free_rsb(rsb);
804         }
805
806         /*
807          * Free structures on any other lists
808          */
809
810         dlm_purge_requestqueue(ls);
811         kfree(ls->ls_recover_args);
812         dlm_clear_members(ls);
813         dlm_clear_members_gone(ls);
814         kfree(ls->ls_node_array);
815         log_debug(ls, "release_lockspace final free");
816         kobject_put(&ls->ls_kobj);
817         /* The ls structure will be freed when the kobject is done with */
818
819         module_put(THIS_MODULE);
820         return 0;
821 }
822
823 /*
824  * Called when a system has released all its locks and is not going to use the
825  * lockspace any longer.  We free everything we're managing for this lockspace.
826  * Remaining nodes will go through the recovery process as if we'd died.  The
827  * lockspace must continue to function as usual, participating in recoveries,
828  * until this returns.
829  *
830  * Force has 4 possible values:
831  * 0 - don't destroy locksapce if it has any LKBs
832  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
833  * 2 - destroy lockspace regardless of LKBs
834  * 3 - destroy lockspace as part of a forced shutdown
835  */
836
837 int dlm_release_lockspace(void *lockspace, int force)
838 {
839         struct dlm_ls *ls;
840         int error;
841
842         ls = dlm_find_lockspace_local(lockspace);
843         if (!ls)
844                 return -EINVAL;
845         dlm_put_lockspace(ls);
846
847         mutex_lock(&ls_lock);
848         error = release_lockspace(ls, force);
849         if (!error)
850                 ls_count--;
851         if (!ls_count)
852                 threads_stop();
853         mutex_unlock(&ls_lock);
854
855         return error;
856 }
857
858 void dlm_stop_lockspaces(void)
859 {
860         struct dlm_ls *ls;
861
862  restart:
863         spin_lock(&lslist_lock);
864         list_for_each_entry(ls, &lslist, ls_list) {
865                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
866                         continue;
867                 spin_unlock(&lslist_lock);
868                 log_error(ls, "no userland control daemon, stopping lockspace");
869                 dlm_ls_stop(ls);
870                 goto restart;
871         }
872         spin_unlock(&lslist_lock);
873 }
874