Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dtor/input
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 extern struct list_head obd_types;
47 spinlock_t obd_types_lock;
48
49 struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 struct kmem_cache *import_cachep;
53
54 struct list_head      obd_zombie_imports;
55 struct list_head      obd_zombie_exports;
56 spinlock_t  obd_zombie_impexp_lock;
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61                               const char *status, int locks);
62
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137         if (type) {
138                 spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 try_module_get(type->typ_dt_ops->o_owner);
141                 spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         module_put(type->typ_dt_ops->o_owner);
153         spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165
166         /* sanity check */
167         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168
169         if (class_search_type(name)) {
170                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171                 return -EEXIST;
172         }
173
174         rc = -ENOMEM;
175         OBD_ALLOC(type, sizeof(*type));
176         if (type == NULL)
177                 return rc;
178
179         OBD_ALLOC_PTR(type->typ_dt_ops);
180         OBD_ALLOC_PTR(type->typ_md_ops);
181         OBD_ALLOC(type->typ_name, strlen(name) + 1);
182
183         if (type->typ_dt_ops == NULL ||
184             type->typ_md_ops == NULL ||
185             type->typ_name == NULL)
186                 goto failed;
187
188         *(type->typ_dt_ops) = *dt_ops;
189         /* md_ops is optional */
190         if (md_ops)
191                 *(type->typ_md_ops) = *md_ops;
192         strcpy(type->typ_name, name);
193         spin_lock_init(&type->obd_type_lock);
194
195         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196                                               vars, type);
197         if (IS_ERR(type->typ_procroot)) {
198                 rc = PTR_ERR(type->typ_procroot);
199                 type->typ_procroot = NULL;
200                 goto failed;
201         }
202
203         if (ldt != NULL) {
204                 type->typ_lu = ldt;
205                 rc = lu_device_type_init(ldt);
206                 if (rc != 0)
207                         goto failed;
208         }
209
210         spin_lock(&obd_types_lock);
211         list_add(&type->typ_chain, &obd_types);
212         spin_unlock(&obd_types_lock);
213
214         return 0;
215
216  failed:
217         if (type->typ_name != NULL)
218                 OBD_FREE(type->typ_name, strlen(name) + 1);
219         if (type->typ_md_ops != NULL)
220                 OBD_FREE_PTR(type->typ_md_ops);
221         if (type->typ_dt_ops != NULL)
222                 OBD_FREE_PTR(type->typ_dt_ops);
223         OBD_FREE(type, sizeof(*type));
224         return rc;
225 }
226 EXPORT_SYMBOL(class_register_type);
227
228 int class_unregister_type(const char *name)
229 {
230         struct obd_type *type = class_search_type(name);
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 return -EINVAL;
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 return -EBUSY;
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         spin_lock(&obd_types_lock);
254         list_del(&type->typ_chain);
255         spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         return 0;
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *       pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 return ERR_PTR(-EINVAL);
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 return ERR_PTR(-ENODEV);
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 result = ERR_PTR(-ENOMEM);
299                 goto out_type;
300         }
301
302         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304         write_lock(&obd_dev_lock);
305         for (i = 0; i < class_devno_max(); i++) {
306                 struct obd_device *obd = class_num2obd(i);
307
308                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0] = '\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 result = ERR_PTR(-EOVERFLOW);
341                 goto out;
342         }
343
344         if (IS_ERR(result))
345                 goto out;
346
347         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                result->obd_name, result);
349
350         return result;
351 out:
352         obd_device_free(newdev);
353 out_type:
354         class_put_type(type);
355         return result;
356 }
357
358 void class_release_dev(struct obd_device *obd)
359 {
360         struct obd_type *obd_type = obd->obd_type;
361
362         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366         LASSERT(obd_type != NULL);
367
368         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370
371         write_lock(&obd_dev_lock);
372         obd_devs[obd->obd_minor] = NULL;
373         write_unlock(&obd_dev_lock);
374         obd_device_free(obd);
375
376         class_put_type(obd_type);
377 }
378
379 int class_name2dev(const char *name)
380 {
381         int i;
382
383         if (!name)
384                 return -1;
385
386         read_lock(&obd_dev_lock);
387         for (i = 0; i < class_devno_max(); i++) {
388                 struct obd_device *obd = class_num2obd(i);
389
390                 if (obd && strcmp(name, obd->obd_name) == 0) {
391                         /* Make sure we finished attaching before we give
392                            out any references */
393                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394                         if (obd->obd_attached) {
395                                 read_unlock(&obd_dev_lock);
396                                 return i;
397                         }
398                         break;
399                 }
400         }
401         read_unlock(&obd_dev_lock);
402
403         return -1;
404 }
405 EXPORT_SYMBOL(class_name2dev);
406
407 struct obd_device *class_name2obd(const char *name)
408 {
409         int dev = class_name2dev(name);
410
411         if (dev < 0 || dev > class_devno_max())
412                 return NULL;
413         return class_num2obd(dev);
414 }
415 EXPORT_SYMBOL(class_name2obd);
416
417 int class_uuid2dev(struct obd_uuid *uuid)
418 {
419         int i;
420
421         read_lock(&obd_dev_lock);
422         for (i = 0; i < class_devno_max(); i++) {
423                 struct obd_device *obd = class_num2obd(i);
424
425                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427                         read_unlock(&obd_dev_lock);
428                         return i;
429                 }
430         }
431         read_unlock(&obd_dev_lock);
432
433         return -1;
434 }
435 EXPORT_SYMBOL(class_uuid2dev);
436
437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438 {
439         int dev = class_uuid2dev(uuid);
440         if (dev < 0)
441                 return NULL;
442         return class_num2obd(dev);
443 }
444 EXPORT_SYMBOL(class_uuid2obd);
445
446 /**
447  * Get obd device from ::obd_devs[]
448  *
449  * \param num [in] array index
450  *
451  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452  *       otherwise return the obd device there.
453  */
454 struct obd_device *class_num2obd(int num)
455 {
456         struct obd_device *obd = NULL;
457
458         if (num < class_devno_max()) {
459                 obd = obd_devs[num];
460                 if (obd == NULL)
461                         return NULL;
462
463                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464                          "%p obd_magic %08x != %08x\n",
465                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466                 LASSERTF(obd->obd_minor == num,
467                          "%p obd_minor %0d != %0d\n",
468                          obd, obd->obd_minor, num);
469         }
470
471         return obd;
472 }
473 EXPORT_SYMBOL(class_num2obd);
474
475 /**
476  * Get obd devices count. Device in any
477  *    state are counted
478  * \retval obd device count
479  */
480 int get_devices_count(void)
481 {
482         int index, max_index = class_devno_max(), dev_count = 0;
483
484         read_lock(&obd_dev_lock);
485         for (index = 0; index <= max_index; index++) {
486                 struct obd_device *obd = class_num2obd(index);
487                 if (obd != NULL)
488                         dev_count++;
489         }
490         read_unlock(&obd_dev_lock);
491
492         return dev_count;
493 }
494 EXPORT_SYMBOL(get_devices_count);
495
496 void class_obd_list(void)
497 {
498         char *status;
499         int i;
500
501         read_lock(&obd_dev_lock);
502         for (i = 0; i < class_devno_max(); i++) {
503                 struct obd_device *obd = class_num2obd(i);
504
505                 if (obd == NULL)
506                         continue;
507                 if (obd->obd_stopping)
508                         status = "ST";
509                 else if (obd->obd_set_up)
510                         status = "UP";
511                 else if (obd->obd_attached)
512                         status = "AT";
513                 else
514                         status = "--";
515                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516                          i, status, obd->obd_type->typ_name,
517                          obd->obd_name, obd->obd_uuid.uuid,
518                          atomic_read(&obd->obd_refcount));
519         }
520         read_unlock(&obd_dev_lock);
521         return;
522 }
523
524 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
525    specified, then only the client with that uuid is returned,
526    otherwise any client connected to the tgt is returned. */
527 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
528                                           const char *typ_name,
529                                           struct obd_uuid *grp_uuid)
530 {
531         int i;
532
533         read_lock(&obd_dev_lock);
534         for (i = 0; i < class_devno_max(); i++) {
535                 struct obd_device *obd = class_num2obd(i);
536
537                 if (obd == NULL)
538                         continue;
539                 if ((strncmp(obd->obd_type->typ_name, typ_name,
540                              strlen(typ_name)) == 0)) {
541                         if (obd_uuid_equals(tgt_uuid,
542                                             &obd->u.cli.cl_target_uuid) &&
543                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
544                                                          &obd->obd_uuid) : 1)) {
545                                 read_unlock(&obd_dev_lock);
546                                 return obd;
547                         }
548                 }
549         }
550         read_unlock(&obd_dev_lock);
551
552         return NULL;
553 }
554 EXPORT_SYMBOL(class_find_client_obd);
555
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557    searching at *next, and if a device is found, the next index to look
558    at is saved in *next. If next is NULL, then the first matching device
559    will always be returned. */
560 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
561 {
562         int i;
563
564         if (next == NULL)
565                 i = 0;
566         else if (*next >= 0 && *next < class_devno_max())
567                 i = *next;
568         else
569                 return NULL;
570
571         read_lock(&obd_dev_lock);
572         for (; i < class_devno_max(); i++) {
573                 struct obd_device *obd = class_num2obd(i);
574
575                 if (obd == NULL)
576                         continue;
577                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578                         if (next != NULL)
579                                 *next = i+1;
580                         read_unlock(&obd_dev_lock);
581                         return obd;
582                 }
583         }
584         read_unlock(&obd_dev_lock);
585
586         return NULL;
587 }
588 EXPORT_SYMBOL(class_devices_in_group);
589
590 /**
591  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592  * adjust sptlrpc settings accordingly.
593  */
594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595 {
596         struct obd_device  *obd;
597         const char       *type;
598         int              i, rc = 0, rc2;
599
600         LASSERT(namelen > 0);
601
602         read_lock(&obd_dev_lock);
603         for (i = 0; i < class_devno_max(); i++) {
604                 obd = class_num2obd(i);
605
606                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607                         continue;
608
609                 /* only notify mdc, osc, mdt, ost */
610                 type = obd->obd_type->typ_name;
611                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614                     strcmp(type, LUSTRE_OST_NAME) != 0)
615                         continue;
616
617                 if (strncmp(obd->obd_name, fsname, namelen))
618                         continue;
619
620                 class_incref(obd, __func__, obd);
621                 read_unlock(&obd_dev_lock);
622                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623                                          sizeof(KEY_SPTLRPC_CONF),
624                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
625                 rc = rc ? rc : rc2;
626                 class_decref(obd, __func__, obd);
627                 read_lock(&obd_dev_lock);
628         }
629         read_unlock(&obd_dev_lock);
630         return rc;
631 }
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633
634 void obd_cleanup_caches(void)
635 {
636         if (obd_device_cachep) {
637                 kmem_cache_destroy(obd_device_cachep);
638                 obd_device_cachep = NULL;
639         }
640         if (obdo_cachep) {
641                 kmem_cache_destroy(obdo_cachep);
642                 obdo_cachep = NULL;
643         }
644         if (import_cachep) {
645                 kmem_cache_destroy(import_cachep);
646                 import_cachep = NULL;
647         }
648         if (capa_cachep) {
649                 kmem_cache_destroy(capa_cachep);
650                 capa_cachep = NULL;
651         }
652 }
653
654 int obd_init_caches(void)
655 {
656         LASSERT(obd_device_cachep == NULL);
657         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658                                                  sizeof(struct obd_device),
659                                                  0, 0, NULL);
660         if (!obd_device_cachep)
661                 goto out;
662
663         LASSERT(obdo_cachep == NULL);
664         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665                                            0, 0, NULL);
666         if (!obdo_cachep)
667                 goto out;
668
669         LASSERT(import_cachep == NULL);
670         import_cachep = kmem_cache_create("ll_import_cache",
671                                              sizeof(struct obd_import),
672                                              0, 0, NULL);
673         if (!import_cachep)
674                 goto out;
675
676         LASSERT(capa_cachep == NULL);
677         capa_cachep = kmem_cache_create("capa_cache",
678                                            sizeof(struct obd_capa), 0, 0, NULL);
679         if (!capa_cachep)
680                 goto out;
681
682         return 0;
683  out:
684         obd_cleanup_caches();
685         return -ENOMEM;
686
687 }
688
689 /* map connection to client */
690 struct obd_export *class_conn2export(struct lustre_handle *conn)
691 {
692         struct obd_export *export;
693
694         if (!conn) {
695                 CDEBUG(D_CACHE, "looking for null handle\n");
696                 return NULL;
697         }
698
699         if (conn->cookie == -1) {  /* this means assign a new connection */
700                 CDEBUG(D_CACHE, "want a new connection\n");
701                 return NULL;
702         }
703
704         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705         export = class_handle2object(conn->cookie);
706         return export;
707 }
708 EXPORT_SYMBOL(class_conn2export);
709
710 struct obd_device *class_exp2obd(struct obd_export *exp)
711 {
712         if (exp)
713                 return exp->exp_obd;
714         return NULL;
715 }
716 EXPORT_SYMBOL(class_exp2obd);
717
718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
719 {
720         struct obd_export *export;
721         export = class_conn2export(conn);
722         if (export) {
723                 struct obd_device *obd = export->exp_obd;
724                 class_export_put(export);
725                 return obd;
726         }
727         return NULL;
728 }
729 EXPORT_SYMBOL(class_conn2obd);
730
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 {
733         struct obd_device *obd = exp->exp_obd;
734         if (obd == NULL)
735                 return NULL;
736         return obd->u.cli.cl_import;
737 }
738 EXPORT_SYMBOL(class_exp2cliimp);
739
740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741 {
742         struct obd_device *obd = class_conn2obd(conn);
743         if (obd == NULL)
744                 return NULL;
745         return obd->u.cli.cl_import;
746 }
747 EXPORT_SYMBOL(class_conn2cliimp);
748
749 /* Export management functions */
750 static void class_export_destroy(struct obd_export *exp)
751 {
752         struct obd_device *obd = exp->exp_obd;
753
754         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755         LASSERT(obd != NULL);
756
757         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758                exp->exp_client_uuid.uuid, obd->obd_name);
759
760         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761         if (exp->exp_connection)
762                 ptlrpc_put_connection_superhack(exp->exp_connection);
763
764         LASSERT(list_empty(&exp->exp_outstanding_replies));
765         LASSERT(list_empty(&exp->exp_uncommitted_replies));
766         LASSERT(list_empty(&exp->exp_req_replay_queue));
767         LASSERT(list_empty(&exp->exp_hp_rpcs));
768         obd_destroy_export(exp);
769         class_decref(obd, "export", exp);
770
771         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
772 }
773
774 static void export_handle_addref(void *export)
775 {
776         class_export_get(export);
777 }
778
779 static struct portals_handle_ops export_handle_ops = {
780         .hop_addref = export_handle_addref,
781         .hop_free   = NULL,
782 };
783
784 struct obd_export *class_export_get(struct obd_export *exp)
785 {
786         atomic_inc(&exp->exp_refcount);
787         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788                atomic_read(&exp->exp_refcount));
789         return exp;
790 }
791 EXPORT_SYMBOL(class_export_get);
792
793 void class_export_put(struct obd_export *exp)
794 {
795         LASSERT(exp != NULL);
796         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798                atomic_read(&exp->exp_refcount) - 1);
799
800         if (atomic_dec_and_test(&exp->exp_refcount)) {
801                 LASSERT(!list_empty(&exp->exp_obd_chain));
802                 CDEBUG(D_IOCTL, "final put %p/%s\n",
803                        exp, exp->exp_client_uuid.uuid);
804
805                 /* release nid stat refererence */
806                 lprocfs_exp_cleanup(exp);
807
808                 obd_zombie_export_add(exp);
809         }
810 }
811 EXPORT_SYMBOL(class_export_put);
812
813 /* Creates a new export, adds it to the hash table, and returns a
814  * pointer to it. The refcount is 2: one for the hash reference, and
815  * one for the pointer returned by this function. */
816 struct obd_export *class_new_export(struct obd_device *obd,
817                                     struct obd_uuid *cluuid)
818 {
819         struct obd_export *export;
820         struct cfs_hash *hash = NULL;
821         int rc = 0;
822
823         OBD_ALLOC_PTR(export);
824         if (!export)
825                 return ERR_PTR(-ENOMEM);
826
827         export->exp_conn_cnt = 0;
828         export->exp_lock_hash = NULL;
829         export->exp_flock_hash = NULL;
830         atomic_set(&export->exp_refcount, 2);
831         atomic_set(&export->exp_rpc_count, 0);
832         atomic_set(&export->exp_cb_count, 0);
833         atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835         INIT_LIST_HEAD(&export->exp_locks_list);
836         spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838         atomic_set(&export->exp_replay_count, 0);
839         export->exp_obd = obd;
840         INIT_LIST_HEAD(&export->exp_outstanding_replies);
841         spin_lock_init(&export->exp_uncommitted_replies_lock);
842         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843         INIT_LIST_HEAD(&export->exp_req_replay_queue);
844         INIT_LIST_HEAD(&export->exp_handle.h_link);
845         INIT_LIST_HEAD(&export->exp_hp_rpcs);
846         class_handle_hash(&export->exp_handle, &export_handle_ops);
847         export->exp_last_request_time = get_seconds();
848         spin_lock_init(&export->exp_lock);
849         spin_lock_init(&export->exp_rpc_lock);
850         INIT_HLIST_NODE(&export->exp_uuid_hash);
851         INIT_HLIST_NODE(&export->exp_nid_hash);
852         spin_lock_init(&export->exp_bl_list_lock);
853         INIT_LIST_HEAD(&export->exp_bl_list);
854
855         export->exp_sp_peer = LUSTRE_SP_ANY;
856         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857         export->exp_client_uuid = *cluuid;
858         obd_init_export(export);
859
860         spin_lock(&obd->obd_dev_lock);
861         /* shouldn't happen, but might race */
862         if (obd->obd_stopping) {
863                 rc = -ENODEV;
864                 goto exit_unlock;
865         }
866
867         hash = cfs_hash_getref(obd->obd_uuid_hash);
868         if (hash == NULL) {
869                 rc = -ENODEV;
870                 goto exit_unlock;
871         }
872         spin_unlock(&obd->obd_dev_lock);
873
874         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876                 if (rc != 0) {
877                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878                                       obd->obd_name, cluuid->uuid, rc);
879                         rc = -EALREADY;
880                         goto exit_err;
881                 }
882         }
883
884         spin_lock(&obd->obd_dev_lock);
885         if (obd->obd_stopping) {
886                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
887                 rc = -ENODEV;
888                 goto exit_unlock;
889         }
890
891         class_incref(obd, "export", export);
892         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893         list_add_tail(&export->exp_obd_chain_timed,
894                           &export->exp_obd->obd_exports_timed);
895         export->exp_obd->obd_num_exports++;
896         spin_unlock(&obd->obd_dev_lock);
897         cfs_hash_putref(hash);
898         return export;
899
900 exit_unlock:
901         spin_unlock(&obd->obd_dev_lock);
902 exit_err:
903         if (hash)
904                 cfs_hash_putref(hash);
905         class_handle_unhash(&export->exp_handle);
906         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907         obd_destroy_export(export);
908         OBD_FREE_PTR(export);
909         return ERR_PTR(rc);
910 }
911 EXPORT_SYMBOL(class_new_export);
912
913 void class_unlink_export(struct obd_export *exp)
914 {
915         class_handle_unhash(&exp->exp_handle);
916
917         spin_lock(&exp->exp_obd->obd_dev_lock);
918         /* delete an uuid-export hashitem from hashtables */
919         if (!hlist_unhashed(&exp->exp_uuid_hash))
920                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921                              &exp->exp_client_uuid,
922                              &exp->exp_uuid_hash);
923
924         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925         list_del_init(&exp->exp_obd_chain_timed);
926         exp->exp_obd->obd_num_exports--;
927         spin_unlock(&exp->exp_obd->obd_dev_lock);
928         class_export_put(exp);
929 }
930 EXPORT_SYMBOL(class_unlink_export);
931
932 /* Import management functions */
933 void class_import_destroy(struct obd_import *imp)
934 {
935         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936                 imp->imp_obd->obd_name);
937
938         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939
940         ptlrpc_put_connection_superhack(imp->imp_connection);
941
942         while (!list_empty(&imp->imp_conn_list)) {
943                 struct obd_import_conn *imp_conn;
944
945                 imp_conn = list_entry(imp->imp_conn_list.next,
946                                           struct obd_import_conn, oic_item);
947                 list_del_init(&imp_conn->oic_item);
948                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949                 OBD_FREE(imp_conn, sizeof(*imp_conn));
950         }
951
952         LASSERT(imp->imp_sec == NULL);
953         class_decref(imp->imp_obd, "import", imp);
954         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
955 }
956
957 static void import_handle_addref(void *import)
958 {
959         class_import_get(import);
960 }
961
962 static struct portals_handle_ops import_handle_ops = {
963         .hop_addref = import_handle_addref,
964         .hop_free   = NULL,
965 };
966
967 struct obd_import *class_import_get(struct obd_import *import)
968 {
969         atomic_inc(&import->imp_refcount);
970         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971                atomic_read(&import->imp_refcount),
972                import->imp_obd->obd_name);
973         return import;
974 }
975 EXPORT_SYMBOL(class_import_get);
976
977 void class_import_put(struct obd_import *imp)
978 {
979         LASSERT(list_empty(&imp->imp_zombie_chain));
980         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981
982         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983                atomic_read(&imp->imp_refcount) - 1,
984                imp->imp_obd->obd_name);
985
986         if (atomic_dec_and_test(&imp->imp_refcount)) {
987                 CDEBUG(D_INFO, "final put import %p\n", imp);
988                 obd_zombie_import_add(imp);
989         }
990
991         /* catch possible import put race */
992         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993 }
994 EXPORT_SYMBOL(class_import_put);
995
996 static void init_imp_at(struct imp_at *at) {
997         int i;
998         at_init(&at->iat_net_latency, 0, 0);
999         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000                 /* max service estimates are tracked on the server side, so
1001                    don't use the AT history here, just use the last reported
1002                    val. (But keep hist for proc histogram, worst_ever) */
1003                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004                         AT_FLG_NOHIST);
1005         }
1006 }
1007
1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010         struct obd_import *imp;
1011
1012         OBD_ALLOC(imp, sizeof(*imp));
1013         if (imp == NULL)
1014                 return NULL;
1015
1016         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018         INIT_LIST_HEAD(&imp->imp_replay_list);
1019         INIT_LIST_HEAD(&imp->imp_sending_list);
1020         INIT_LIST_HEAD(&imp->imp_delayed_list);
1021         INIT_LIST_HEAD(&imp->imp_committed_list);
1022         imp->imp_replay_cursor = &imp->imp_committed_list;
1023         spin_lock_init(&imp->imp_lock);
1024         imp->imp_last_success_conn = 0;
1025         imp->imp_state = LUSTRE_IMP_NEW;
1026         imp->imp_obd = class_incref(obd, "import", imp);
1027         mutex_init(&imp->imp_sec_mutex);
1028         init_waitqueue_head(&imp->imp_recovery_waitq);
1029
1030         atomic_set(&imp->imp_refcount, 2);
1031         atomic_set(&imp->imp_unregistering, 0);
1032         atomic_set(&imp->imp_inflight, 0);
1033         atomic_set(&imp->imp_replay_inflight, 0);
1034         atomic_set(&imp->imp_inval_count, 0);
1035         INIT_LIST_HEAD(&imp->imp_conn_list);
1036         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038         init_imp_at(&imp->imp_at);
1039
1040         /* the default magic is V2, will be used in connect RPC, and
1041          * then adjusted according to the flags in request/reply. */
1042         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043
1044         return imp;
1045 }
1046 EXPORT_SYMBOL(class_new_import);
1047
1048 void class_destroy_import(struct obd_import *import)
1049 {
1050         LASSERT(import != NULL);
1051         LASSERT(import != LP_POISON);
1052
1053         class_handle_unhash(&import->imp_handle);
1054
1055         spin_lock(&import->imp_lock);
1056         import->imp_generation++;
1057         spin_unlock(&import->imp_lock);
1058         class_import_put(import);
1059 }
1060 EXPORT_SYMBOL(class_destroy_import);
1061
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1063
1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065 {
1066         spin_lock(&exp->exp_locks_list_guard);
1067
1068         LASSERT(lock->l_exp_refs_nr >= 0);
1069
1070         if (lock->l_exp_refs_target != NULL &&
1071             lock->l_exp_refs_target != exp) {
1072                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073                               exp, lock, lock->l_exp_refs_target);
1074         }
1075         if ((lock->l_exp_refs_nr ++) == 0) {
1076                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077                 lock->l_exp_refs_target = exp;
1078         }
1079         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080                lock, exp, lock->l_exp_refs_nr);
1081         spin_unlock(&exp->exp_locks_list_guard);
1082 }
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1084
1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 {
1087         spin_lock(&exp->exp_locks_list_guard);
1088         LASSERT(lock->l_exp_refs_nr > 0);
1089         if (lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1091                               lock, lock->l_exp_refs_target, exp);
1092         }
1093         if (-- lock->l_exp_refs_nr == 0) {
1094                 list_del_init(&lock->l_exp_refs_link);
1095                 lock->l_exp_refs_target = NULL;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_del_lock_ref);
1102 #endif
1103
1104 /* A connection defines an export context in which preallocation can
1105    be managed. This releases the export pointer reference, and returns
1106    the export handle, so the export refcount is 1 when this function
1107    returns. */
1108 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1109                   struct obd_uuid *cluuid)
1110 {
1111         struct obd_export *export;
1112         LASSERT(conn != NULL);
1113         LASSERT(obd != NULL);
1114         LASSERT(cluuid != NULL);
1115
1116         export = class_new_export(obd, cluuid);
1117         if (IS_ERR(export))
1118                 return PTR_ERR(export);
1119
1120         conn->cookie = export->exp_handle.h_cookie;
1121         class_export_put(export);
1122
1123         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1124                cluuid->uuid, conn->cookie);
1125         return 0;
1126 }
1127 EXPORT_SYMBOL(class_connect);
1128
1129 /* if export is involved in recovery then clean up related things */
1130 void class_export_recovery_cleanup(struct obd_export *exp)
1131 {
1132         struct obd_device *obd = exp->exp_obd;
1133
1134         spin_lock(&obd->obd_recovery_task_lock);
1135         if (exp->exp_delayed)
1136                 obd->obd_delayed_clients--;
1137         if (obd->obd_recovering) {
1138                 if (exp->exp_in_recovery) {
1139                         spin_lock(&exp->exp_lock);
1140                         exp->exp_in_recovery = 0;
1141                         spin_unlock(&exp->exp_lock);
1142                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1143                         atomic_dec(&obd->obd_connected_clients);
1144                 }
1145
1146                 /* if called during recovery then should update
1147                  * obd_stale_clients counter,
1148                  * lightweight exports are not counted */
1149                 if (exp->exp_failed &&
1150                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1151                         exp->exp_obd->obd_stale_clients++;
1152         }
1153         spin_unlock(&obd->obd_recovery_task_lock);
1154
1155         spin_lock(&exp->exp_lock);
1156         /** Cleanup req replay fields */
1157         if (exp->exp_req_replay_needed) {
1158                 exp->exp_req_replay_needed = 0;
1159
1160                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1161                 atomic_dec(&obd->obd_req_replay_clients);
1162         }
1163
1164         /** Cleanup lock replay data */
1165         if (exp->exp_lock_replay_needed) {
1166                 exp->exp_lock_replay_needed = 0;
1167
1168                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1169                 atomic_dec(&obd->obd_lock_replay_clients);
1170         }
1171         spin_unlock(&exp->exp_lock);
1172 }
1173
1174 /* This function removes 1-3 references from the export:
1175  * 1 - for export pointer passed
1176  * and if disconnect really need
1177  * 2 - removing from hash
1178  * 3 - in client_unlink_export
1179  * The export pointer passed to this function can destroyed */
1180 int class_disconnect(struct obd_export *export)
1181 {
1182         int already_disconnected;
1183
1184         if (export == NULL) {
1185                 CWARN("attempting to free NULL export %p\n", export);
1186                 return -EINVAL;
1187         }
1188
1189         spin_lock(&export->exp_lock);
1190         already_disconnected = export->exp_disconnected;
1191         export->exp_disconnected = 1;
1192         spin_unlock(&export->exp_lock);
1193
1194         /* class_cleanup(), abort_recovery(), and class_fail_export()
1195          * all end up in here, and if any of them race we shouldn't
1196          * call extra class_export_puts(). */
1197         if (already_disconnected) {
1198                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1199                 goto no_disconn;
1200         }
1201
1202         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1203                export->exp_handle.h_cookie);
1204
1205         if (!hlist_unhashed(&export->exp_nid_hash))
1206                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1207                              &export->exp_connection->c_peer.nid,
1208                              &export->exp_nid_hash);
1209
1210         class_export_recovery_cleanup(export);
1211         class_unlink_export(export);
1212 no_disconn:
1213         class_export_put(export);
1214         return 0;
1215 }
1216 EXPORT_SYMBOL(class_disconnect);
1217
1218 /* Return non-zero for a fully connected export */
1219 int class_connected_export(struct obd_export *exp)
1220 {
1221         if (exp) {
1222                 int connected;
1223                 spin_lock(&exp->exp_lock);
1224                 connected = (exp->exp_conn_cnt > 0);
1225                 spin_unlock(&exp->exp_lock);
1226                 return connected;
1227         }
1228         return 0;
1229 }
1230 EXPORT_SYMBOL(class_connected_export);
1231
1232 static void class_disconnect_export_list(struct list_head *list,
1233                                          enum obd_option flags)
1234 {
1235         int rc;
1236         struct obd_export *exp;
1237
1238         /* It's possible that an export may disconnect itself, but
1239          * nothing else will be added to this list. */
1240         while (!list_empty(list)) {
1241                 exp = list_entry(list->next, struct obd_export,
1242                                      exp_obd_chain);
1243                 /* need for safe call CDEBUG after obd_disconnect */
1244                 class_export_get(exp);
1245
1246                 spin_lock(&exp->exp_lock);
1247                 exp->exp_flags = flags;
1248                 spin_unlock(&exp->exp_lock);
1249
1250                 if (obd_uuid_equals(&exp->exp_client_uuid,
1251                                     &exp->exp_obd->obd_uuid)) {
1252                         CDEBUG(D_HA,
1253                                "exp %p export uuid == obd uuid, don't discon\n",
1254                                exp);
1255                         /* Need to delete this now so we don't end up pointing
1256                          * to work_list later when this export is cleaned up. */
1257                         list_del_init(&exp->exp_obd_chain);
1258                         class_export_put(exp);
1259                         continue;
1260                 }
1261
1262                 class_export_get(exp);
1263                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1264                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265                        exp, exp->exp_last_request_time);
1266                 /* release one export reference anyway */
1267                 rc = obd_disconnect(exp);
1268
1269                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270                        obd_export_nid2str(exp), exp, rc);
1271                 class_export_put(exp);
1272         }
1273 }
1274
1275 void class_disconnect_exports(struct obd_device *obd)
1276 {
1277         struct list_head work_list;
1278
1279         /* Move all of the exports from obd_exports to a work list, en masse. */
1280         INIT_LIST_HEAD(&work_list);
1281         spin_lock(&obd->obd_dev_lock);
1282         list_splice_init(&obd->obd_exports, &work_list);
1283         list_splice_init(&obd->obd_delayed_exports, &work_list);
1284         spin_unlock(&obd->obd_dev_lock);
1285
1286         if (!list_empty(&work_list)) {
1287                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1288                        obd->obd_minor, obd);
1289                 class_disconnect_export_list(&work_list,
1290                                              exp_flags_from_obd(obd));
1291         } else
1292                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1293                        obd->obd_minor, obd);
1294 }
1295 EXPORT_SYMBOL(class_disconnect_exports);
1296
1297 /* Remove exports that have not completed recovery.
1298  */
1299 void class_disconnect_stale_exports(struct obd_device *obd,
1300                                     int (*test_export)(struct obd_export *))
1301 {
1302         struct list_head work_list;
1303         struct obd_export *exp, *n;
1304         int evicted = 0;
1305
1306         INIT_LIST_HEAD(&work_list);
1307         spin_lock(&obd->obd_dev_lock);
1308         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1309                                      exp_obd_chain) {
1310                 /* don't count self-export as client */
1311                 if (obd_uuid_equals(&exp->exp_client_uuid,
1312                                     &exp->exp_obd->obd_uuid))
1313                         continue;
1314
1315                 /* don't evict clients which have no slot in last_rcvd
1316                  * (e.g. lightweight connection) */
1317                 if (exp->exp_target_data.ted_lr_idx == -1)
1318                         continue;
1319
1320                 spin_lock(&exp->exp_lock);
1321                 if (exp->exp_failed || test_export(exp)) {
1322                         spin_unlock(&exp->exp_lock);
1323                         continue;
1324                 }
1325                 exp->exp_failed = 1;
1326                 spin_unlock(&exp->exp_lock);
1327
1328                 list_move(&exp->exp_obd_chain, &work_list);
1329                 evicted++;
1330                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1331                        obd->obd_name, exp->exp_client_uuid.uuid,
1332                        exp->exp_connection == NULL ? "<unknown>" :
1333                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1334                 print_export_data(exp, "EVICTING", 0);
1335         }
1336         spin_unlock(&obd->obd_dev_lock);
1337
1338         if (evicted)
1339                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1340                               obd->obd_name, evicted);
1341
1342         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1343                                                  OBD_OPT_ABORT_RECOV);
1344 }
1345 EXPORT_SYMBOL(class_disconnect_stale_exports);
1346
1347 void class_fail_export(struct obd_export *exp)
1348 {
1349         int rc, already_failed;
1350
1351         spin_lock(&exp->exp_lock);
1352         already_failed = exp->exp_failed;
1353         exp->exp_failed = 1;
1354         spin_unlock(&exp->exp_lock);
1355
1356         if (already_failed) {
1357                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1358                        exp, exp->exp_client_uuid.uuid);
1359                 return;
1360         }
1361
1362         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1363                exp, exp->exp_client_uuid.uuid);
1364
1365         if (obd_dump_on_timeout)
1366                 libcfs_debug_dumplog();
1367
1368         /* need for safe call CDEBUG after obd_disconnect */
1369         class_export_get(exp);
1370
1371         /* Most callers into obd_disconnect are removing their own reference
1372          * (request, for example) in addition to the one from the hash table.
1373          * We don't have such a reference here, so make one. */
1374         class_export_get(exp);
1375         rc = obd_disconnect(exp);
1376         if (rc)
1377                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1378         else
1379                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1380                        exp, exp->exp_client_uuid.uuid);
1381         class_export_put(exp);
1382 }
1383 EXPORT_SYMBOL(class_fail_export);
1384
1385 char *obd_export_nid2str(struct obd_export *exp)
1386 {
1387         if (exp->exp_connection != NULL)
1388                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1389
1390         return "(no nid)";
1391 }
1392 EXPORT_SYMBOL(obd_export_nid2str);
1393
1394 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1395 {
1396         struct cfs_hash *nid_hash;
1397         struct obd_export *doomed_exp = NULL;
1398         int exports_evicted = 0;
1399
1400         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1401
1402         spin_lock(&obd->obd_dev_lock);
1403         /* umount has run already, so evict thread should leave
1404          * its task to umount thread now */
1405         if (obd->obd_stopping) {
1406                 spin_unlock(&obd->obd_dev_lock);
1407                 return exports_evicted;
1408         }
1409         nid_hash = obd->obd_nid_hash;
1410         cfs_hash_getref(nid_hash);
1411         spin_unlock(&obd->obd_dev_lock);
1412
1413         do {
1414                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1415                 if (doomed_exp == NULL)
1416                         break;
1417
1418                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1419                          "nid %s found, wanted nid %s, requested nid %s\n",
1420                          obd_export_nid2str(doomed_exp),
1421                          libcfs_nid2str(nid_key), nid);
1422                 LASSERTF(doomed_exp != obd->obd_self_export,
1423                          "self-export is hashed by NID?\n");
1424                 exports_evicted++;
1425                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1426                               obd->obd_name,
1427                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1428                               obd_export_nid2str(doomed_exp));
1429                 class_fail_export(doomed_exp);
1430                 class_export_put(doomed_exp);
1431         } while (1);
1432
1433         cfs_hash_putref(nid_hash);
1434
1435         if (!exports_evicted)
1436                 CDEBUG(D_HA,
1437                        "%s: can't disconnect NID '%s': no exports found\n",
1438                        obd->obd_name, nid);
1439         return exports_evicted;
1440 }
1441 EXPORT_SYMBOL(obd_export_evict_by_nid);
1442
1443 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1444 {
1445         struct cfs_hash *uuid_hash;
1446         struct obd_export *doomed_exp = NULL;
1447         struct obd_uuid doomed_uuid;
1448         int exports_evicted = 0;
1449
1450         spin_lock(&obd->obd_dev_lock);
1451         if (obd->obd_stopping) {
1452                 spin_unlock(&obd->obd_dev_lock);
1453                 return exports_evicted;
1454         }
1455         uuid_hash = obd->obd_uuid_hash;
1456         cfs_hash_getref(uuid_hash);
1457         spin_unlock(&obd->obd_dev_lock);
1458
1459         obd_str2uuid(&doomed_uuid, uuid);
1460         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1461                 CERROR("%s: can't evict myself\n", obd->obd_name);
1462                 cfs_hash_putref(uuid_hash);
1463                 return exports_evicted;
1464         }
1465
1466         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1467
1468         if (doomed_exp == NULL) {
1469                 CERROR("%s: can't disconnect %s: no exports found\n",
1470                        obd->obd_name, uuid);
1471         } else {
1472                 CWARN("%s: evicting %s at administrative request\n",
1473                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1474                 class_fail_export(doomed_exp);
1475                 class_export_put(doomed_exp);
1476                 exports_evicted++;
1477         }
1478         cfs_hash_putref(uuid_hash);
1479
1480         return exports_evicted;
1481 }
1482 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1483
1484 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1485 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1486 EXPORT_SYMBOL(class_export_dump_hook);
1487 #endif
1488
1489 static void print_export_data(struct obd_export *exp, const char *status,
1490                               int locks)
1491 {
1492         struct ptlrpc_reply_state *rs;
1493         struct ptlrpc_reply_state *first_reply = NULL;
1494         int nreplies = 0;
1495
1496         spin_lock(&exp->exp_lock);
1497         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1498                                 rs_exp_list) {
1499                 if (nreplies == 0)
1500                         first_reply = rs;
1501                 nreplies++;
1502         }
1503         spin_unlock(&exp->exp_lock);
1504
1505         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1506                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1507                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1508                atomic_read(&exp->exp_rpc_count),
1509                atomic_read(&exp->exp_cb_count),
1510                atomic_read(&exp->exp_locks_count),
1511                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1512                nreplies, first_reply, nreplies > 3 ? "..." : "",
1513                exp->exp_last_committed);
1514 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1515         if (locks && class_export_dump_hook != NULL)
1516                 class_export_dump_hook(exp);
1517 #endif
1518 }
1519
1520 void dump_exports(struct obd_device *obd, int locks)
1521 {
1522         struct obd_export *exp;
1523
1524         spin_lock(&obd->obd_dev_lock);
1525         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1526                 print_export_data(exp, "ACTIVE", locks);
1527         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1528                 print_export_data(exp, "UNLINKED", locks);
1529         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1530                 print_export_data(exp, "DELAYED", locks);
1531         spin_unlock(&obd->obd_dev_lock);
1532         spin_lock(&obd_zombie_impexp_lock);
1533         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1534                 print_export_data(exp, "ZOMBIE", locks);
1535         spin_unlock(&obd_zombie_impexp_lock);
1536 }
1537 EXPORT_SYMBOL(dump_exports);
1538
1539 void obd_exports_barrier(struct obd_device *obd)
1540 {
1541         int waited = 2;
1542         LASSERT(list_empty(&obd->obd_exports));
1543         spin_lock(&obd->obd_dev_lock);
1544         while (!list_empty(&obd->obd_unlinked_exports)) {
1545                 spin_unlock(&obd->obd_dev_lock);
1546                 set_current_state(TASK_UNINTERRUPTIBLE);
1547                 schedule_timeout(cfs_time_seconds(waited));
1548                 if (waited > 5 && IS_PO2(waited)) {
1549                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1550                                       obd->obd_name, waited,
1551                                       atomic_read(&obd->obd_refcount));
1552                         dump_exports(obd, 1);
1553                 }
1554                 waited *= 2;
1555                 spin_lock(&obd->obd_dev_lock);
1556         }
1557         spin_unlock(&obd->obd_dev_lock);
1558 }
1559 EXPORT_SYMBOL(obd_exports_barrier);
1560
1561 /* Total amount of zombies to be destroyed */
1562 static int zombies_count = 0;
1563
1564 /**
1565  * kill zombie imports and exports
1566  */
1567 void obd_zombie_impexp_cull(void)
1568 {
1569         struct obd_import *import;
1570         struct obd_export *export;
1571
1572         do {
1573                 spin_lock(&obd_zombie_impexp_lock);
1574
1575                 import = NULL;
1576                 if (!list_empty(&obd_zombie_imports)) {
1577                         import = list_entry(obd_zombie_imports.next,
1578                                                 struct obd_import,
1579                                                 imp_zombie_chain);
1580                         list_del_init(&import->imp_zombie_chain);
1581                 }
1582
1583                 export = NULL;
1584                 if (!list_empty(&obd_zombie_exports)) {
1585                         export = list_entry(obd_zombie_exports.next,
1586                                                 struct obd_export,
1587                                                 exp_obd_chain);
1588                         list_del_init(&export->exp_obd_chain);
1589                 }
1590
1591                 spin_unlock(&obd_zombie_impexp_lock);
1592
1593                 if (import != NULL) {
1594                         class_import_destroy(import);
1595                         spin_lock(&obd_zombie_impexp_lock);
1596                         zombies_count--;
1597                         spin_unlock(&obd_zombie_impexp_lock);
1598                 }
1599
1600                 if (export != NULL) {
1601                         class_export_destroy(export);
1602                         spin_lock(&obd_zombie_impexp_lock);
1603                         zombies_count--;
1604                         spin_unlock(&obd_zombie_impexp_lock);
1605                 }
1606
1607                 cond_resched();
1608         } while (import != NULL || export != NULL);
1609 }
1610
1611 static struct completion        obd_zombie_start;
1612 static struct completion        obd_zombie_stop;
1613 static unsigned long            obd_zombie_flags;
1614 static wait_queue_head_t                obd_zombie_waitq;
1615 static pid_t                    obd_zombie_pid;
1616
1617 enum {
1618         OBD_ZOMBIE_STOP         = 0x0001,
1619 };
1620
1621 /**
1622  * check for work for kill zombie import/export thread.
1623  */
1624 static int obd_zombie_impexp_check(void *arg)
1625 {
1626         int rc;
1627
1628         spin_lock(&obd_zombie_impexp_lock);
1629         rc = (zombies_count == 0) &&
1630              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1631         spin_unlock(&obd_zombie_impexp_lock);
1632
1633         return rc;
1634 }
1635
1636 /**
1637  * Add export to the obd_zombie thread and notify it.
1638  */
1639 static void obd_zombie_export_add(struct obd_export *exp) {
1640         spin_lock(&exp->exp_obd->obd_dev_lock);
1641         LASSERT(!list_empty(&exp->exp_obd_chain));
1642         list_del_init(&exp->exp_obd_chain);
1643         spin_unlock(&exp->exp_obd->obd_dev_lock);
1644         spin_lock(&obd_zombie_impexp_lock);
1645         zombies_count++;
1646         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1647         spin_unlock(&obd_zombie_impexp_lock);
1648
1649         obd_zombie_impexp_notify();
1650 }
1651
1652 /**
1653  * Add import to the obd_zombie thread and notify it.
1654  */
1655 static void obd_zombie_import_add(struct obd_import *imp) {
1656         LASSERT(imp->imp_sec == NULL);
1657         LASSERT(imp->imp_rq_pool == NULL);
1658         spin_lock(&obd_zombie_impexp_lock);
1659         LASSERT(list_empty(&imp->imp_zombie_chain));
1660         zombies_count++;
1661         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1662         spin_unlock(&obd_zombie_impexp_lock);
1663
1664         obd_zombie_impexp_notify();
1665 }
1666
1667 /**
1668  * notify import/export destroy thread about new zombie.
1669  */
1670 static void obd_zombie_impexp_notify(void)
1671 {
1672         /*
1673          * Make sure obd_zombie_impexp_thread get this notification.
1674          * It is possible this signal only get by obd_zombie_barrier, and
1675          * barrier gulps this notification and sleeps away and hangs ensues
1676          */
1677         wake_up_all(&obd_zombie_waitq);
1678 }
1679
1680 /**
1681  * check whether obd_zombie is idle
1682  */
1683 static int obd_zombie_is_idle(void)
1684 {
1685         int rc;
1686
1687         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1688         spin_lock(&obd_zombie_impexp_lock);
1689         rc = (zombies_count == 0);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691         return rc;
1692 }
1693
1694 /**
1695  * wait when obd_zombie import/export queues become empty
1696  */
1697 void obd_zombie_barrier(void)
1698 {
1699         struct l_wait_info lwi = { 0 };
1700
1701         if (obd_zombie_pid == current_pid())
1702                 /* don't wait for myself */
1703                 return;
1704         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1705 }
1706 EXPORT_SYMBOL(obd_zombie_barrier);
1707
1708
1709 /**
1710  * destroy zombie export/import thread.
1711  */
1712 static int obd_zombie_impexp_thread(void *unused)
1713 {
1714         unshare_fs_struct();
1715         complete(&obd_zombie_start);
1716
1717         obd_zombie_pid = current_pid();
1718
1719         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1720                 struct l_wait_info lwi = { 0 };
1721
1722                 l_wait_event(obd_zombie_waitq,
1723                              !obd_zombie_impexp_check(NULL), &lwi);
1724                 obd_zombie_impexp_cull();
1725
1726                 /*
1727                  * Notify obd_zombie_barrier callers that queues
1728                  * may be empty.
1729                  */
1730                 wake_up(&obd_zombie_waitq);
1731         }
1732
1733         complete(&obd_zombie_stop);
1734
1735         return 0;
1736 }
1737
1738
1739 /**
1740  * start destroy zombie import/export thread
1741  */
1742 int obd_zombie_impexp_init(void)
1743 {
1744         struct task_struct *task;
1745
1746         INIT_LIST_HEAD(&obd_zombie_imports);
1747         INIT_LIST_HEAD(&obd_zombie_exports);
1748         spin_lock_init(&obd_zombie_impexp_lock);
1749         init_completion(&obd_zombie_start);
1750         init_completion(&obd_zombie_stop);
1751         init_waitqueue_head(&obd_zombie_waitq);
1752         obd_zombie_pid = 0;
1753
1754         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1755         if (IS_ERR(task))
1756                 return PTR_ERR(task);
1757
1758         wait_for_completion(&obd_zombie_start);
1759         return 0;
1760 }
1761 /**
1762  * stop destroy zombie import/export thread
1763  */
1764 void obd_zombie_impexp_stop(void)
1765 {
1766         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1767         obd_zombie_impexp_notify();
1768         wait_for_completion(&obd_zombie_stop);
1769 }
1770
1771 /***** Kernel-userspace comm helpers *******/
1772
1773 /* Get length of entire message, including header */
1774 int kuc_len(int payload_len)
1775 {
1776         return sizeof(struct kuc_hdr) + payload_len;
1777 }
1778 EXPORT_SYMBOL(kuc_len);
1779
1780 /* Get a pointer to kuc header, given a ptr to the payload
1781  * @param p Pointer to payload area
1782  * @returns Pointer to kuc header
1783  */
1784 struct kuc_hdr *kuc_ptr(void *p)
1785 {
1786         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1787         LASSERT(lh->kuc_magic == KUC_MAGIC);
1788         return lh;
1789 }
1790 EXPORT_SYMBOL(kuc_ptr);
1791
1792 /* Test if payload is part of kuc message
1793  * @param p Pointer to payload area
1794  * @returns boolean
1795  */
1796 int kuc_ispayload(void *p)
1797 {
1798         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1799
1800         if (kh->kuc_magic == KUC_MAGIC)
1801                 return 1;
1802         else
1803                 return 0;
1804 }
1805 EXPORT_SYMBOL(kuc_ispayload);
1806
1807 /* Alloc space for a message, and fill in header
1808  * @return Pointer to payload area
1809  */
1810 void *kuc_alloc(int payload_len, int transport, int type)
1811 {
1812         struct kuc_hdr *lh;
1813         int len = kuc_len(payload_len);
1814
1815         OBD_ALLOC(lh, len);
1816         if (lh == NULL)
1817                 return ERR_PTR(-ENOMEM);
1818
1819         lh->kuc_magic = KUC_MAGIC;
1820         lh->kuc_transport = transport;
1821         lh->kuc_msgtype = type;
1822         lh->kuc_msglen = len;
1823
1824         return (void *)(lh + 1);
1825 }
1826 EXPORT_SYMBOL(kuc_alloc);
1827
1828 /* Takes pointer to payload area */
1829 inline void kuc_free(void *p, int payload_len)
1830 {
1831         struct kuc_hdr *lh = kuc_ptr(p);
1832         OBD_FREE(lh, kuc_len(payload_len));
1833 }
1834 EXPORT_SYMBOL(kuc_free);