4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
46 extern struct list_head obd_types;
47 spinlock_t obd_types_lock;
49 struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 struct kmem_cache *import_cachep;
54 struct list_head obd_zombie_imports;
55 struct list_head obd_zombie_exports;
56 spinlock_t obd_zombie_impexp_lock;
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks);
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
81 static void obd_device_free(struct obd_device *obd)
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 struct obd_type *class_search_type(const char *name)
97 struct list_head *tmp;
98 struct obd_type *type;
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
108 spin_unlock(&obd_types_lock);
111 EXPORT_SYMBOL(class_search_type);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
118 const char *modname = name;
120 if (strcmp(modname, "obdfilter") == 0)
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138 spin_lock(&type->obd_type_lock);
140 try_module_get(type->typ_dt_ops->o_owner);
141 spin_unlock(&type->obd_type_lock);
145 EXPORT_SYMBOL(class_get_type);
147 void class_put_type(struct obd_type *type)
150 spin_lock(&type->obd_type_lock);
152 module_put(type->typ_dt_ops->o_owner);
153 spin_unlock(&type->obd_type_lock);
155 EXPORT_SYMBOL(class_put_type);
157 #define CLASS_MAX_NAME 1024
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
163 struct obd_type *type;
167 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169 if (class_search_type(name)) {
170 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175 OBD_ALLOC(type, sizeof(*type));
179 OBD_ALLOC_PTR(type->typ_dt_ops);
180 OBD_ALLOC_PTR(type->typ_md_ops);
181 OBD_ALLOC(type->typ_name, strlen(name) + 1);
183 if (type->typ_dt_ops == NULL ||
184 type->typ_md_ops == NULL ||
185 type->typ_name == NULL)
188 *(type->typ_dt_ops) = *dt_ops;
189 /* md_ops is optional */
191 *(type->typ_md_ops) = *md_ops;
192 strcpy(type->typ_name, name);
193 spin_lock_init(&type->obd_type_lock);
195 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
197 if (IS_ERR(type->typ_procroot)) {
198 rc = PTR_ERR(type->typ_procroot);
199 type->typ_procroot = NULL;
205 rc = lu_device_type_init(ldt);
210 spin_lock(&obd_types_lock);
211 list_add(&type->typ_chain, &obd_types);
212 spin_unlock(&obd_types_lock);
217 if (type->typ_name != NULL)
218 OBD_FREE(type->typ_name, strlen(name) + 1);
219 if (type->typ_md_ops != NULL)
220 OBD_FREE_PTR(type->typ_md_ops);
221 if (type->typ_dt_ops != NULL)
222 OBD_FREE_PTR(type->typ_dt_ops);
223 OBD_FREE(type, sizeof(*type));
226 EXPORT_SYMBOL(class_register_type);
228 int class_unregister_type(const char *name)
230 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
253 spin_lock(&obd_types_lock);
254 list_del(&type->typ_chain);
255 spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval NULL if create fails, otherwise return the obd device
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 return ERR_PTR(-EINVAL);
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 return ERR_PTR(-ENODEV);
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 result = ERR_PTR(-ENOMEM);
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304 write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
308 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0] = '\0';
322 result = ERR_PTR(-EEXIST);
325 if (!result && !obd) {
327 result->obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
335 write_unlock(&obd_dev_lock);
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 result = ERR_PTR(-EOVERFLOW);
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
352 obd_device_free(newdev);
354 class_put_type(type);
358 void class_release_dev(struct obd_device *obd)
360 struct obd_type *obd_type = obd->obd_type;
362 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366 LASSERT(obd_type != NULL);
368 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371 write_lock(&obd_dev_lock);
372 obd_devs[obd->obd_minor] = NULL;
373 write_unlock(&obd_dev_lock);
374 obd_device_free(obd);
376 class_put_type(obd_type);
379 int class_name2dev(const char *name)
386 read_lock(&obd_dev_lock);
387 for (i = 0; i < class_devno_max(); i++) {
388 struct obd_device *obd = class_num2obd(i);
390 if (obd && strcmp(name, obd->obd_name) == 0) {
391 /* Make sure we finished attaching before we give
392 out any references */
393 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394 if (obd->obd_attached) {
395 read_unlock(&obd_dev_lock);
401 read_unlock(&obd_dev_lock);
405 EXPORT_SYMBOL(class_name2dev);
407 struct obd_device *class_name2obd(const char *name)
409 int dev = class_name2dev(name);
411 if (dev < 0 || dev > class_devno_max())
413 return class_num2obd(dev);
415 EXPORT_SYMBOL(class_name2obd);
417 int class_uuid2dev(struct obd_uuid *uuid)
421 read_lock(&obd_dev_lock);
422 for (i = 0; i < class_devno_max(); i++) {
423 struct obd_device *obd = class_num2obd(i);
425 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427 read_unlock(&obd_dev_lock);
431 read_unlock(&obd_dev_lock);
435 EXPORT_SYMBOL(class_uuid2dev);
437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
439 int dev = class_uuid2dev(uuid);
442 return class_num2obd(dev);
444 EXPORT_SYMBOL(class_uuid2obd);
447 * Get obd device from ::obd_devs[]
449 * \param num [in] array index
451 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452 * otherwise return the obd device there.
454 struct obd_device *class_num2obd(int num)
456 struct obd_device *obd = NULL;
458 if (num < class_devno_max()) {
463 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464 "%p obd_magic %08x != %08x\n",
465 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466 LASSERTF(obd->obd_minor == num,
467 "%p obd_minor %0d != %0d\n",
468 obd, obd->obd_minor, num);
473 EXPORT_SYMBOL(class_num2obd);
476 * Get obd devices count. Device in any
478 * \retval obd device count
480 int get_devices_count(void)
482 int index, max_index = class_devno_max(), dev_count = 0;
484 read_lock(&obd_dev_lock);
485 for (index = 0; index <= max_index; index++) {
486 struct obd_device *obd = class_num2obd(index);
490 read_unlock(&obd_dev_lock);
494 EXPORT_SYMBOL(get_devices_count);
496 void class_obd_list(void)
501 read_lock(&obd_dev_lock);
502 for (i = 0; i < class_devno_max(); i++) {
503 struct obd_device *obd = class_num2obd(i);
507 if (obd->obd_stopping)
509 else if (obd->obd_set_up)
511 else if (obd->obd_attached)
515 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516 i, status, obd->obd_type->typ_name,
517 obd->obd_name, obd->obd_uuid.uuid,
518 atomic_read(&obd->obd_refcount));
520 read_unlock(&obd_dev_lock);
524 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
525 specified, then only the client with that uuid is returned,
526 otherwise any client connected to the tgt is returned. */
527 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
528 const char *typ_name,
529 struct obd_uuid *grp_uuid)
533 read_lock(&obd_dev_lock);
534 for (i = 0; i < class_devno_max(); i++) {
535 struct obd_device *obd = class_num2obd(i);
539 if ((strncmp(obd->obd_type->typ_name, typ_name,
540 strlen(typ_name)) == 0)) {
541 if (obd_uuid_equals(tgt_uuid,
542 &obd->u.cli.cl_target_uuid) &&
543 ((grp_uuid)? obd_uuid_equals(grp_uuid,
544 &obd->obd_uuid) : 1)) {
545 read_unlock(&obd_dev_lock);
550 read_unlock(&obd_dev_lock);
554 EXPORT_SYMBOL(class_find_client_obd);
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557 searching at *next, and if a device is found, the next index to look
558 at is saved in *next. If next is NULL, then the first matching device
559 will always be returned. */
560 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
566 else if (*next >= 0 && *next < class_devno_max())
571 read_lock(&obd_dev_lock);
572 for (; i < class_devno_max(); i++) {
573 struct obd_device *obd = class_num2obd(i);
577 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
580 read_unlock(&obd_dev_lock);
584 read_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_devices_in_group);
591 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592 * adjust sptlrpc settings accordingly.
594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
596 struct obd_device *obd;
600 LASSERT(namelen > 0);
602 read_lock(&obd_dev_lock);
603 for (i = 0; i < class_devno_max(); i++) {
604 obd = class_num2obd(i);
606 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
609 /* only notify mdc, osc, mdt, ost */
610 type = obd->obd_type->typ_name;
611 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614 strcmp(type, LUSTRE_OST_NAME) != 0)
617 if (strncmp(obd->obd_name, fsname, namelen))
620 class_incref(obd, __func__, obd);
621 read_unlock(&obd_dev_lock);
622 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623 sizeof(KEY_SPTLRPC_CONF),
624 KEY_SPTLRPC_CONF, 0, NULL, NULL);
626 class_decref(obd, __func__, obd);
627 read_lock(&obd_dev_lock);
629 read_unlock(&obd_dev_lock);
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
634 void obd_cleanup_caches(void)
636 if (obd_device_cachep) {
637 kmem_cache_destroy(obd_device_cachep);
638 obd_device_cachep = NULL;
641 kmem_cache_destroy(obdo_cachep);
645 kmem_cache_destroy(import_cachep);
646 import_cachep = NULL;
649 kmem_cache_destroy(capa_cachep);
654 int obd_init_caches(void)
656 LASSERT(obd_device_cachep == NULL);
657 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658 sizeof(struct obd_device),
660 if (!obd_device_cachep)
663 LASSERT(obdo_cachep == NULL);
664 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
669 LASSERT(import_cachep == NULL);
670 import_cachep = kmem_cache_create("ll_import_cache",
671 sizeof(struct obd_import),
676 LASSERT(capa_cachep == NULL);
677 capa_cachep = kmem_cache_create("capa_cache",
678 sizeof(struct obd_capa), 0, 0, NULL);
684 obd_cleanup_caches();
689 /* map connection to client */
690 struct obd_export *class_conn2export(struct lustre_handle *conn)
692 struct obd_export *export;
695 CDEBUG(D_CACHE, "looking for null handle\n");
699 if (conn->cookie == -1) { /* this means assign a new connection */
700 CDEBUG(D_CACHE, "want a new connection\n");
704 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705 export = class_handle2object(conn->cookie);
708 EXPORT_SYMBOL(class_conn2export);
710 struct obd_device *class_exp2obd(struct obd_export *exp)
716 EXPORT_SYMBOL(class_exp2obd);
718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
720 struct obd_export *export;
721 export = class_conn2export(conn);
723 struct obd_device *obd = export->exp_obd;
724 class_export_put(export);
729 EXPORT_SYMBOL(class_conn2obd);
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
733 struct obd_device *obd = exp->exp_obd;
736 return obd->u.cli.cl_import;
738 EXPORT_SYMBOL(class_exp2cliimp);
740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
742 struct obd_device *obd = class_conn2obd(conn);
745 return obd->u.cli.cl_import;
747 EXPORT_SYMBOL(class_conn2cliimp);
749 /* Export management functions */
750 static void class_export_destroy(struct obd_export *exp)
752 struct obd_device *obd = exp->exp_obd;
754 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755 LASSERT(obd != NULL);
757 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758 exp->exp_client_uuid.uuid, obd->obd_name);
760 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761 if (exp->exp_connection)
762 ptlrpc_put_connection_superhack(exp->exp_connection);
764 LASSERT(list_empty(&exp->exp_outstanding_replies));
765 LASSERT(list_empty(&exp->exp_uncommitted_replies));
766 LASSERT(list_empty(&exp->exp_req_replay_queue));
767 LASSERT(list_empty(&exp->exp_hp_rpcs));
768 obd_destroy_export(exp);
769 class_decref(obd, "export", exp);
771 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
774 static void export_handle_addref(void *export)
776 class_export_get(export);
779 static struct portals_handle_ops export_handle_ops = {
780 .hop_addref = export_handle_addref,
784 struct obd_export *class_export_get(struct obd_export *exp)
786 atomic_inc(&exp->exp_refcount);
787 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788 atomic_read(&exp->exp_refcount));
791 EXPORT_SYMBOL(class_export_get);
793 void class_export_put(struct obd_export *exp)
795 LASSERT(exp != NULL);
796 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798 atomic_read(&exp->exp_refcount) - 1);
800 if (atomic_dec_and_test(&exp->exp_refcount)) {
801 LASSERT(!list_empty(&exp->exp_obd_chain));
802 CDEBUG(D_IOCTL, "final put %p/%s\n",
803 exp, exp->exp_client_uuid.uuid);
805 /* release nid stat refererence */
806 lprocfs_exp_cleanup(exp);
808 obd_zombie_export_add(exp);
811 EXPORT_SYMBOL(class_export_put);
813 /* Creates a new export, adds it to the hash table, and returns a
814 * pointer to it. The refcount is 2: one for the hash reference, and
815 * one for the pointer returned by this function. */
816 struct obd_export *class_new_export(struct obd_device *obd,
817 struct obd_uuid *cluuid)
819 struct obd_export *export;
820 struct cfs_hash *hash = NULL;
823 OBD_ALLOC_PTR(export);
825 return ERR_PTR(-ENOMEM);
827 export->exp_conn_cnt = 0;
828 export->exp_lock_hash = NULL;
829 export->exp_flock_hash = NULL;
830 atomic_set(&export->exp_refcount, 2);
831 atomic_set(&export->exp_rpc_count, 0);
832 atomic_set(&export->exp_cb_count, 0);
833 atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835 INIT_LIST_HEAD(&export->exp_locks_list);
836 spin_lock_init(&export->exp_locks_list_guard);
838 atomic_set(&export->exp_replay_count, 0);
839 export->exp_obd = obd;
840 INIT_LIST_HEAD(&export->exp_outstanding_replies);
841 spin_lock_init(&export->exp_uncommitted_replies_lock);
842 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843 INIT_LIST_HEAD(&export->exp_req_replay_queue);
844 INIT_LIST_HEAD(&export->exp_handle.h_link);
845 INIT_LIST_HEAD(&export->exp_hp_rpcs);
846 class_handle_hash(&export->exp_handle, &export_handle_ops);
847 export->exp_last_request_time = get_seconds();
848 spin_lock_init(&export->exp_lock);
849 spin_lock_init(&export->exp_rpc_lock);
850 INIT_HLIST_NODE(&export->exp_uuid_hash);
851 INIT_HLIST_NODE(&export->exp_nid_hash);
852 spin_lock_init(&export->exp_bl_list_lock);
853 INIT_LIST_HEAD(&export->exp_bl_list);
855 export->exp_sp_peer = LUSTRE_SP_ANY;
856 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857 export->exp_client_uuid = *cluuid;
858 obd_init_export(export);
860 spin_lock(&obd->obd_dev_lock);
861 /* shouldn't happen, but might race */
862 if (obd->obd_stopping) {
867 hash = cfs_hash_getref(obd->obd_uuid_hash);
872 spin_unlock(&obd->obd_dev_lock);
874 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
877 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878 obd->obd_name, cluuid->uuid, rc);
884 spin_lock(&obd->obd_dev_lock);
885 if (obd->obd_stopping) {
886 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
891 class_incref(obd, "export", export);
892 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893 list_add_tail(&export->exp_obd_chain_timed,
894 &export->exp_obd->obd_exports_timed);
895 export->exp_obd->obd_num_exports++;
896 spin_unlock(&obd->obd_dev_lock);
897 cfs_hash_putref(hash);
901 spin_unlock(&obd->obd_dev_lock);
904 cfs_hash_putref(hash);
905 class_handle_unhash(&export->exp_handle);
906 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907 obd_destroy_export(export);
908 OBD_FREE_PTR(export);
911 EXPORT_SYMBOL(class_new_export);
913 void class_unlink_export(struct obd_export *exp)
915 class_handle_unhash(&exp->exp_handle);
917 spin_lock(&exp->exp_obd->obd_dev_lock);
918 /* delete an uuid-export hashitem from hashtables */
919 if (!hlist_unhashed(&exp->exp_uuid_hash))
920 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921 &exp->exp_client_uuid,
922 &exp->exp_uuid_hash);
924 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925 list_del_init(&exp->exp_obd_chain_timed);
926 exp->exp_obd->obd_num_exports--;
927 spin_unlock(&exp->exp_obd->obd_dev_lock);
928 class_export_put(exp);
930 EXPORT_SYMBOL(class_unlink_export);
932 /* Import management functions */
933 void class_import_destroy(struct obd_import *imp)
935 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936 imp->imp_obd->obd_name);
938 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
940 ptlrpc_put_connection_superhack(imp->imp_connection);
942 while (!list_empty(&imp->imp_conn_list)) {
943 struct obd_import_conn *imp_conn;
945 imp_conn = list_entry(imp->imp_conn_list.next,
946 struct obd_import_conn, oic_item);
947 list_del_init(&imp_conn->oic_item);
948 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949 OBD_FREE(imp_conn, sizeof(*imp_conn));
952 LASSERT(imp->imp_sec == NULL);
953 class_decref(imp->imp_obd, "import", imp);
954 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
957 static void import_handle_addref(void *import)
959 class_import_get(import);
962 static struct portals_handle_ops import_handle_ops = {
963 .hop_addref = import_handle_addref,
967 struct obd_import *class_import_get(struct obd_import *import)
969 atomic_inc(&import->imp_refcount);
970 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971 atomic_read(&import->imp_refcount),
972 import->imp_obd->obd_name);
975 EXPORT_SYMBOL(class_import_get);
977 void class_import_put(struct obd_import *imp)
979 LASSERT(list_empty(&imp->imp_zombie_chain));
980 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
982 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983 atomic_read(&imp->imp_refcount) - 1,
984 imp->imp_obd->obd_name);
986 if (atomic_dec_and_test(&imp->imp_refcount)) {
987 CDEBUG(D_INFO, "final put import %p\n", imp);
988 obd_zombie_import_add(imp);
991 /* catch possible import put race */
992 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
994 EXPORT_SYMBOL(class_import_put);
996 static void init_imp_at(struct imp_at *at) {
998 at_init(&at->iat_net_latency, 0, 0);
999 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000 /* max service estimates are tracked on the server side, so
1001 don't use the AT history here, just use the last reported
1002 val. (But keep hist for proc histogram, worst_ever) */
1003 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1008 struct obd_import *class_new_import(struct obd_device *obd)
1010 struct obd_import *imp;
1012 OBD_ALLOC(imp, sizeof(*imp));
1016 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018 INIT_LIST_HEAD(&imp->imp_replay_list);
1019 INIT_LIST_HEAD(&imp->imp_sending_list);
1020 INIT_LIST_HEAD(&imp->imp_delayed_list);
1021 INIT_LIST_HEAD(&imp->imp_committed_list);
1022 imp->imp_replay_cursor = &imp->imp_committed_list;
1023 spin_lock_init(&imp->imp_lock);
1024 imp->imp_last_success_conn = 0;
1025 imp->imp_state = LUSTRE_IMP_NEW;
1026 imp->imp_obd = class_incref(obd, "import", imp);
1027 mutex_init(&imp->imp_sec_mutex);
1028 init_waitqueue_head(&imp->imp_recovery_waitq);
1030 atomic_set(&imp->imp_refcount, 2);
1031 atomic_set(&imp->imp_unregistering, 0);
1032 atomic_set(&imp->imp_inflight, 0);
1033 atomic_set(&imp->imp_replay_inflight, 0);
1034 atomic_set(&imp->imp_inval_count, 0);
1035 INIT_LIST_HEAD(&imp->imp_conn_list);
1036 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038 init_imp_at(&imp->imp_at);
1040 /* the default magic is V2, will be used in connect RPC, and
1041 * then adjusted according to the flags in request/reply. */
1042 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1046 EXPORT_SYMBOL(class_new_import);
1048 void class_destroy_import(struct obd_import *import)
1050 LASSERT(import != NULL);
1051 LASSERT(import != LP_POISON);
1053 class_handle_unhash(&import->imp_handle);
1055 spin_lock(&import->imp_lock);
1056 import->imp_generation++;
1057 spin_unlock(&import->imp_lock);
1058 class_import_put(import);
1060 EXPORT_SYMBOL(class_destroy_import);
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1066 spin_lock(&exp->exp_locks_list_guard);
1068 LASSERT(lock->l_exp_refs_nr >= 0);
1070 if (lock->l_exp_refs_target != NULL &&
1071 lock->l_exp_refs_target != exp) {
1072 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073 exp, lock, lock->l_exp_refs_target);
1075 if ((lock->l_exp_refs_nr ++) == 0) {
1076 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077 lock->l_exp_refs_target = exp;
1079 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080 lock, exp, lock->l_exp_refs_nr);
1081 spin_unlock(&exp->exp_locks_list_guard);
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 spin_lock(&exp->exp_locks_list_guard);
1088 LASSERT(lock->l_exp_refs_nr > 0);
1089 if (lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1091 lock, lock->l_exp_refs_target, exp);
1093 if (-- lock->l_exp_refs_nr == 0) {
1094 list_del_init(&lock->l_exp_refs_link);
1095 lock->l_exp_refs_target = NULL;
1097 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098 lock, exp, lock->l_exp_refs_nr);
1099 spin_unlock(&exp->exp_locks_list_guard);
1101 EXPORT_SYMBOL(__class_export_del_lock_ref);
1104 /* A connection defines an export context in which preallocation can
1105 be managed. This releases the export pointer reference, and returns
1106 the export handle, so the export refcount is 1 when this function
1108 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1109 struct obd_uuid *cluuid)
1111 struct obd_export *export;
1112 LASSERT(conn != NULL);
1113 LASSERT(obd != NULL);
1114 LASSERT(cluuid != NULL);
1116 export = class_new_export(obd, cluuid);
1118 return PTR_ERR(export);
1120 conn->cookie = export->exp_handle.h_cookie;
1121 class_export_put(export);
1123 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1124 cluuid->uuid, conn->cookie);
1127 EXPORT_SYMBOL(class_connect);
1129 /* if export is involved in recovery then clean up related things */
1130 void class_export_recovery_cleanup(struct obd_export *exp)
1132 struct obd_device *obd = exp->exp_obd;
1134 spin_lock(&obd->obd_recovery_task_lock);
1135 if (exp->exp_delayed)
1136 obd->obd_delayed_clients--;
1137 if (obd->obd_recovering) {
1138 if (exp->exp_in_recovery) {
1139 spin_lock(&exp->exp_lock);
1140 exp->exp_in_recovery = 0;
1141 spin_unlock(&exp->exp_lock);
1142 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1143 atomic_dec(&obd->obd_connected_clients);
1146 /* if called during recovery then should update
1147 * obd_stale_clients counter,
1148 * lightweight exports are not counted */
1149 if (exp->exp_failed &&
1150 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1151 exp->exp_obd->obd_stale_clients++;
1153 spin_unlock(&obd->obd_recovery_task_lock);
1155 spin_lock(&exp->exp_lock);
1156 /** Cleanup req replay fields */
1157 if (exp->exp_req_replay_needed) {
1158 exp->exp_req_replay_needed = 0;
1160 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1161 atomic_dec(&obd->obd_req_replay_clients);
1164 /** Cleanup lock replay data */
1165 if (exp->exp_lock_replay_needed) {
1166 exp->exp_lock_replay_needed = 0;
1168 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1169 atomic_dec(&obd->obd_lock_replay_clients);
1171 spin_unlock(&exp->exp_lock);
1174 /* This function removes 1-3 references from the export:
1175 * 1 - for export pointer passed
1176 * and if disconnect really need
1177 * 2 - removing from hash
1178 * 3 - in client_unlink_export
1179 * The export pointer passed to this function can destroyed */
1180 int class_disconnect(struct obd_export *export)
1182 int already_disconnected;
1184 if (export == NULL) {
1185 CWARN("attempting to free NULL export %p\n", export);
1189 spin_lock(&export->exp_lock);
1190 already_disconnected = export->exp_disconnected;
1191 export->exp_disconnected = 1;
1192 spin_unlock(&export->exp_lock);
1194 /* class_cleanup(), abort_recovery(), and class_fail_export()
1195 * all end up in here, and if any of them race we shouldn't
1196 * call extra class_export_puts(). */
1197 if (already_disconnected) {
1198 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1202 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1203 export->exp_handle.h_cookie);
1205 if (!hlist_unhashed(&export->exp_nid_hash))
1206 cfs_hash_del(export->exp_obd->obd_nid_hash,
1207 &export->exp_connection->c_peer.nid,
1208 &export->exp_nid_hash);
1210 class_export_recovery_cleanup(export);
1211 class_unlink_export(export);
1213 class_export_put(export);
1216 EXPORT_SYMBOL(class_disconnect);
1218 /* Return non-zero for a fully connected export */
1219 int class_connected_export(struct obd_export *exp)
1223 spin_lock(&exp->exp_lock);
1224 connected = (exp->exp_conn_cnt > 0);
1225 spin_unlock(&exp->exp_lock);
1230 EXPORT_SYMBOL(class_connected_export);
1232 static void class_disconnect_export_list(struct list_head *list,
1233 enum obd_option flags)
1236 struct obd_export *exp;
1238 /* It's possible that an export may disconnect itself, but
1239 * nothing else will be added to this list. */
1240 while (!list_empty(list)) {
1241 exp = list_entry(list->next, struct obd_export,
1243 /* need for safe call CDEBUG after obd_disconnect */
1244 class_export_get(exp);
1246 spin_lock(&exp->exp_lock);
1247 exp->exp_flags = flags;
1248 spin_unlock(&exp->exp_lock);
1250 if (obd_uuid_equals(&exp->exp_client_uuid,
1251 &exp->exp_obd->obd_uuid)) {
1253 "exp %p export uuid == obd uuid, don't discon\n",
1255 /* Need to delete this now so we don't end up pointing
1256 * to work_list later when this export is cleaned up. */
1257 list_del_init(&exp->exp_obd_chain);
1258 class_export_put(exp);
1262 class_export_get(exp);
1263 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1264 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265 exp, exp->exp_last_request_time);
1266 /* release one export reference anyway */
1267 rc = obd_disconnect(exp);
1269 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270 obd_export_nid2str(exp), exp, rc);
1271 class_export_put(exp);
1275 void class_disconnect_exports(struct obd_device *obd)
1277 struct list_head work_list;
1279 /* Move all of the exports from obd_exports to a work list, en masse. */
1280 INIT_LIST_HEAD(&work_list);
1281 spin_lock(&obd->obd_dev_lock);
1282 list_splice_init(&obd->obd_exports, &work_list);
1283 list_splice_init(&obd->obd_delayed_exports, &work_list);
1284 spin_unlock(&obd->obd_dev_lock);
1286 if (!list_empty(&work_list)) {
1287 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1288 obd->obd_minor, obd);
1289 class_disconnect_export_list(&work_list,
1290 exp_flags_from_obd(obd));
1292 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1293 obd->obd_minor, obd);
1295 EXPORT_SYMBOL(class_disconnect_exports);
1297 /* Remove exports that have not completed recovery.
1299 void class_disconnect_stale_exports(struct obd_device *obd,
1300 int (*test_export)(struct obd_export *))
1302 struct list_head work_list;
1303 struct obd_export *exp, *n;
1306 INIT_LIST_HEAD(&work_list);
1307 spin_lock(&obd->obd_dev_lock);
1308 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1310 /* don't count self-export as client */
1311 if (obd_uuid_equals(&exp->exp_client_uuid,
1312 &exp->exp_obd->obd_uuid))
1315 /* don't evict clients which have no slot in last_rcvd
1316 * (e.g. lightweight connection) */
1317 if (exp->exp_target_data.ted_lr_idx == -1)
1320 spin_lock(&exp->exp_lock);
1321 if (exp->exp_failed || test_export(exp)) {
1322 spin_unlock(&exp->exp_lock);
1325 exp->exp_failed = 1;
1326 spin_unlock(&exp->exp_lock);
1328 list_move(&exp->exp_obd_chain, &work_list);
1330 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1331 obd->obd_name, exp->exp_client_uuid.uuid,
1332 exp->exp_connection == NULL ? "<unknown>" :
1333 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1334 print_export_data(exp, "EVICTING", 0);
1336 spin_unlock(&obd->obd_dev_lock);
1339 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1340 obd->obd_name, evicted);
1342 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1343 OBD_OPT_ABORT_RECOV);
1345 EXPORT_SYMBOL(class_disconnect_stale_exports);
1347 void class_fail_export(struct obd_export *exp)
1349 int rc, already_failed;
1351 spin_lock(&exp->exp_lock);
1352 already_failed = exp->exp_failed;
1353 exp->exp_failed = 1;
1354 spin_unlock(&exp->exp_lock);
1356 if (already_failed) {
1357 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1358 exp, exp->exp_client_uuid.uuid);
1362 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1363 exp, exp->exp_client_uuid.uuid);
1365 if (obd_dump_on_timeout)
1366 libcfs_debug_dumplog();
1368 /* need for safe call CDEBUG after obd_disconnect */
1369 class_export_get(exp);
1371 /* Most callers into obd_disconnect are removing their own reference
1372 * (request, for example) in addition to the one from the hash table.
1373 * We don't have such a reference here, so make one. */
1374 class_export_get(exp);
1375 rc = obd_disconnect(exp);
1377 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1379 CDEBUG(D_HA, "disconnected export %p/%s\n",
1380 exp, exp->exp_client_uuid.uuid);
1381 class_export_put(exp);
1383 EXPORT_SYMBOL(class_fail_export);
1385 char *obd_export_nid2str(struct obd_export *exp)
1387 if (exp->exp_connection != NULL)
1388 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1392 EXPORT_SYMBOL(obd_export_nid2str);
1394 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1396 struct cfs_hash *nid_hash;
1397 struct obd_export *doomed_exp = NULL;
1398 int exports_evicted = 0;
1400 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1402 spin_lock(&obd->obd_dev_lock);
1403 /* umount has run already, so evict thread should leave
1404 * its task to umount thread now */
1405 if (obd->obd_stopping) {
1406 spin_unlock(&obd->obd_dev_lock);
1407 return exports_evicted;
1409 nid_hash = obd->obd_nid_hash;
1410 cfs_hash_getref(nid_hash);
1411 spin_unlock(&obd->obd_dev_lock);
1414 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1415 if (doomed_exp == NULL)
1418 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1419 "nid %s found, wanted nid %s, requested nid %s\n",
1420 obd_export_nid2str(doomed_exp),
1421 libcfs_nid2str(nid_key), nid);
1422 LASSERTF(doomed_exp != obd->obd_self_export,
1423 "self-export is hashed by NID?\n");
1425 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1427 obd_uuid2str(&doomed_exp->exp_client_uuid),
1428 obd_export_nid2str(doomed_exp));
1429 class_fail_export(doomed_exp);
1430 class_export_put(doomed_exp);
1433 cfs_hash_putref(nid_hash);
1435 if (!exports_evicted)
1437 "%s: can't disconnect NID '%s': no exports found\n",
1438 obd->obd_name, nid);
1439 return exports_evicted;
1441 EXPORT_SYMBOL(obd_export_evict_by_nid);
1443 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1445 struct cfs_hash *uuid_hash;
1446 struct obd_export *doomed_exp = NULL;
1447 struct obd_uuid doomed_uuid;
1448 int exports_evicted = 0;
1450 spin_lock(&obd->obd_dev_lock);
1451 if (obd->obd_stopping) {
1452 spin_unlock(&obd->obd_dev_lock);
1453 return exports_evicted;
1455 uuid_hash = obd->obd_uuid_hash;
1456 cfs_hash_getref(uuid_hash);
1457 spin_unlock(&obd->obd_dev_lock);
1459 obd_str2uuid(&doomed_uuid, uuid);
1460 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1461 CERROR("%s: can't evict myself\n", obd->obd_name);
1462 cfs_hash_putref(uuid_hash);
1463 return exports_evicted;
1466 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1468 if (doomed_exp == NULL) {
1469 CERROR("%s: can't disconnect %s: no exports found\n",
1470 obd->obd_name, uuid);
1472 CWARN("%s: evicting %s at administrative request\n",
1473 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1474 class_fail_export(doomed_exp);
1475 class_export_put(doomed_exp);
1478 cfs_hash_putref(uuid_hash);
1480 return exports_evicted;
1482 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1484 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1485 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1486 EXPORT_SYMBOL(class_export_dump_hook);
1489 static void print_export_data(struct obd_export *exp, const char *status,
1492 struct ptlrpc_reply_state *rs;
1493 struct ptlrpc_reply_state *first_reply = NULL;
1496 spin_lock(&exp->exp_lock);
1497 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1503 spin_unlock(&exp->exp_lock);
1505 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1506 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1507 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1508 atomic_read(&exp->exp_rpc_count),
1509 atomic_read(&exp->exp_cb_count),
1510 atomic_read(&exp->exp_locks_count),
1511 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1512 nreplies, first_reply, nreplies > 3 ? "..." : "",
1513 exp->exp_last_committed);
1514 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1515 if (locks && class_export_dump_hook != NULL)
1516 class_export_dump_hook(exp);
1520 void dump_exports(struct obd_device *obd, int locks)
1522 struct obd_export *exp;
1524 spin_lock(&obd->obd_dev_lock);
1525 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1526 print_export_data(exp, "ACTIVE", locks);
1527 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1528 print_export_data(exp, "UNLINKED", locks);
1529 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1530 print_export_data(exp, "DELAYED", locks);
1531 spin_unlock(&obd->obd_dev_lock);
1532 spin_lock(&obd_zombie_impexp_lock);
1533 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1534 print_export_data(exp, "ZOMBIE", locks);
1535 spin_unlock(&obd_zombie_impexp_lock);
1537 EXPORT_SYMBOL(dump_exports);
1539 void obd_exports_barrier(struct obd_device *obd)
1542 LASSERT(list_empty(&obd->obd_exports));
1543 spin_lock(&obd->obd_dev_lock);
1544 while (!list_empty(&obd->obd_unlinked_exports)) {
1545 spin_unlock(&obd->obd_dev_lock);
1546 set_current_state(TASK_UNINTERRUPTIBLE);
1547 schedule_timeout(cfs_time_seconds(waited));
1548 if (waited > 5 && IS_PO2(waited)) {
1549 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1550 obd->obd_name, waited,
1551 atomic_read(&obd->obd_refcount));
1552 dump_exports(obd, 1);
1555 spin_lock(&obd->obd_dev_lock);
1557 spin_unlock(&obd->obd_dev_lock);
1559 EXPORT_SYMBOL(obd_exports_barrier);
1561 /* Total amount of zombies to be destroyed */
1562 static int zombies_count = 0;
1565 * kill zombie imports and exports
1567 void obd_zombie_impexp_cull(void)
1569 struct obd_import *import;
1570 struct obd_export *export;
1573 spin_lock(&obd_zombie_impexp_lock);
1576 if (!list_empty(&obd_zombie_imports)) {
1577 import = list_entry(obd_zombie_imports.next,
1580 list_del_init(&import->imp_zombie_chain);
1584 if (!list_empty(&obd_zombie_exports)) {
1585 export = list_entry(obd_zombie_exports.next,
1588 list_del_init(&export->exp_obd_chain);
1591 spin_unlock(&obd_zombie_impexp_lock);
1593 if (import != NULL) {
1594 class_import_destroy(import);
1595 spin_lock(&obd_zombie_impexp_lock);
1597 spin_unlock(&obd_zombie_impexp_lock);
1600 if (export != NULL) {
1601 class_export_destroy(export);
1602 spin_lock(&obd_zombie_impexp_lock);
1604 spin_unlock(&obd_zombie_impexp_lock);
1608 } while (import != NULL || export != NULL);
1611 static struct completion obd_zombie_start;
1612 static struct completion obd_zombie_stop;
1613 static unsigned long obd_zombie_flags;
1614 static wait_queue_head_t obd_zombie_waitq;
1615 static pid_t obd_zombie_pid;
1618 OBD_ZOMBIE_STOP = 0x0001,
1622 * check for work for kill zombie import/export thread.
1624 static int obd_zombie_impexp_check(void *arg)
1628 spin_lock(&obd_zombie_impexp_lock);
1629 rc = (zombies_count == 0) &&
1630 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1631 spin_unlock(&obd_zombie_impexp_lock);
1637 * Add export to the obd_zombie thread and notify it.
1639 static void obd_zombie_export_add(struct obd_export *exp) {
1640 spin_lock(&exp->exp_obd->obd_dev_lock);
1641 LASSERT(!list_empty(&exp->exp_obd_chain));
1642 list_del_init(&exp->exp_obd_chain);
1643 spin_unlock(&exp->exp_obd->obd_dev_lock);
1644 spin_lock(&obd_zombie_impexp_lock);
1646 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1647 spin_unlock(&obd_zombie_impexp_lock);
1649 obd_zombie_impexp_notify();
1653 * Add import to the obd_zombie thread and notify it.
1655 static void obd_zombie_import_add(struct obd_import *imp) {
1656 LASSERT(imp->imp_sec == NULL);
1657 LASSERT(imp->imp_rq_pool == NULL);
1658 spin_lock(&obd_zombie_impexp_lock);
1659 LASSERT(list_empty(&imp->imp_zombie_chain));
1661 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1662 spin_unlock(&obd_zombie_impexp_lock);
1664 obd_zombie_impexp_notify();
1668 * notify import/export destroy thread about new zombie.
1670 static void obd_zombie_impexp_notify(void)
1673 * Make sure obd_zombie_impexp_thread get this notification.
1674 * It is possible this signal only get by obd_zombie_barrier, and
1675 * barrier gulps this notification and sleeps away and hangs ensues
1677 wake_up_all(&obd_zombie_waitq);
1681 * check whether obd_zombie is idle
1683 static int obd_zombie_is_idle(void)
1687 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1688 spin_lock(&obd_zombie_impexp_lock);
1689 rc = (zombies_count == 0);
1690 spin_unlock(&obd_zombie_impexp_lock);
1695 * wait when obd_zombie import/export queues become empty
1697 void obd_zombie_barrier(void)
1699 struct l_wait_info lwi = { 0 };
1701 if (obd_zombie_pid == current_pid())
1702 /* don't wait for myself */
1704 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1706 EXPORT_SYMBOL(obd_zombie_barrier);
1710 * destroy zombie export/import thread.
1712 static int obd_zombie_impexp_thread(void *unused)
1714 unshare_fs_struct();
1715 complete(&obd_zombie_start);
1717 obd_zombie_pid = current_pid();
1719 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1720 struct l_wait_info lwi = { 0 };
1722 l_wait_event(obd_zombie_waitq,
1723 !obd_zombie_impexp_check(NULL), &lwi);
1724 obd_zombie_impexp_cull();
1727 * Notify obd_zombie_barrier callers that queues
1730 wake_up(&obd_zombie_waitq);
1733 complete(&obd_zombie_stop);
1740 * start destroy zombie import/export thread
1742 int obd_zombie_impexp_init(void)
1744 struct task_struct *task;
1746 INIT_LIST_HEAD(&obd_zombie_imports);
1747 INIT_LIST_HEAD(&obd_zombie_exports);
1748 spin_lock_init(&obd_zombie_impexp_lock);
1749 init_completion(&obd_zombie_start);
1750 init_completion(&obd_zombie_stop);
1751 init_waitqueue_head(&obd_zombie_waitq);
1754 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1756 return PTR_ERR(task);
1758 wait_for_completion(&obd_zombie_start);
1762 * stop destroy zombie import/export thread
1764 void obd_zombie_impexp_stop(void)
1766 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1767 obd_zombie_impexp_notify();
1768 wait_for_completion(&obd_zombie_stop);
1771 /***** Kernel-userspace comm helpers *******/
1773 /* Get length of entire message, including header */
1774 int kuc_len(int payload_len)
1776 return sizeof(struct kuc_hdr) + payload_len;
1778 EXPORT_SYMBOL(kuc_len);
1780 /* Get a pointer to kuc header, given a ptr to the payload
1781 * @param p Pointer to payload area
1782 * @returns Pointer to kuc header
1784 struct kuc_hdr *kuc_ptr(void *p)
1786 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1787 LASSERT(lh->kuc_magic == KUC_MAGIC);
1790 EXPORT_SYMBOL(kuc_ptr);
1792 /* Test if payload is part of kuc message
1793 * @param p Pointer to payload area
1796 int kuc_ispayload(void *p)
1798 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1800 if (kh->kuc_magic == KUC_MAGIC)
1805 EXPORT_SYMBOL(kuc_ispayload);
1807 /* Alloc space for a message, and fill in header
1808 * @return Pointer to payload area
1810 void *kuc_alloc(int payload_len, int transport, int type)
1813 int len = kuc_len(payload_len);
1817 return ERR_PTR(-ENOMEM);
1819 lh->kuc_magic = KUC_MAGIC;
1820 lh->kuc_transport = transport;
1821 lh->kuc_msgtype = type;
1822 lh->kuc_msglen = len;
1824 return (void *)(lh + 1);
1826 EXPORT_SYMBOL(kuc_alloc);
1828 /* Takes pointer to payload area */
1829 inline void kuc_free(void *p, int payload_len)
1831 struct kuc_hdr *lh = kuc_ptr(p);
1832 OBD_FREE(lh, kuc_len(payload_len));
1834 EXPORT_SYMBOL(kuc_free);