[DLM] variable allocation
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26
/*
 * Debug-file hooks.  Without CONFIG_DLM_DEBUG they collapse to empty inline
 * stubs, so the callers in this file need no #ifdef of their own.
 */
#ifndef CONFIG_DLM_DEBUG
static inline int dlm_create_debug_file(struct dlm_ls *ls)
{
	return 0;
}

static inline void dlm_delete_debug_file(struct dlm_ls *ls)
{
}
#else
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#endif
34
35 static int                      ls_count;
36 static struct mutex             ls_lock;
37 static struct list_head         lslist;
38 static spinlock_t               lslist_lock;
39 static struct task_struct *     scand_task;
40
41
42 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
43 {
44         ssize_t ret = len;
45         int n = simple_strtol(buf, NULL, 0);
46
47         ls = dlm_find_lockspace_local(ls->ls_local_handle);
48         if (!ls)
49                 return -EINVAL;
50
51         switch (n) {
52         case 0:
53                 dlm_ls_stop(ls);
54                 break;
55         case 1:
56                 dlm_ls_start(ls);
57                 break;
58         default:
59                 ret = -EINVAL;
60         }
61         dlm_put_lockspace(ls);
62         return ret;
63 }
64
65 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
66 {
67         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
68         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
69         wake_up(&ls->ls_uevent_wait);
70         return len;
71 }
72
73 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
74 {
75         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
76 }
77
78 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
79 {
80         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
81         return len;
82 }
83
84 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
85 {
86         uint32_t status = dlm_recover_status(ls);
87         return snprintf(buf, PAGE_SIZE, "%x\n", status);
88 }
89
90 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
91 {
92         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
93 }
94
95 struct dlm_attr {
96         struct attribute attr;
97         ssize_t (*show)(struct dlm_ls *, char *);
98         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
99 };
100
101 static struct dlm_attr dlm_attr_control = {
102         .attr  = {.name = "control", .mode = S_IWUSR},
103         .store = dlm_control_store
104 };
105
106 static struct dlm_attr dlm_attr_event = {
107         .attr  = {.name = "event_done", .mode = S_IWUSR},
108         .store = dlm_event_store
109 };
110
111 static struct dlm_attr dlm_attr_id = {
112         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
113         .show  = dlm_id_show,
114         .store = dlm_id_store
115 };
116
117 static struct dlm_attr dlm_attr_recover_status = {
118         .attr  = {.name = "recover_status", .mode = S_IRUGO},
119         .show  = dlm_recover_status_show
120 };
121
122 static struct dlm_attr dlm_attr_recover_nodeid = {
123         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
124         .show  = dlm_recover_nodeid_show
125 };
126
127 static struct attribute *dlm_attrs[] = {
128         &dlm_attr_control.attr,
129         &dlm_attr_event.attr,
130         &dlm_attr_id.attr,
131         &dlm_attr_recover_status.attr,
132         &dlm_attr_recover_nodeid.attr,
133         NULL,
134 };
135
136 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
137                              char *buf)
138 {
139         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
140         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
141         return a->show ? a->show(ls, buf) : 0;
142 }
143
144 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
145                               const char *buf, size_t len)
146 {
147         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
148         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
149         return a->store ? a->store(ls, buf, len) : len;
150 }
151
152 static void lockspace_kobj_release(struct kobject *k)
153 {
154         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
155         kfree(ls);
156 }
157
158 static struct sysfs_ops dlm_attr_ops = {
159         .show  = dlm_attr_show,
160         .store = dlm_attr_store,
161 };
162
163 static struct kobj_type dlm_ktype = {
164         .default_attrs = dlm_attrs,
165         .sysfs_ops     = &dlm_attr_ops,
166         .release       = lockspace_kobj_release,
167 };
168
169 static struct kset dlm_kset = {
170         .kobj   = {.name = "dlm",},
171         .ktype  = &dlm_ktype,
172 };
173
174 static int kobject_setup(struct dlm_ls *ls)
175 {
176         char lsname[DLM_LOCKSPACE_LEN];
177         int error;
178
179         memset(lsname, 0, DLM_LOCKSPACE_LEN);
180         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
181
182         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
183         if (error)
184                 return error;
185
186         ls->ls_kobj.kset = &dlm_kset;
187         ls->ls_kobj.ktype = &dlm_ktype;
188         return 0;
189 }
190
191 static int do_uevent(struct dlm_ls *ls, int in)
192 {
193         int error;
194
195         if (in)
196                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
197         else
198                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
199
200         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
201
202         /* dlm_controld will see the uevent, do the necessary group management
203            and then write to sysfs to wake us */
204
205         error = wait_event_interruptible(ls->ls_uevent_wait,
206                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
207
208         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
209
210         if (error)
211                 goto out;
212
213         error = ls->ls_uevent_result;
214  out:
215         if (error)
216                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
217                           error, ls->ls_uevent_result);
218         return error;
219 }
220
221
222 int dlm_lockspace_init(void)
223 {
224         int error;
225
226         ls_count = 0;
227         mutex_init(&ls_lock);
228         INIT_LIST_HEAD(&lslist);
229         spin_lock_init(&lslist_lock);
230
231         kobj_set_kset_s(&dlm_kset, kernel_subsys);
232         error = kset_register(&dlm_kset);
233         if (error)
234                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
235         return error;
236 }
237
238 void dlm_lockspace_exit(void)
239 {
240         kset_unregister(&dlm_kset);
241 }
242
243 static int dlm_scand(void *data)
244 {
245         struct dlm_ls *ls;
246
247         while (!kthread_should_stop()) {
248                 list_for_each_entry(ls, &lslist, ls_list) {
249                         if (dlm_lock_recovery_try(ls)) {
250                                 dlm_scan_rsbs(ls);
251                                 dlm_scan_timeout(ls);
252                                 dlm_unlock_recovery(ls);
253                         }
254                 }
255                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
256         }
257         return 0;
258 }
259
260 static int dlm_scand_start(void)
261 {
262         struct task_struct *p;
263         int error = 0;
264
265         p = kthread_run(dlm_scand, NULL, "dlm_scand");
266         if (IS_ERR(p))
267                 error = PTR_ERR(p);
268         else
269                 scand_task = p;
270         return error;
271 }
272
273 static void dlm_scand_stop(void)
274 {
275         kthread_stop(scand_task);
276 }
277
278 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
279 {
280         struct dlm_ls *ls;
281
282         spin_lock(&lslist_lock);
283
284         list_for_each_entry(ls, &lslist, ls_list) {
285                 if (ls->ls_namelen == namelen &&
286                     memcmp(ls->ls_name, name, namelen) == 0)
287                         goto out;
288         }
289         ls = NULL;
290  out:
291         spin_unlock(&lslist_lock);
292         return ls;
293 }
294
295 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
296 {
297         struct dlm_ls *ls;
298
299         spin_lock(&lslist_lock);
300
301         list_for_each_entry(ls, &lslist, ls_list) {
302                 if (ls->ls_global_id == id) {
303                         ls->ls_count++;
304                         goto out;
305                 }
306         }
307         ls = NULL;
308  out:
309         spin_unlock(&lslist_lock);
310         return ls;
311 }
312
313 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
314 {
315         struct dlm_ls *ls;
316
317         spin_lock(&lslist_lock);
318         list_for_each_entry(ls, &lslist, ls_list) {
319                 if (ls->ls_local_handle == lockspace) {
320                         ls->ls_count++;
321                         goto out;
322                 }
323         }
324         ls = NULL;
325  out:
326         spin_unlock(&lslist_lock);
327         return ls;
328 }
329
330 struct dlm_ls *dlm_find_lockspace_device(int minor)
331 {
332         struct dlm_ls *ls;
333
334         spin_lock(&lslist_lock);
335         list_for_each_entry(ls, &lslist, ls_list) {
336                 if (ls->ls_device.minor == minor) {
337                         ls->ls_count++;
338                         goto out;
339                 }
340         }
341         ls = NULL;
342  out:
343         spin_unlock(&lslist_lock);
344         return ls;
345 }
346
347 void dlm_put_lockspace(struct dlm_ls *ls)
348 {
349         spin_lock(&lslist_lock);
350         ls->ls_count--;
351         spin_unlock(&lslist_lock);
352 }
353
354 static void remove_lockspace(struct dlm_ls *ls)
355 {
356         for (;;) {
357                 spin_lock(&lslist_lock);
358                 if (ls->ls_count == 0) {
359                         list_del(&ls->ls_list);
360                         spin_unlock(&lslist_lock);
361                         return;
362                 }
363                 spin_unlock(&lslist_lock);
364                 ssleep(1);
365         }
366 }
367
/*
 * Start the threads shared by all lockspaces: dlm_astd, dlm_scand and the
 * lowcomms layer, in that order.  If a later step fails, the ones already
 * started are stopped again in reverse order.
 *
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
	int error;

	/* Thread which process lock requests for all lockspace's */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		dlm_astd_stop();
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		dlm_astd_stop();
		return error;
	}

	return 0;
}
401
/*
 * Stop everything threads_start() started: dlm_scand first, then lowcomms,
 * then dlm_astd.
 */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
