4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
46 extern struct list_head obd_types;
47 spinlock_t obd_types_lock;
49 struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 struct kmem_cache *import_cachep;
54 struct list_head obd_zombie_imports;
55 struct list_head obd_zombie_exports;
56 spinlock_t obd_zombie_impexp_lock;
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks);
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
81 static void obd_device_free(struct obd_device *obd)
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 struct obd_type *class_search_type(const char *name)
97 struct list_head *tmp;
98 struct obd_type *type;
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
108 spin_unlock(&obd_types_lock);
111 EXPORT_SYMBOL(class_search_type);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
118 const char *modname = name;
120 if (strcmp(modname, "obdfilter") == 0)
123 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124 modname = LUSTRE_OSP_NAME;
126 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127 modname = LUSTRE_MDT_NAME;
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138 spin_lock(&type->obd_type_lock);
140 try_module_get(type->typ_dt_ops->o_owner);
141 spin_unlock(&type->obd_type_lock);
145 EXPORT_SYMBOL(class_get_type);
147 void class_put_type(struct obd_type *type)
150 spin_lock(&type->obd_type_lock);
152 module_put(type->typ_dt_ops->o_owner);
153 spin_unlock(&type->obd_type_lock);
155 EXPORT_SYMBOL(class_put_type);
157 #define CLASS_MAX_NAME 1024
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
163 struct obd_type *type;
167 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169 if (class_search_type(name)) {
170 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
175 OBD_ALLOC(type, sizeof(*type));
179 OBD_ALLOC_PTR(type->typ_dt_ops);
180 OBD_ALLOC_PTR(type->typ_md_ops);
181 OBD_ALLOC(type->typ_name, strlen(name) + 1);
183 if (type->typ_dt_ops == NULL ||
184 type->typ_md_ops == NULL ||
185 type->typ_name == NULL)
188 *(type->typ_dt_ops) = *dt_ops;
189 /* md_ops is optional */
191 *(type->typ_md_ops) = *md_ops;
192 strcpy(type->typ_name, name);
193 spin_lock_init(&type->obd_type_lock);
195 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
197 if (IS_ERR(type->typ_procroot)) {
198 rc = PTR_ERR(type->typ_procroot);
199 type->typ_procroot = NULL;
205 rc = lu_device_type_init(ldt);
210 spin_lock(&obd_types_lock);
211 list_add(&type->typ_chain, &obd_types);
212 spin_unlock(&obd_types_lock);
217 if (type->typ_name != NULL)
218 OBD_FREE(type->typ_name, strlen(name) + 1);
219 if (type->typ_md_ops != NULL)
220 OBD_FREE_PTR(type->typ_md_ops);
221 if (type->typ_dt_ops != NULL)
222 OBD_FREE_PTR(type->typ_dt_ops);
223 OBD_FREE(type, sizeof(*type));
226 EXPORT_SYMBOL(class_register_type);
228 int class_unregister_type(const char *name)
230 struct obd_type *type = class_search_type(name);
233 CERROR("unknown obd type\n");
237 if (type->typ_refcnt) {
238 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239 /* This is a bad situation, let's make the best of it */
240 /* Remove ops, but leave the name for debugging */
241 OBD_FREE_PTR(type->typ_dt_ops);
242 OBD_FREE_PTR(type->typ_md_ops);
246 if (type->typ_procroot) {
247 lprocfs_remove(&type->typ_procroot);
251 lu_device_type_fini(type->typ_lu);
253 spin_lock(&obd_types_lock);
254 list_del(&type->typ_chain);
255 spin_unlock(&obd_types_lock);
256 OBD_FREE(type->typ_name, strlen(name) + 1);
257 if (type->typ_dt_ops != NULL)
258 OBD_FREE_PTR(type->typ_dt_ops);
259 if (type->typ_md_ops != NULL)
260 OBD_FREE_PTR(type->typ_md_ops);
261 OBD_FREE(type, sizeof(*type));
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
267 * Create a new obd device.
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
271 * \param[in] type_name obd device type string.
272 * \param[in] name obd device name.
274 * \retval NULL if create fails, otherwise return the obd device
277 struct obd_device *class_newdev(const char *type_name, const char *name)
279 struct obd_device *result = NULL;
280 struct obd_device *newdev;
281 struct obd_type *type = NULL;
283 int new_obd_minor = 0;
285 if (strlen(name) >= MAX_OBD_NAME) {
286 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287 return ERR_PTR(-EINVAL);
290 type = class_get_type(type_name);
292 CERROR("OBD: unknown type: %s\n", type_name);
293 return ERR_PTR(-ENODEV);
296 newdev = obd_device_alloc();
297 if (newdev == NULL) {
298 result = ERR_PTR(-ENOMEM);
302 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304 write_lock(&obd_dev_lock);
305 for (i = 0; i < class_devno_max(); i++) {
306 struct obd_device *obd = class_num2obd(i);
308 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309 CERROR("Device %s already exists at %d, won't add\n",
312 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313 "%p obd_magic %08x != %08x\n", result,
314 result->obd_magic, OBD_DEVICE_MAGIC);
315 LASSERTF(result->obd_minor == new_obd_minor,
316 "%p obd_minor %d != %d\n", result,
317 result->obd_minor, new_obd_minor);
319 obd_devs[result->obd_minor] = NULL;
320 result->obd_name[0] = '\0';
322 result = ERR_PTR(-EEXIST);
325 if (!result && !obd) {
327 result->obd_minor = i;
329 result->obd_type = type;
330 strncpy(result->obd_name, name,
331 sizeof(result->obd_name) - 1);
332 obd_devs[i] = result;
335 write_unlock(&obd_dev_lock);
337 if (result == NULL && i >= class_devno_max()) {
338 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340 result = ERR_PTR(-EOVERFLOW);
347 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348 result->obd_name, result);
352 obd_device_free(newdev);
354 class_put_type(type);
358 void class_release_dev(struct obd_device *obd)
360 struct obd_type *obd_type = obd->obd_type;
362 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366 LASSERT(obd_type != NULL);
368 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371 write_lock(&obd_dev_lock);
372 obd_devs[obd->obd_minor] = NULL;
373 write_unlock(&obd_dev_lock);
374 obd_device_free(obd);
376 class_put_type(obd_type);
379 int class_name2dev(const char *name)
386 read_lock(&obd_dev_lock);
387 for (i = 0; i < class_devno_max(); i++) {
388 struct obd_device *obd = class_num2obd(i);
390 if (obd && strcmp(name, obd->obd_name) == 0) {
391 /* Make sure we finished attaching before we give
392 out any references */
393 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394 if (obd->obd_attached) {
395 read_unlock(&obd_dev_lock);
401 read_unlock(&obd_dev_lock);
405 EXPORT_SYMBOL(class_name2dev);
407 struct obd_device *class_name2obd(const char *name)
409 int dev = class_name2dev(name);
411 if (dev < 0 || dev > class_devno_max())
413 return class_num2obd(dev);
415 EXPORT_SYMBOL(class_name2obd);
417 int class_uuid2dev(struct obd_uuid *uuid)
421 read_lock(&obd_dev_lock);
422 for (i = 0; i < class_devno_max(); i++) {
423 struct obd_device *obd = class_num2obd(i);
425 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427 read_unlock(&obd_dev_lock);
431 read_unlock(&obd_dev_lock);
435 EXPORT_SYMBOL(class_uuid2dev);
437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
439 int dev = class_uuid2dev(uuid);
442 return class_num2obd(dev);
444 EXPORT_SYMBOL(class_uuid2obd);
447 * Get obd device from ::obd_devs[]
449 * \param num [in] array index
451 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452 * otherwise return the obd device there.
454 struct obd_device *class_num2obd(int num)
456 struct obd_device *obd = NULL;
458 if (num < class_devno_max()) {
463 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464 "%p obd_magic %08x != %08x\n",
465 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466 LASSERTF(obd->obd_minor == num,
467 "%p obd_minor %0d != %0d\n",
468 obd, obd->obd_minor, num);
473 EXPORT_SYMBOL(class_num2obd);
476 * Get obd devices count. Device in any
478 * \retval obd device count
480 int get_devices_count(void)
482 int index, max_index = class_devno_max(), dev_count = 0;
484 read_lock(&obd_dev_lock);
485 for (index = 0; index <= max_index; index++) {
486 struct obd_device *obd = class_num2obd(index);
490 read_unlock(&obd_dev_lock);
494 EXPORT_SYMBOL(get_devices_count);
496 void class_obd_list(void)
501 read_lock(&obd_dev_lock);
502 for (i = 0; i < class_devno_max(); i++) {
503 struct obd_device *obd = class_num2obd(i);
507 if (obd->obd_stopping)
509 else if (obd->obd_set_up)
511 else if (obd->obd_attached)
515 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516 i, status, obd->obd_type->typ_name,
517 obd->obd_name, obd->obd_uuid.uuid,
518 atomic_read(&obd->obd_refcount));
520 read_unlock(&obd_dev_lock);
524 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
525 specified, then only the client with that uuid is returned,
526 otherwise any client connected to the tgt is returned. */
527 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
528 const char *typ_name,
529 struct obd_uuid *grp_uuid)
533 read_lock(&obd_dev_lock);
534 for (i = 0; i < class_devno_max(); i++) {
535 struct obd_device *obd = class_num2obd(i);
539 if ((strncmp(obd->obd_type->typ_name, typ_name,
540 strlen(typ_name)) == 0)) {
541 if (obd_uuid_equals(tgt_uuid,
542 &obd->u.cli.cl_target_uuid) &&
543 ((grp_uuid)? obd_uuid_equals(grp_uuid,
544 &obd->obd_uuid) : 1)) {
545 read_unlock(&obd_dev_lock);
550 read_unlock(&obd_dev_lock);
554 EXPORT_SYMBOL(class_find_client_obd);
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557 searching at *next, and if a device is found, the next index to look
558 at is saved in *next. If next is NULL, then the first matching device
559 will always be returned. */
560 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
566 else if (*next >= 0 && *next < class_devno_max())
571 read_lock(&obd_dev_lock);
572 for (; i < class_devno_max(); i++) {
573 struct obd_device *obd = class_num2obd(i);
577 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
580 read_unlock(&obd_dev_lock);
584 read_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_devices_in_group);
591 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592 * adjust sptlrpc settings accordingly.
594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
596 struct obd_device *obd;
600 LASSERT(namelen > 0);
602 read_lock(&obd_dev_lock);
603 for (i = 0; i < class_devno_max(); i++) {
604 obd = class_num2obd(i);
606 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
609 /* only notify mdc, osc, mdt, ost */
610 type = obd->obd_type->typ_name;
611 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614 strcmp(type, LUSTRE_OST_NAME) != 0)
617 if (strncmp(obd->obd_name, fsname, namelen))
620 class_incref(obd, __func__, obd);
621 read_unlock(&obd_dev_lock);
622 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623 sizeof(KEY_SPTLRPC_CONF),
624 KEY_SPTLRPC_CONF, 0, NULL, NULL);
626 class_decref(obd, __func__, obd);
627 read_lock(&obd_dev_lock);
629 read_unlock(&obd_dev_lock);
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
634 void obd_cleanup_caches(void)
636 if (obd_device_cachep) {
637 kmem_cache_destroy(obd_device_cachep);
638 obd_device_cachep = NULL;
641 kmem_cache_destroy(obdo_cachep);
645 kmem_cache_destroy(import_cachep);
646 import_cachep = NULL;
649 kmem_cache_destroy(capa_cachep);
654 int obd_init_caches(void)
656 LASSERT(obd_device_cachep == NULL);
657 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658 sizeof(struct obd_device),
660 if (!obd_device_cachep)
663 LASSERT(obdo_cachep == NULL);
664 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
669 LASSERT(import_cachep == NULL);
670 import_cachep = kmem_cache_create("ll_import_cache",
671 sizeof(struct obd_import),
676 LASSERT(capa_cachep == NULL);
677 capa_cachep = kmem_cache_create("capa_cache",
678 sizeof(struct obd_capa), 0, 0, NULL);
684 obd_cleanup_caches();
689 /* map connection to client */
690 struct obd_export *class_conn2export(struct lustre_handle *conn)
692 struct obd_export *export;
695 CDEBUG(D_CACHE, "looking for null handle\n");
699 if (conn->cookie == -1) { /* this means assign a new connection */
700 CDEBUG(D_CACHE, "want a new connection\n");
704 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705 export = class_handle2object(conn->cookie);
708 EXPORT_SYMBOL(class_conn2export);
710 struct obd_device *class_exp2obd(struct obd_export *exp)
716 EXPORT_SYMBOL(class_exp2obd);
718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
720 struct obd_export *export;
721 export = class_conn2export(conn);
723 struct obd_device *obd = export->exp_obd;
724 class_export_put(export);
729 EXPORT_SYMBOL(class_conn2obd);
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
733 struct obd_device *obd = exp->exp_obd;
736 return obd->u.cli.cl_import;
738 EXPORT_SYMBOL(class_exp2cliimp);
740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
742 struct obd_device *obd = class_conn2obd(conn);
745 return obd->u.cli.cl_import;
747 EXPORT_SYMBOL(class_conn2cliimp);
749 /* Export management functions */
750 static void class_export_destroy(struct obd_export *exp)
752 struct obd_device *obd = exp->exp_obd;
754 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755 LASSERT(obd != NULL);
757 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758 exp->exp_client_uuid.uuid, obd->obd_name);
760 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761 if (exp->exp_connection)
762 ptlrpc_put_connection_superhack(exp->exp_connection);
764 LASSERT(list_empty(&exp->exp_outstanding_replies));
765 LASSERT(list_empty(&exp->exp_uncommitted_replies));
766 LASSERT(list_empty(&exp->exp_req_replay_queue));
767 LASSERT(list_empty(&exp->exp_hp_rpcs));
768 obd_destroy_export(exp);
769 class_decref(obd, "export", exp);
771 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
774 static void export_handle_addref(void *export)
776 class_export_get(export);
779 static struct portals_handle_ops export_handle_ops = {
780 .hop_addref = export_handle_addref,
784 struct obd_export *class_export_get(struct obd_export *exp)
786 atomic_inc(&exp->exp_refcount);
787 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788 atomic_read(&exp->exp_refcount));
791 EXPORT_SYMBOL(class_export_get);
793 void class_export_put(struct obd_export *exp)
795 LASSERT(exp != NULL);
796 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798 atomic_read(&exp->exp_refcount) - 1);
800 if (atomic_dec_and_test(&exp->exp_refcount)) {
801 LASSERT(!list_empty(&exp->exp_obd_chain));
802 CDEBUG(D_IOCTL, "final put %p/%s\n",
803 exp, exp->exp_client_uuid.uuid);
805 /* release nid stat refererence */
806 lprocfs_exp_cleanup(exp);
808 obd_zombie_export_add(exp);
811 EXPORT_SYMBOL(class_export_put);
813 /* Creates a new export, adds it to the hash table, and returns a
814 * pointer to it. The refcount is 2: one for the hash reference, and
815 * one for the pointer returned by this function. */
816 struct obd_export *class_new_export(struct obd_device *obd,
817 struct obd_uuid *cluuid)
819 struct obd_export *export;
820 struct cfs_hash *hash = NULL;
823 OBD_ALLOC_PTR(export);
825 return ERR_PTR(-ENOMEM);
827 export->exp_conn_cnt = 0;
828 export->exp_lock_hash = NULL;
829 export->exp_flock_hash = NULL;
830 atomic_set(&export->exp_refcount, 2);
831 atomic_set(&export->exp_rpc_count, 0);
832 atomic_set(&export->exp_cb_count, 0);
833 atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835 INIT_LIST_HEAD(&export->exp_locks_list);
836 spin_lock_init(&export->exp_locks_list_guard);
838 atomic_set(&export->exp_replay_count, 0);
839 export->exp_obd = obd;
840 INIT_LIST_HEAD(&export->exp_outstanding_replies);
841 spin_lock_init(&export->exp_uncommitted_replies_lock);
842 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843 INIT_LIST_HEAD(&export->exp_req_replay_queue);
844 INIT_LIST_HEAD(&export->exp_handle.h_link);
845 INIT_LIST_HEAD(&export->exp_hp_rpcs);
846 class_handle_hash(&export->exp_handle, &export_handle_ops);
847 export->exp_last_request_time = get_seconds();
848 spin_lock_init(&export->exp_lock);
849 spin_lock_init(&export->exp_rpc_lock);
850 INIT_HLIST_NODE(&export->exp_uuid_hash);
851 INIT_HLIST_NODE(&export->exp_nid_hash);
852 spin_lock_init(&export->exp_bl_list_lock);
853 INIT_LIST_HEAD(&export->exp_bl_list);
855 export->exp_sp_peer = LUSTRE_SP_ANY;
856 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857 export->exp_client_uuid = *cluuid;
858 obd_init_export(export);
860 spin_lock(&obd->obd_dev_lock);
861 /* shouldn't happen, but might race */
862 if (obd->obd_stopping) {
867 hash = cfs_hash_getref(obd->obd_uuid_hash);
872 spin_unlock(&obd->obd_dev_lock);
874 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
877 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878 obd->obd_name, cluuid->uuid, rc);
884 spin_lock(&obd->obd_dev_lock);
885 if (obd->obd_stopping) {
886 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
891 class_incref(obd, "export", export);
892 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893 list_add_tail(&export->exp_obd_chain_timed,
894 &export->exp_obd->obd_exports_timed);
895 export->exp_obd->obd_num_exports++;
896 spin_unlock(&obd->obd_dev_lock);
897 cfs_hash_putref(hash);
901 spin_unlock(&obd->obd_dev_lock);
904 cfs_hash_putref(hash);
905 class_handle_unhash(&export->exp_handle);
906 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907 obd_destroy_export(export);
908 OBD_FREE_PTR(export);
911 EXPORT_SYMBOL(class_new_export);
913 void class_unlink_export(struct obd_export *exp)
915 class_handle_unhash(&exp->exp_handle);
917 spin_lock(&exp->exp_obd->obd_dev_lock);
918 /* delete an uuid-export hashitem from hashtables */
919 if (!hlist_unhashed(&exp->exp_uuid_hash))
920 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921 &exp->exp_client_uuid,
922 &exp->exp_uuid_hash);
924 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925 list_del_init(&exp->exp_obd_chain_timed);
926 exp->exp_obd->obd_num_exports--;
927 spin_unlock(&exp->exp_obd->obd_dev_lock);
928 class_export_put(exp);
930 EXPORT_SYMBOL(class_unlink_export);
932 /* Import management functions */
933 void class_import_destroy(struct obd_import *imp)
935 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936 imp->imp_obd->obd_name);
938 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
940 ptlrpc_put_connection_superhack(imp->imp_connection);
942 while (!list_empty(&imp->imp_conn_list)) {
943 struct obd_import_conn *imp_conn;
945 imp_conn = list_entry(imp->imp_conn_list.next,
946 struct obd_import_conn, oic_item);
947 list_del_init(&imp_conn->oic_item);
948 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949 OBD_FREE(imp_conn, sizeof(*imp_conn));
952 LASSERT(imp->imp_sec == NULL);
953 class_decref(imp->imp_obd, "import", imp);
954 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
957 static void import_handle_addref(void *import)
959 class_import_get(import);
962 static struct portals_handle_ops import_handle_ops = {
963 .hop_addref = import_handle_addref,
967 struct obd_import *class_import_get(struct obd_import *import)
969 atomic_inc(&import->imp_refcount);
970 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971 atomic_read(&import->imp_refcount),
972 import->imp_obd->obd_name);
975 EXPORT_SYMBOL(class_import_get);
977 void class_import_put(struct obd_import *imp)
979 LASSERT(list_empty(&imp->imp_zombie_chain));
980 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
982 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983 atomic_read(&imp->imp_refcount) - 1,
984 imp->imp_obd->obd_name);
986 if (atomic_dec_and_test(&imp->imp_refcount)) {
987 CDEBUG(D_INFO, "final put import %p\n", imp);
988 obd_zombie_import_add(imp);
991 /* catch possible import put race */
992 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
994 EXPORT_SYMBOL(class_import_put);
996 static void init_imp_at(struct imp_at *at) {
998 at_init(&at->iat_net_latency, 0, 0);
999 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000 /* max service estimates are tracked on the server side, so
1001 don't use the AT history here, just use the last reported
1002 val. (But keep hist for proc histogram, worst_ever) */
1003 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1008 struct obd_import *class_new_import(struct obd_device *obd)
1010 struct obd_import *imp;
1012 OBD_ALLOC(imp, sizeof(*imp));
1016 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018 INIT_LIST_HEAD(&imp->imp_replay_list);
1019 INIT_LIST_HEAD(&imp->imp_sending_list);
1020 INIT_LIST_HEAD(&imp->imp_delayed_list);
1021 INIT_LIST_HEAD(&imp->imp_committed_list);
1022 imp->imp_replay_cursor = &imp->imp_committed_list;
1023 spin_lock_init(&imp->imp_lock);
1024 imp->imp_last_success_conn = 0;
1025 imp->imp_state = LUSTRE_IMP_NEW;
1026 imp->imp_obd = class_incref(obd, "import", imp);
1027 mutex_init(&imp->imp_sec_mutex);
1028 init_waitqueue_head(&imp->imp_recovery_waitq);
1030 atomic_set(&imp->imp_refcount, 2);
1031 atomic_set(&imp->imp_unregistering, 0);
1032 atomic_set(&imp->imp_inflight, 0);
1033 atomic_set(&imp->imp_replay_inflight, 0);
1034 atomic_set(&imp->imp_inval_count, 0);
1035 INIT_LIST_HEAD(&imp->imp_conn_list);
1036 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038 init_imp_at(&imp->imp_at);
1040 /* the default magic is V2, will be used in connect RPC, and
1041 * then adjusted according to the flags in request/reply. */
1042 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1046 EXPORT_SYMBOL(class_new_import);
1048 void class_destroy_import(struct obd_import *import)
1050 LASSERT(import != NULL);
1051 LASSERT(import != LP_POISON);
1053 class_handle_unhash(&import->imp_handle);
1055 spin_lock(&import->imp_lock);
1056 import->imp_generation++;
1057 spin_unlock(&import->imp_lock);
1058 class_import_put(import);
1060 EXPORT_SYMBOL(class_destroy_import);
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1066 spin_lock(&exp->exp_locks_list_guard);
1068 LASSERT(lock->l_exp_refs_nr >= 0);
1070 if (lock->l_exp_refs_target != NULL &&
1071 lock->l_exp_refs_target != exp) {
1072 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073 exp, lock, lock->l_exp_refs_target);
1075 if ((lock->l_exp_refs_nr ++) == 0) {
1076 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077 lock->l_exp_refs_target = exp;
1079 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080 lock, exp, lock->l_exp_refs_nr);
1081 spin_unlock(&exp->exp_locks_list_guard);
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 spin_lock(&exp->exp_locks_list_guard);
1088 LASSERT(lock->l_exp_refs_nr > 0);
1089 if (lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1091 lock, lock->l_exp_refs_target, exp);
1093 if (-- lock->l_exp_refs_nr == 0) {
1094 list_del_init(&lock->l_exp_refs_link);
1095 lock->l_exp_refs_target = NULL;
1097 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098 lock, exp, lock->l_exp_refs_nr);
1099 spin_unlock(&exp->exp_locks_list_guard);
1101 EXPORT_SYMBOL(__class_export_del_lock_ref);
1104 /* A connection defines an export context in which preallocation can
1105 be managed. This releases the export pointer reference, and returns
1106 the export handle, so the export refcount is 1 when this function
1108 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1109 struct obd_uuid *cluuid)
1111 struct obd_export *export;
1112 LASSERT(conn != NULL);
1113 LASSERT(obd != NULL);
1114 LASSERT(cluuid != NULL);
1116 export = class_new_export(obd, cluuid);
1118 return PTR_ERR(export);
1120 conn->cookie = export->exp_handle.h_cookie;
1121 class_export_put(export);
1123 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1124 cluuid->uuid, conn->cookie);
1127 EXPORT_SYMBOL(class_connect);
1129 /* if export is involved in recovery then clean up related things */
1130 void class_export_recovery_cleanup(struct obd_export *exp)
1132 struct obd_device *obd = exp->exp_obd;
1134 spin_lock(&obd->obd_recovery_task_lock);
1135 if (exp->exp_delayed)
1136 obd->obd_delayed_clients--;
1137 if (obd->obd_recovering) {
1138 if (exp->exp_in_recovery) {
1139 spin_lock(&exp->exp_lock);
1140 exp->exp_in_recovery = 0;
1141 spin_unlock(&exp->exp_lock);
1142 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1143 atomic_dec(&obd->obd_connected_clients);
1146 /* if called during recovery then should update
1147 * obd_stale_clients counter,
1148 * lightweight exports are not counted */
1149 if (exp->exp_failed &&
1150 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1151 exp->exp_obd->obd_stale_clients++;
1153 spin_unlock(&obd->obd_recovery_task_lock);
1154 /** Cleanup req replay fields */
1155 if (exp->exp_req_replay_needed) {
1156 spin_lock(&exp->exp_lock);
1157 exp->exp_req_replay_needed = 0;
1158 spin_unlock(&exp->exp_lock);
1159 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160 atomic_dec(&obd->obd_req_replay_clients);
1162 /** Cleanup lock replay data */
1163 if (exp->exp_lock_replay_needed) {
1164 spin_lock(&exp->exp_lock);
1165 exp->exp_lock_replay_needed = 0;
1166 spin_unlock(&exp->exp_lock);
1167 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168 atomic_dec(&obd->obd_lock_replay_clients);
1172 /* This function removes 1-3 references from the export:
1173 * 1 - for export pointer passed
1174 * and if disconnect really need
1175 * 2 - removing from hash
1176 * 3 - in client_unlink_export
1177 * The export pointer passed to this function can destroyed */
1178 int class_disconnect(struct obd_export *export)
1180 int already_disconnected;
1182 if (export == NULL) {
1183 CWARN("attempting to free NULL export %p\n", export);
1187 spin_lock(&export->exp_lock);
1188 already_disconnected = export->exp_disconnected;
1189 export->exp_disconnected = 1;
1190 spin_unlock(&export->exp_lock);
1192 /* class_cleanup(), abort_recovery(), and class_fail_export()
1193 * all end up in here, and if any of them race we shouldn't
1194 * call extra class_export_puts(). */
1195 if (already_disconnected) {
1196 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1200 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1201 export->exp_handle.h_cookie);
1203 if (!hlist_unhashed(&export->exp_nid_hash))
1204 cfs_hash_del(export->exp_obd->obd_nid_hash,
1205 &export->exp_connection->c_peer.nid,
1206 &export->exp_nid_hash);
1208 class_export_recovery_cleanup(export);
1209 class_unlink_export(export);
1211 class_export_put(export);
1214 EXPORT_SYMBOL(class_disconnect);
1216 /* Return non-zero for a fully connected export */
1217 int class_connected_export(struct obd_export *exp)
1221 spin_lock(&exp->exp_lock);
1222 connected = (exp->exp_conn_cnt > 0);
1223 spin_unlock(&exp->exp_lock);
1228 EXPORT_SYMBOL(class_connected_export);
1230 static void class_disconnect_export_list(struct list_head *list,
1231 enum obd_option flags)
1234 struct obd_export *exp;
1236 /* It's possible that an export may disconnect itself, but
1237 * nothing else will be added to this list. */
1238 while (!list_empty(list)) {
1239 exp = list_entry(list->next, struct obd_export,
1241 /* need for safe call CDEBUG after obd_disconnect */
1242 class_export_get(exp);
1244 spin_lock(&exp->exp_lock);
1245 exp->exp_flags = flags;
1246 spin_unlock(&exp->exp_lock);
1248 if (obd_uuid_equals(&exp->exp_client_uuid,
1249 &exp->exp_obd->obd_uuid)) {
1251 "exp %p export uuid == obd uuid, don't discon\n",
1253 /* Need to delete this now so we don't end up pointing
1254 * to work_list later when this export is cleaned up. */
1255 list_del_init(&exp->exp_obd_chain);
1256 class_export_put(exp);
1260 class_export_get(exp);
1261 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1262 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1263 exp, exp->exp_last_request_time);
1264 /* release one export reference anyway */
1265 rc = obd_disconnect(exp);
1267 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1268 obd_export_nid2str(exp), exp, rc);
1269 class_export_put(exp);
1273 void class_disconnect_exports(struct obd_device *obd)
1275 struct list_head work_list;
1277 /* Move all of the exports from obd_exports to a work list, en masse. */
1278 INIT_LIST_HEAD(&work_list);
1279 spin_lock(&obd->obd_dev_lock);
1280 list_splice_init(&obd->obd_exports, &work_list);
1281 list_splice_init(&obd->obd_delayed_exports, &work_list);
1282 spin_unlock(&obd->obd_dev_lock);
1284 if (!list_empty(&work_list)) {
1285 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1286 obd->obd_minor, obd);
1287 class_disconnect_export_list(&work_list,
1288 exp_flags_from_obd(obd));
1290 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1291 obd->obd_minor, obd);
1293 EXPORT_SYMBOL(class_disconnect_exports);
1295 /* Remove exports that have not completed recovery.
1297 void class_disconnect_stale_exports(struct obd_device *obd,
1298 int (*test_export)(struct obd_export *))
1300 struct list_head work_list;
1301 struct obd_export *exp, *n;
1304 INIT_LIST_HEAD(&work_list);
1305 spin_lock(&obd->obd_dev_lock);
1306 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1308 /* don't count self-export as client */
1309 if (obd_uuid_equals(&exp->exp_client_uuid,
1310 &exp->exp_obd->obd_uuid))
1313 /* don't evict clients which have no slot in last_rcvd
1314 * (e.g. lightweight connection) */
1315 if (exp->exp_target_data.ted_lr_idx == -1)
1318 spin_lock(&exp->exp_lock);
1319 if (exp->exp_failed || test_export(exp)) {
1320 spin_unlock(&exp->exp_lock);
1323 exp->exp_failed = 1;
1324 spin_unlock(&exp->exp_lock);
1326 list_move(&exp->exp_obd_chain, &work_list);
1328 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1329 obd->obd_name, exp->exp_client_uuid.uuid,
1330 exp->exp_connection == NULL ? "<unknown>" :
1331 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1332 print_export_data(exp, "EVICTING", 0);
1334 spin_unlock(&obd->obd_dev_lock);
1337 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1338 obd->obd_name, evicted);
1340 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1341 OBD_OPT_ABORT_RECOV);
1343 EXPORT_SYMBOL(class_disconnect_stale_exports);
1345 void class_fail_export(struct obd_export *exp)
1347 int rc, already_failed;
1349 spin_lock(&exp->exp_lock);
1350 already_failed = exp->exp_failed;
1351 exp->exp_failed = 1;
1352 spin_unlock(&exp->exp_lock);
1354 if (already_failed) {
1355 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1356 exp, exp->exp_client_uuid.uuid);
1360 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1361 exp, exp->exp_client_uuid.uuid);
1363 if (obd_dump_on_timeout)
1364 libcfs_debug_dumplog();
1366 /* need for safe call CDEBUG after obd_disconnect */
1367 class_export_get(exp);
1369 /* Most callers into obd_disconnect are removing their own reference
1370 * (request, for example) in addition to the one from the hash table.
1371 * We don't have such a reference here, so make one. */
1372 class_export_get(exp);
1373 rc = obd_disconnect(exp);
1375 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1377 CDEBUG(D_HA, "disconnected export %p/%s\n",
1378 exp, exp->exp_client_uuid.uuid);
1379 class_export_put(exp);
1381 EXPORT_SYMBOL(class_fail_export);
1383 char *obd_export_nid2str(struct obd_export *exp)
1385 if (exp->exp_connection != NULL)
1386 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1390 EXPORT_SYMBOL(obd_export_nid2str);
1392 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1394 struct cfs_hash *nid_hash;
1395 struct obd_export *doomed_exp = NULL;
1396 int exports_evicted = 0;
1398 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1400 spin_lock(&obd->obd_dev_lock);
1401 /* umount has run already, so evict thread should leave
1402 * its task to umount thread now */
1403 if (obd->obd_stopping) {
1404 spin_unlock(&obd->obd_dev_lock);
1405 return exports_evicted;
1407 nid_hash = obd->obd_nid_hash;
1408 cfs_hash_getref(nid_hash);
1409 spin_unlock(&obd->obd_dev_lock);
1412 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1413 if (doomed_exp == NULL)
1416 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1417 "nid %s found, wanted nid %s, requested nid %s\n",
1418 obd_export_nid2str(doomed_exp),
1419 libcfs_nid2str(nid_key), nid);
1420 LASSERTF(doomed_exp != obd->obd_self_export,
1421 "self-export is hashed by NID?\n");
1423 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1425 obd_uuid2str(&doomed_exp->exp_client_uuid),
1426 obd_export_nid2str(doomed_exp));
1427 class_fail_export(doomed_exp);
1428 class_export_put(doomed_exp);
1431 cfs_hash_putref(nid_hash);
1433 if (!exports_evicted)
1435 "%s: can't disconnect NID '%s': no exports found\n",
1436 obd->obd_name, nid);
1437 return exports_evicted;
1439 EXPORT_SYMBOL(obd_export_evict_by_nid);
1441 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1443 struct cfs_hash *uuid_hash;
1444 struct obd_export *doomed_exp = NULL;
1445 struct obd_uuid doomed_uuid;
1446 int exports_evicted = 0;
1448 spin_lock(&obd->obd_dev_lock);
1449 if (obd->obd_stopping) {
1450 spin_unlock(&obd->obd_dev_lock);
1451 return exports_evicted;
1453 uuid_hash = obd->obd_uuid_hash;
1454 cfs_hash_getref(uuid_hash);
1455 spin_unlock(&obd->obd_dev_lock);
1457 obd_str2uuid(&doomed_uuid, uuid);
1458 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1459 CERROR("%s: can't evict myself\n", obd->obd_name);
1460 cfs_hash_putref(uuid_hash);
1461 return exports_evicted;
1464 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1466 if (doomed_exp == NULL) {
1467 CERROR("%s: can't disconnect %s: no exports found\n",
1468 obd->obd_name, uuid);
1470 CWARN("%s: evicting %s at administrative request\n",
1471 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1472 class_fail_export(doomed_exp);
1473 class_export_put(doomed_exp);
1476 cfs_hash_putref(uuid_hash);
1478 return exports_evicted;
1480 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1482 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1483 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1484 EXPORT_SYMBOL(class_export_dump_hook);
1487 static void print_export_data(struct obd_export *exp, const char *status,
1490 struct ptlrpc_reply_state *rs;
1491 struct ptlrpc_reply_state *first_reply = NULL;
1494 spin_lock(&exp->exp_lock);
1495 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1501 spin_unlock(&exp->exp_lock);
1503 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1504 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1505 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1506 atomic_read(&exp->exp_rpc_count),
1507 atomic_read(&exp->exp_cb_count),
1508 atomic_read(&exp->exp_locks_count),
1509 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1510 nreplies, first_reply, nreplies > 3 ? "..." : "",
1511 exp->exp_last_committed);
1512 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1513 if (locks && class_export_dump_hook != NULL)
1514 class_export_dump_hook(exp);
1518 void dump_exports(struct obd_device *obd, int locks)
1520 struct obd_export *exp;
1522 spin_lock(&obd->obd_dev_lock);
1523 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1524 print_export_data(exp, "ACTIVE", locks);
1525 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1526 print_export_data(exp, "UNLINKED", locks);
1527 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1528 print_export_data(exp, "DELAYED", locks);
1529 spin_unlock(&obd->obd_dev_lock);
1530 spin_lock(&obd_zombie_impexp_lock);
1531 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1532 print_export_data(exp, "ZOMBIE", locks);
1533 spin_unlock(&obd_zombie_impexp_lock);
1535 EXPORT_SYMBOL(dump_exports);
1537 void obd_exports_barrier(struct obd_device *obd)
1540 LASSERT(list_empty(&obd->obd_exports));
1541 spin_lock(&obd->obd_dev_lock);
1542 while (!list_empty(&obd->obd_unlinked_exports)) {
1543 spin_unlock(&obd->obd_dev_lock);
1544 set_current_state(TASK_UNINTERRUPTIBLE);
1545 schedule_timeout(cfs_time_seconds(waited));
1546 if (waited > 5 && IS_PO2(waited)) {
1547 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1548 obd->obd_name, waited,
1549 atomic_read(&obd->obd_refcount));
1550 dump_exports(obd, 1);
1553 spin_lock(&obd->obd_dev_lock);
1555 spin_unlock(&obd->obd_dev_lock);
1557 EXPORT_SYMBOL(obd_exports_barrier);
1559 /* Total amount of zombies to be destroyed */
1560 static int zombies_count = 0;
1563 * kill zombie imports and exports
1565 void obd_zombie_impexp_cull(void)
1567 struct obd_import *import;
1568 struct obd_export *export;
1571 spin_lock(&obd_zombie_impexp_lock);
1574 if (!list_empty(&obd_zombie_imports)) {
1575 import = list_entry(obd_zombie_imports.next,
1578 list_del_init(&import->imp_zombie_chain);
1582 if (!list_empty(&obd_zombie_exports)) {
1583 export = list_entry(obd_zombie_exports.next,
1586 list_del_init(&export->exp_obd_chain);
1589 spin_unlock(&obd_zombie_impexp_lock);
1591 if (import != NULL) {
1592 class_import_destroy(import);
1593 spin_lock(&obd_zombie_impexp_lock);
1595 spin_unlock(&obd_zombie_impexp_lock);
1598 if (export != NULL) {
1599 class_export_destroy(export);
1600 spin_lock(&obd_zombie_impexp_lock);
1602 spin_unlock(&obd_zombie_impexp_lock);
1606 } while (import != NULL || export != NULL);
1609 static struct completion obd_zombie_start;
1610 static struct completion obd_zombie_stop;
1611 static unsigned long obd_zombie_flags;
1612 static wait_queue_head_t obd_zombie_waitq;
1613 static pid_t obd_zombie_pid;
1616 OBD_ZOMBIE_STOP = 0x0001,
1620 * check for work for kill zombie import/export thread.
1622 static int obd_zombie_impexp_check(void *arg)
1626 spin_lock(&obd_zombie_impexp_lock);
1627 rc = (zombies_count == 0) &&
1628 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1629 spin_unlock(&obd_zombie_impexp_lock);
1635 * Add export to the obd_zombie thread and notify it.
1637 static void obd_zombie_export_add(struct obd_export *exp) {
1638 spin_lock(&exp->exp_obd->obd_dev_lock);
1639 LASSERT(!list_empty(&exp->exp_obd_chain));
1640 list_del_init(&exp->exp_obd_chain);
1641 spin_unlock(&exp->exp_obd->obd_dev_lock);
1642 spin_lock(&obd_zombie_impexp_lock);
1644 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1645 spin_unlock(&obd_zombie_impexp_lock);
1647 obd_zombie_impexp_notify();
1651 * Add import to the obd_zombie thread and notify it.
1653 static void obd_zombie_import_add(struct obd_import *imp) {
1654 LASSERT(imp->imp_sec == NULL);
1655 LASSERT(imp->imp_rq_pool == NULL);
1656 spin_lock(&obd_zombie_impexp_lock);
1657 LASSERT(list_empty(&imp->imp_zombie_chain));
1659 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1660 spin_unlock(&obd_zombie_impexp_lock);
1662 obd_zombie_impexp_notify();
1666 * notify import/export destroy thread about new zombie.
1668 static void obd_zombie_impexp_notify(void)
1671 * Make sure obd_zombie_impexp_thread get this notification.
1672 * It is possible this signal only get by obd_zombie_barrier, and
1673 * barrier gulps this notification and sleeps away and hangs ensues
1675 wake_up_all(&obd_zombie_waitq);
1679 * check whether obd_zombie is idle
1681 static int obd_zombie_is_idle(void)
1685 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1686 spin_lock(&obd_zombie_impexp_lock);
1687 rc = (zombies_count == 0);
1688 spin_unlock(&obd_zombie_impexp_lock);
1693 * wait when obd_zombie import/export queues become empty
1695 void obd_zombie_barrier(void)
1697 struct l_wait_info lwi = { 0 };
1699 if (obd_zombie_pid == current_pid())
1700 /* don't wait for myself */
1702 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1704 EXPORT_SYMBOL(obd_zombie_barrier);
1708 * destroy zombie export/import thread.
1710 static int obd_zombie_impexp_thread(void *unused)
1712 unshare_fs_struct();
1713 complete(&obd_zombie_start);
1715 obd_zombie_pid = current_pid();
1717 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1718 struct l_wait_info lwi = { 0 };
1720 l_wait_event(obd_zombie_waitq,
1721 !obd_zombie_impexp_check(NULL), &lwi);
1722 obd_zombie_impexp_cull();
1725 * Notify obd_zombie_barrier callers that queues
1728 wake_up(&obd_zombie_waitq);
1731 complete(&obd_zombie_stop);
1738 * start destroy zombie import/export thread
1740 int obd_zombie_impexp_init(void)
1742 struct task_struct *task;
1744 INIT_LIST_HEAD(&obd_zombie_imports);
1745 INIT_LIST_HEAD(&obd_zombie_exports);
1746 spin_lock_init(&obd_zombie_impexp_lock);
1747 init_completion(&obd_zombie_start);
1748 init_completion(&obd_zombie_stop);
1749 init_waitqueue_head(&obd_zombie_waitq);
1752 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1754 return PTR_ERR(task);
1756 wait_for_completion(&obd_zombie_start);
1760 * stop destroy zombie import/export thread
1762 void obd_zombie_impexp_stop(void)
1764 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1765 obd_zombie_impexp_notify();
1766 wait_for_completion(&obd_zombie_stop);
1769 /***** Kernel-userspace comm helpers *******/
1771 /* Get length of entire message, including header */
1772 int kuc_len(int payload_len)
1774 return sizeof(struct kuc_hdr) + payload_len;
1776 EXPORT_SYMBOL(kuc_len);
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779 * @param p Pointer to payload area
1780 * @returns Pointer to kuc header
1782 struct kuc_hdr *kuc_ptr(void *p)
1784 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785 LASSERT(lh->kuc_magic == KUC_MAGIC);
1788 EXPORT_SYMBOL(kuc_ptr);
1790 /* Test if payload is part of kuc message
1791 * @param p Pointer to payload area
1794 int kuc_ispayload(void *p)
1796 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798 if (kh->kuc_magic == KUC_MAGIC)
1803 EXPORT_SYMBOL(kuc_ispayload);
1805 /* Alloc space for a message, and fill in header
1806 * @return Pointer to payload area
1808 void *kuc_alloc(int payload_len, int transport, int type)
1811 int len = kuc_len(payload_len);
1815 return ERR_PTR(-ENOMEM);
1817 lh->kuc_magic = KUC_MAGIC;
1818 lh->kuc_transport = transport;
1819 lh->kuc_msgtype = type;
1820 lh->kuc_msglen = len;
1822 return (void *)(lh + 1);
1824 EXPORT_SYMBOL(kuc_alloc);
1826 /* Takes pointer to payload area */
1827 inline void kuc_free(void *p, int payload_len)
1829 struct kuc_hdr *lh = kuc_ptr(p);
1830 OBD_FREE(lh, kuc_len(payload_len));
1832 EXPORT_SYMBOL(kuc_free);