408
409 static int new_lockspace(char *name, int namelen, void **lockspace,
410                          uint32_t flags, int lvblen)
411 {
412         struct dlm_ls *ls;
413         int i, size, error = -ENOMEM;
414         int do_unreg = 0;
415
416         if (namelen > DLM_LOCKSPACE_LEN)
417                 return -EINVAL;
418
419         if (!lvblen || (lvblen % 8))
420                 return -EINVAL;
421
422         if (!try_module_get(THIS_MODULE))
423                 return -EINVAL;
424
425         ls = dlm_find_lockspace_name(name, namelen);
426         if (ls) {
427                 *lockspace = ls;
428                 module_put(THIS_MODULE);
429                 return -EEXIST;
430         }
431
432         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
433         if (!ls)
434                 goto out;
435         memcpy(ls->ls_name, name, namelen);
436         ls->ls_namelen = namelen;
437         ls->ls_lvblen = lvblen;
438         ls->ls_count = 0;
439         ls->ls_flags = 0;
440
441         /* ls_exflags are forced to match among nodes, and we don't
442            need to require all nodes to have TIMEWARN active */
443         if (flags & DLM_LSFL_TIMEWARN)
444                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
445         ls->ls_exflags = (flags & ~DLM_LSFL_TIMEWARN);
446
447         if (flags & DLM_LSFL_FS)
448                 ls->ls_allocation = GFP_NOFS;
449         else
450                 ls->ls_allocation = GFP_KERNEL;
451
452         size = dlm_config.ci_rsbtbl_size;
453         ls->ls_rsbtbl_size = size;
454
455         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
456         if (!ls->ls_rsbtbl)
457                 goto out_lsfree;
458         for (i = 0; i < size; i++) {
459                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
460                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
461                 rwlock_init(&ls->ls_rsbtbl[i].lock);
462         }
463
464         size = dlm_config.ci_lkbtbl_size;
465         ls->ls_lkbtbl_size = size;
466
467         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
468         if (!ls->ls_lkbtbl)
469                 goto out_rsbfree;
470         for (i = 0; i < size; i++) {
471                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
472                 rwlock_init(&ls->ls_lkbtbl[i].lock);
473                 ls->ls_lkbtbl[i].counter = 1;
474         }
475
476         size = dlm_config.ci_dirtbl_size;
477         ls->ls_dirtbl_size = size;
478
479         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
480         if (!ls->ls_dirtbl)
481                 goto out_lkbfree;
482         for (i = 0; i < size; i++) {
483                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
484                 rwlock_init(&ls->ls_dirtbl[i].lock);
485         }
486
487         INIT_LIST_HEAD(&ls->ls_waiters);
488         mutex_init(&ls->ls_waiters_mutex);
489         INIT_LIST_HEAD(&ls->ls_orphans);
490         mutex_init(&ls->ls_orphans_mutex);
491         INIT_LIST_HEAD(&ls->ls_timeout);
492         mutex_init(&ls->ls_timeout_mutex);
493
494         INIT_LIST_HEAD(&ls->ls_nodes);
495         INIT_LIST_HEAD(&ls->ls_nodes_gone);
496         ls->ls_num_nodes = 0;
497         ls->ls_low_nodeid = 0;
498         ls->ls_total_weight = 0;
499         ls->ls_node_array = NULL;
500
501         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
502         ls->ls_stub_rsb.res_ls = ls;
503
504         ls->ls_debug_rsb_dentry = NULL;
505         ls->ls_debug_waiters_dentry = NULL;
506
507         init_waitqueue_head(&ls->ls_uevent_wait);
508         ls->ls_uevent_result = 0;
509         init_completion(&ls->ls_members_done);
510         ls->ls_members_result = -1;
511
512         ls->ls_recoverd_task = NULL;
513         mutex_init(&ls->ls_recoverd_active);
514         spin_lock_init(&ls->ls_recover_lock);
515         spin_lock_init(&ls->ls_rcom_spin);
516         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
517         ls->ls_recover_status = 0;
518         ls->ls_recover_seq = 0;
519         ls->ls_recover_args = NULL;
520         init_rwsem(&ls->ls_in_recovery);
521         INIT_LIST_HEAD(&ls->ls_requestqueue);
522         mutex_init(&ls->ls_requestqueue_mutex);
523         mutex_init(&ls->ls_clear_proc_locks);
524
525         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
526         if (!ls->ls_recover_buf)
527                 goto out_dirfree;
528
529         INIT_LIST_HEAD(&ls->ls_recover_list);
530         spin_lock_init(&ls->ls_recover_list_lock);
531         ls->ls_recover_list_count = 0;
532         ls->ls_local_handle = ls;
533         init_waitqueue_head(&ls->ls_wait_general);
534         INIT_LIST_HEAD(&ls->ls_root_list);
535         init_rwsem(&ls->ls_root_sem);
536
537         down_write(&ls->ls_in_recovery);
538
539         spin_lock(&lslist_lock);
540         list_add(&ls->ls_list, &lslist);
541         spin_unlock(&lslist_lock);
542
543         /* needs to find ls in lslist */
544         error = dlm_recoverd_start(ls);
545         if (error) {
546                 log_error(ls, "can't start dlm_recoverd %d", error);
547                 goto out_delist;
548         }
549
550         error = kobject_setup(ls);
551         if (error)
552                 goto out_stop;
553
554         error = kobject_register(&ls->ls_kobj);
555         if (error)
556                 goto out_stop;
557
558         /* let kobject handle freeing of ls if there's an error */
559         do_unreg = 1;
560
561         /* This uevent triggers dlm_controld in userspace to add us to the
562            group of nodes that are members of this lockspace (managed by the
563            cluster infrastructure.)  Once it's done that, it tells us who the
564            current lockspace members are (via configfs) and then tells the
565            lockspace to start running (via sysfs) in dlm_ls_start(). */
566
567         error = do_uevent(ls, 1);
568         if (error)
569                 goto out_stop;
570
571         wait_for_completion(&ls->ls_members_done);
572         error = ls->ls_members_result;
573         if (error)
574                 goto out_members;
575
576         dlm_create_debug_file(ls);
577
578         log_debug(ls, "join complete");
579
580         *lockspace = ls;
581         return 0;
582
583  out_members:
584         do_uevent(ls, 0);
585         dlm_clear_members(ls);
586         kfree(ls->ls_node_array);
587  out_stop:
588         dlm_recoverd_stop(ls);
589  out_delist:
590         spin_lock(&lslist_lock);
591         list_del(&ls->ls_list);
592         spin_unlock(&lslist_lock);
593         kfree(ls->ls_recover_buf);
594  out_dirfree:
595         kfree(ls->ls_dirtbl);
596  out_lkbfree:
597         kfree(ls->ls_lkbtbl);
598  out_rsbfree:
599         kfree(ls->ls_rsbtbl);
600  out_lsfree:
601         if (do_unreg)
602                 kobject_unregister(&ls->ls_kobj);
603         else
604                 kfree(ls);
605  out:
606         module_put(THIS_MODULE);
607         return error;
608 }
609
610 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
611                       uint32_t flags, int lvblen)
612 {
613         int error = 0;
614
615         mutex_lock(&ls_lock);
616         if (!ls_count)
617                 error = threads_start();
618         if (error)
619                 goto out;
620
621         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
622         if (!error)
623                 ls_count++;
624         else if (!ls_count)
625                 threads_stop();
626  out:
627         mutex_unlock(&ls_lock);
628         return error;
629 }
630
631 /* Return 1 if the lockspace still has active remote locks,
632  *        2 if the lockspace still has active local locks.
633  */
634 static int lockspace_busy(struct dlm_ls *ls)
635 {
636         int i, lkb_found = 0;
637         struct dlm_lkb *lkb;
638
639         /* NOTE: We check the lockidtbl here rather than the resource table.
640            This is because there may be LKBs queued as ASTs that have been
641            unlinked from their RSBs and are pending deletion once the AST has
642            been delivered */
643
644         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
645                 read_lock(&ls->ls_lkbtbl[i].lock);
646                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
647                         lkb_found = 1;
648                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
649                                             lkb_idtbl_list) {
650                                 if (!lkb->lkb_nodeid) {
651                                         read_unlock(&ls->ls_lkbtbl[i].lock);
652                                         return 2;
653                                 }
654                         }
655                 }
656                 read_unlock(&ls->ls_lkbtbl[i].lock);
657         }
658         return lkb_found;
659 }
660
661 static int release_lockspace(struct dlm_ls *ls, int force)
662 {
663         struct dlm_lkb *lkb;
664         struct dlm_rsb *rsb;
665         struct list_head *head;
666         int i;
667         int busy = lockspace_busy(ls);
668
669         if (busy > force)
670                 return -EBUSY;
671
672         if (force < 3)
673                 do_uevent(ls, 0);
674
675         dlm_recoverd_stop(ls);
676
677         remove_lockspace(ls);
678
679         dlm_delete_debug_file(ls);
680
681         dlm_astd_suspend();
682
683         kfree(ls->ls_recover_buf);
684
685         /*
686          * Free direntry structs.
687          */
688
689         dlm_dir_clear(ls);
690         kfree(ls->ls_dirtbl);
691
692         /*
693          * Free all lkb's on lkbtbl[] lists.
694          */
695
696         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
697                 head = &ls->ls_lkbtbl[i].list;
698                 while (!list_empty(head)) {
699                         lkb = list_entry(head->next, struct dlm_lkb,
700                                          lkb_idtbl_list);
701
702                         list_del(&lkb->lkb_idtbl_list);
703
704                         dlm_del_ast(lkb);
705
706                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
707                                 free_lvb(lkb->lkb_lvbptr);
708
709                         free_lkb(lkb);
710                 }
711         }
712         dlm_astd_resume();
713
714         kfree(ls->ls_lkbtbl);
715
716         /*
717          * Free all rsb's on rsbtbl[] lists
718          */
719
720         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
721                 head = &ls->ls_rsbtbl[i].list;
722                 while (!list_empty(head)) {
723                         rsb = list_entry(head->next, struct dlm_rsb,
724                                          res_hashchain);
725
726                         list_del(&rsb->res_hashchain);
727                         free_rsb(rsb);
728                 }
729
730                 head = &ls->ls_rsbtbl[i].toss;
731                 while (!list_empty(head)) {
732                         rsb = list_entry(head->next, struct dlm_rsb,
733                                          res_hashchain);
734                         list_del(&rsb->res_hashchain);
735                         free_rsb(rsb);
736                 }
737         }
738
739         kfree(ls->ls_rsbtbl);
740
741         /*
742          * Free structures on any other lists
743          */
744
745         dlm_purge_requestqueue(ls);
746         kfree(ls->ls_recover_args);
747         dlm_clear_free_entries(ls);
748         dlm_clear_members(ls);
749         dlm_clear_members_gone(ls);
750         kfree(ls->ls_node_array);
751         kobject_unregister(&ls->ls_kobj);
752         /* The ls structure will be freed when the kobject is done with */
753
754         mutex_lock(&ls_lock);
755         ls_count--;
756         if (!ls_count)
757                 threads_stop();
758         mutex_unlock(&ls_lock);
759
760         module_put(THIS_MODULE);
761         return 0;
762 }
763
764 /*
765  * Called when a system has released all its locks and is not going to use the
766  * lockspace any longer.  We free everything we're managing for this lockspace.
767  * Remaining nodes will go through the recovery process as if we'd died.  The
768  * lockspace must continue to function as usual, participating in recoveries,
769  * until this returns.
770  *
771  * Force has 4 possible values:
772  * 0 - don't destroy locksapce if it has any LKBs
773  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
774  * 2 - destroy lockspace regardless of LKBs
775  * 3 - destroy lockspace as part of a forced shutdown
776  */
777
778 int dlm_release_lockspace(void *lockspace, int force)
779 {
780         struct dlm_ls *ls;
781
782         ls = dlm_find_lockspace_local(lockspace);
783         if (!ls)
784                 return -EINVAL;
785         dlm_put_lockspace(ls);
786         return release_lockspace(ls, force);
787 }
788