Merge branch 'sfi' of git://git.kernel.org/pub/scm/linux/kernel/git/lenb/linux into...
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
/* List of registered obd types. Defined outside this file; walked and
 * modified here only while holding obd_types_lock. */
extern struct list_head obd_types;
/* Serializes traversal of, and insertion into/removal from, obd_types. */
spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
struct kmem_cache *import_cachep;

/* NOTE(review): presumably queues of imports/exports awaiting deferred
 * destruction, protected by obd_zombie_impexp_lock -- their users are not
 * visible in this part of the file; confirm there. */
struct list_head      obd_zombie_imports;
struct list_head      obd_zombie_exports;
spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of static helpers used before their definitions. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook called by class_export_destroy() to drop an export's connection.
 * NOTE(review): presumably assigned by another module at load time; the
 * assignment is not in this file -- confirm. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66 /*
67  * support functions: we could use inter-module communication, but this
68  * is more portable to other OS's
69  */
70 static struct obd_device *obd_device_alloc(void)
71 {
72         struct obd_device *obd;
73
74         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75         if (obd != NULL) {
76                 obd->obd_magic = OBD_DEVICE_MAGIC;
77         }
78         return obd;
79 }
80
81 static void obd_device_free(struct obd_device *obd)
82 {
83         LASSERT(obd != NULL);
84         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86         if (obd->obd_namespace != NULL) {
87                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88                        obd, obd->obd_namespace, obd->obd_force);
89                 LBUG();
90         }
91         lu_ref_fini(&obd->obd_reference);
92         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 }
94
95 struct obd_type *class_search_type(const char *name)
96 {
97         struct list_head *tmp;
98         struct obd_type *type;
99
100         spin_lock(&obd_types_lock);
101         list_for_each(tmp, &obd_types) {
102                 type = list_entry(tmp, struct obd_type, typ_chain);
103                 if (strcmp(type->typ_name, name) == 0) {
104                         spin_unlock(&obd_types_lock);
105                         return type;
106                 }
107         }
108         spin_unlock(&obd_types_lock);
109         return NULL;
110 }
111 EXPORT_SYMBOL(class_search_type);
112
113 struct obd_type *class_get_type(const char *name)
114 {
115         struct obd_type *type = class_search_type(name);
116
117         if (!type) {
118                 const char *modname = name;
119
120                 if (strcmp(modname, "obdfilter") == 0)
121                         modname = "ofd";
122
123                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124                         modname = LUSTRE_OSP_NAME;
125
126                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127                         modname = LUSTRE_MDT_NAME;
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137         if (type) {
138                 spin_lock(&type->obd_type_lock);
139                 type->typ_refcnt++;
140                 try_module_get(type->typ_dt_ops->o_owner);
141                 spin_unlock(&type->obd_type_lock);
142         }
143         return type;
144 }
145 EXPORT_SYMBOL(class_get_type);
146
147 void class_put_type(struct obd_type *type)
148 {
149         LASSERT(type);
150         spin_lock(&type->obd_type_lock);
151         type->typ_refcnt--;
152         module_put(type->typ_dt_ops->o_owner);
153         spin_unlock(&type->obd_type_lock);
154 }
155 EXPORT_SYMBOL(class_put_type);
156
157 #define CLASS_MAX_NAME 1024
158
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160                         struct lprocfs_vars *vars, const char *name,
161                         struct lu_device_type *ldt)
162 {
163         struct obd_type *type;
164         int rc = 0;
165
166         /* sanity check */
167         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168
169         if (class_search_type(name)) {
170                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171                 return -EEXIST;
172         }
173
174         rc = -ENOMEM;
175         OBD_ALLOC(type, sizeof(*type));
176         if (type == NULL)
177                 return rc;
178
179         OBD_ALLOC_PTR(type->typ_dt_ops);
180         OBD_ALLOC_PTR(type->typ_md_ops);
181         OBD_ALLOC(type->typ_name, strlen(name) + 1);
182
183         if (type->typ_dt_ops == NULL ||
184             type->typ_md_ops == NULL ||
185             type->typ_name == NULL)
186                 goto failed;
187
188         *(type->typ_dt_ops) = *dt_ops;
189         /* md_ops is optional */
190         if (md_ops)
191                 *(type->typ_md_ops) = *md_ops;
192         strcpy(type->typ_name, name);
193         spin_lock_init(&type->obd_type_lock);
194
195         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196                                               vars, type);
197         if (IS_ERR(type->typ_procroot)) {
198                 rc = PTR_ERR(type->typ_procroot);
199                 type->typ_procroot = NULL;
200                 goto failed;
201         }
202
203         if (ldt != NULL) {
204                 type->typ_lu = ldt;
205                 rc = lu_device_type_init(ldt);
206                 if (rc != 0)
207                         goto failed;
208         }
209
210         spin_lock(&obd_types_lock);
211         list_add(&type->typ_chain, &obd_types);
212         spin_unlock(&obd_types_lock);
213
214         return 0;
215
216  failed:
217         if (type->typ_name != NULL)
218                 OBD_FREE(type->typ_name, strlen(name) + 1);
219         if (type->typ_md_ops != NULL)
220                 OBD_FREE_PTR(type->typ_md_ops);
221         if (type->typ_dt_ops != NULL)
222                 OBD_FREE_PTR(type->typ_dt_ops);
223         OBD_FREE(type, sizeof(*type));
224         return rc;
225 }
226 EXPORT_SYMBOL(class_register_type);
227
228 int class_unregister_type(const char *name)
229 {
230         struct obd_type *type = class_search_type(name);
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 return -EINVAL;
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 return -EBUSY;
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         spin_lock(&obd_types_lock);
254         list_del(&type->typ_chain);
255         spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         return 0;
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *       pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 return ERR_PTR(-EINVAL);
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 return ERR_PTR(-ENODEV);
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 result = ERR_PTR(-ENOMEM);
299                 goto out_type;
300         }
301
302         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304         write_lock(&obd_dev_lock);
305         for (i = 0; i < class_devno_max(); i++) {
306                 struct obd_device *obd = class_num2obd(i);
307
308                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0] = '\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 result = ERR_PTR(-EOVERFLOW);
341                 goto out;
342         }
343
344         if (IS_ERR(result))
345                 goto out;
346
347         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                result->obd_name, result);
349
350         return result;
351 out:
352         obd_device_free(newdev);
353 out_type:
354         class_put_type(type);
355         return result;
356 }
357
358 void class_release_dev(struct obd_device *obd)
359 {
360         struct obd_type *obd_type = obd->obd_type;
361
362         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366         LASSERT(obd_type != NULL);
367
368         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370
371         write_lock(&obd_dev_lock);
372         obd_devs[obd->obd_minor] = NULL;
373         write_unlock(&obd_dev_lock);
374         obd_device_free(obd);
375
376         class_put_type(obd_type);
377 }
378
379 int class_name2dev(const char *name)
380 {
381         int i;
382
383         if (!name)
384                 return -1;
385
386         read_lock(&obd_dev_lock);
387         for (i = 0; i < class_devno_max(); i++) {
388                 struct obd_device *obd = class_num2obd(i);
389
390                 if (obd && strcmp(name, obd->obd_name) == 0) {
391                         /* Make sure we finished attaching before we give
392                            out any references */
393                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394                         if (obd->obd_attached) {
395                                 read_unlock(&obd_dev_lock);
396                                 return i;
397                         }
398                         break;
399                 }
400         }
401         read_unlock(&obd_dev_lock);
402
403         return -1;
404 }
405 EXPORT_SYMBOL(class_name2dev);
406
407 struct obd_device *class_name2obd(const char *name)
408 {
409         int dev = class_name2dev(name);
410
411         if (dev < 0 || dev > class_devno_max())
412                 return NULL;
413         return class_num2obd(dev);
414 }
415 EXPORT_SYMBOL(class_name2obd);
416
417 int class_uuid2dev(struct obd_uuid *uuid)
418 {
419         int i;
420
421         read_lock(&obd_dev_lock);
422         for (i = 0; i < class_devno_max(); i++) {
423                 struct obd_device *obd = class_num2obd(i);
424
425                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427                         read_unlock(&obd_dev_lock);
428                         return i;
429                 }
430         }
431         read_unlock(&obd_dev_lock);
432
433         return -1;
434 }
435 EXPORT_SYMBOL(class_uuid2dev);
436
437 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438 {
439         int dev = class_uuid2dev(uuid);
440         if (dev < 0)
441                 return NULL;
442         return class_num2obd(dev);
443 }
444 EXPORT_SYMBOL(class_uuid2obd);
445
446 /**
447  * Get obd device from ::obd_devs[]
448  *
449  * \param num [in] array index
450  *
451  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452  *       otherwise return the obd device there.
453  */
454 struct obd_device *class_num2obd(int num)
455 {
456         struct obd_device *obd = NULL;
457
458         if (num < class_devno_max()) {
459                 obd = obd_devs[num];
460                 if (obd == NULL)
461                         return NULL;
462
463                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464                          "%p obd_magic %08x != %08x\n",
465                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466                 LASSERTF(obd->obd_minor == num,
467                          "%p obd_minor %0d != %0d\n",
468                          obd, obd->obd_minor, num);
469         }
470
471         return obd;
472 }
473 EXPORT_SYMBOL(class_num2obd);
474
475 /**
476  * Get obd devices count. Device in any
477  *    state are counted
478  * \retval obd device count
479  */
480 int get_devices_count(void)
481 {
482         int index, max_index = class_devno_max(), dev_count = 0;
483
484         read_lock(&obd_dev_lock);
485         for (index = 0; index <= max_index; index++) {
486                 struct obd_device *obd = class_num2obd(index);
487                 if (obd != NULL)
488                         dev_count++;
489         }
490         read_unlock(&obd_dev_lock);
491
492         return dev_count;
493 }
494 EXPORT_SYMBOL(get_devices_count);
495
496 void class_obd_list(void)
497 {
498         char *status;
499         int i;
500
501         read_lock(&obd_dev_lock);
502         for (i = 0; i < class_devno_max(); i++) {
503                 struct obd_device *obd = class_num2obd(i);
504
505                 if (obd == NULL)
506                         continue;
507                 if (obd->obd_stopping)
508                         status = "ST";
509                 else if (obd->obd_set_up)
510                         status = "UP";
511                 else if (obd->obd_attached)
512                         status = "AT";
513                 else
514                         status = "--";
515                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516                          i, status, obd->obd_type->typ_name,
517                          obd->obd_name, obd->obd_uuid.uuid,
518                          atomic_read(&obd->obd_refcount));
519         }
520         read_unlock(&obd_dev_lock);
521         return;
522 }
523
524 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
525    specified, then only the client with that uuid is returned,
526    otherwise any client connected to the tgt is returned. */
527 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
528                                           const char *typ_name,
529                                           struct obd_uuid *grp_uuid)
530 {
531         int i;
532
533         read_lock(&obd_dev_lock);
534         for (i = 0; i < class_devno_max(); i++) {
535                 struct obd_device *obd = class_num2obd(i);
536
537                 if (obd == NULL)
538                         continue;
539                 if ((strncmp(obd->obd_type->typ_name, typ_name,
540                              strlen(typ_name)) == 0)) {
541                         if (obd_uuid_equals(tgt_uuid,
542                                             &obd->u.cli.cl_target_uuid) &&
543                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
544                                                          &obd->obd_uuid) : 1)) {
545                                 read_unlock(&obd_dev_lock);
546                                 return obd;
547                         }
548                 }
549         }
550         read_unlock(&obd_dev_lock);
551
552         return NULL;
553 }
554 EXPORT_SYMBOL(class_find_client_obd);
555
556 /* Iterate the obd_device list looking devices have grp_uuid. Start
557    searching at *next, and if a device is found, the next index to look
558    at is saved in *next. If next is NULL, then the first matching device
559    will always be returned. */
560 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
561 {
562         int i;
563
564         if (next == NULL)
565                 i = 0;
566         else if (*next >= 0 && *next < class_devno_max())
567                 i = *next;
568         else
569                 return NULL;
570
571         read_lock(&obd_dev_lock);
572         for (; i < class_devno_max(); i++) {
573                 struct obd_device *obd = class_num2obd(i);
574
575                 if (obd == NULL)
576                         continue;
577                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578                         if (next != NULL)
579                                 *next = i+1;
580                         read_unlock(&obd_dev_lock);
581                         return obd;
582                 }
583         }
584         read_unlock(&obd_dev_lock);
585
586         return NULL;
587 }
588 EXPORT_SYMBOL(class_devices_in_group);
589
590 /**
591  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592  * adjust sptlrpc settings accordingly.
593  */
594 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595 {
596         struct obd_device  *obd;
597         const char       *type;
598         int              i, rc = 0, rc2;
599
600         LASSERT(namelen > 0);
601
602         read_lock(&obd_dev_lock);
603         for (i = 0; i < class_devno_max(); i++) {
604                 obd = class_num2obd(i);
605
606                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607                         continue;
608
609                 /* only notify mdc, osc, mdt, ost */
610                 type = obd->obd_type->typ_name;
611                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614                     strcmp(type, LUSTRE_OST_NAME) != 0)
615                         continue;
616
617                 if (strncmp(obd->obd_name, fsname, namelen))
618                         continue;
619
620                 class_incref(obd, __func__, obd);
621                 read_unlock(&obd_dev_lock);
622                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623                                          sizeof(KEY_SPTLRPC_CONF),
624                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
625                 rc = rc ? rc : rc2;
626                 class_decref(obd, __func__, obd);
627                 read_lock(&obd_dev_lock);
628         }
629         read_unlock(&obd_dev_lock);
630         return rc;
631 }
632 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633
634 void obd_cleanup_caches(void)
635 {
636         if (obd_device_cachep) {
637                 kmem_cache_destroy(obd_device_cachep);
638                 obd_device_cachep = NULL;
639         }
640         if (obdo_cachep) {
641                 kmem_cache_destroy(obdo_cachep);
642                 obdo_cachep = NULL;
643         }
644         if (import_cachep) {
645                 kmem_cache_destroy(import_cachep);
646                 import_cachep = NULL;
647         }
648         if (capa_cachep) {
649                 kmem_cache_destroy(capa_cachep);
650                 capa_cachep = NULL;
651         }
652 }
653
654 int obd_init_caches(void)
655 {
656         LASSERT(obd_device_cachep == NULL);
657         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658                                                  sizeof(struct obd_device),
659                                                  0, 0, NULL);
660         if (!obd_device_cachep)
661                 goto out;
662
663         LASSERT(obdo_cachep == NULL);
664         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665                                            0, 0, NULL);
666         if (!obdo_cachep)
667                 goto out;
668
669         LASSERT(import_cachep == NULL);
670         import_cachep = kmem_cache_create("ll_import_cache",
671                                              sizeof(struct obd_import),
672                                              0, 0, NULL);
673         if (!import_cachep)
674                 goto out;
675
676         LASSERT(capa_cachep == NULL);
677         capa_cachep = kmem_cache_create("capa_cache",
678                                            sizeof(struct obd_capa), 0, 0, NULL);
679         if (!capa_cachep)
680                 goto out;
681
682         return 0;
683  out:
684         obd_cleanup_caches();
685         return -ENOMEM;
686
687 }
688
689 /* map connection to client */
690 struct obd_export *class_conn2export(struct lustre_handle *conn)
691 {
692         struct obd_export *export;
693
694         if (!conn) {
695                 CDEBUG(D_CACHE, "looking for null handle\n");
696                 return NULL;
697         }
698
699         if (conn->cookie == -1) {  /* this means assign a new connection */
700                 CDEBUG(D_CACHE, "want a new connection\n");
701                 return NULL;
702         }
703
704         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705         export = class_handle2object(conn->cookie);
706         return export;
707 }
708 EXPORT_SYMBOL(class_conn2export);
709
710 struct obd_device *class_exp2obd(struct obd_export *exp)
711 {
712         if (exp)
713                 return exp->exp_obd;
714         return NULL;
715 }
716 EXPORT_SYMBOL(class_exp2obd);
717
718 struct obd_device *class_conn2obd(struct lustre_handle *conn)
719 {
720         struct obd_export *export;
721         export = class_conn2export(conn);
722         if (export) {
723                 struct obd_device *obd = export->exp_obd;
724                 class_export_put(export);
725                 return obd;
726         }
727         return NULL;
728 }
729 EXPORT_SYMBOL(class_conn2obd);
730
731 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 {
733         struct obd_device *obd = exp->exp_obd;
734         if (obd == NULL)
735                 return NULL;
736         return obd->u.cli.cl_import;
737 }
738 EXPORT_SYMBOL(class_exp2cliimp);
739
740 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741 {
742         struct obd_device *obd = class_conn2obd(conn);
743         if (obd == NULL)
744                 return NULL;
745         return obd->u.cli.cl_import;
746 }
747 EXPORT_SYMBOL(class_conn2cliimp);
748
749 /* Export management functions */
750 static void class_export_destroy(struct obd_export *exp)
751 {
752         struct obd_device *obd = exp->exp_obd;
753
754         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755         LASSERT(obd != NULL);
756
757         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758                exp->exp_client_uuid.uuid, obd->obd_name);
759
760         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761         if (exp->exp_connection)
762                 ptlrpc_put_connection_superhack(exp->exp_connection);
763
764         LASSERT(list_empty(&exp->exp_outstanding_replies));
765         LASSERT(list_empty(&exp->exp_uncommitted_replies));
766         LASSERT(list_empty(&exp->exp_req_replay_queue));
767         LASSERT(list_empty(&exp->exp_hp_rpcs));
768         obd_destroy_export(exp);
769         class_decref(obd, "export", exp);
770
771         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
772 }
773
774 static void export_handle_addref(void *export)
775 {
776         class_export_get(export);
777 }
778
779 static struct portals_handle_ops export_handle_ops = {
780         .hop_addref = export_handle_addref,
781         .hop_free   = NULL,
782 };
783
784 struct obd_export *class_export_get(struct obd_export *exp)
785 {
786         atomic_inc(&exp->exp_refcount);
787         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788                atomic_read(&exp->exp_refcount));
789         return exp;
790 }
791 EXPORT_SYMBOL(class_export_get);
792
793 void class_export_put(struct obd_export *exp)
794 {
795         LASSERT(exp != NULL);
796         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798                atomic_read(&exp->exp_refcount) - 1);
799
800         if (atomic_dec_and_test(&exp->exp_refcount)) {
801                 LASSERT(!list_empty(&exp->exp_obd_chain));
802                 CDEBUG(D_IOCTL, "final put %p/%s\n",
803                        exp, exp->exp_client_uuid.uuid);
804
805                 /* release nid stat refererence */
806                 lprocfs_exp_cleanup(exp);
807
808                 obd_zombie_export_add(exp);
809         }
810 }
811 EXPORT_SYMBOL(class_export_put);
812
813 /* Creates a new export, adds it to the hash table, and returns a
814  * pointer to it. The refcount is 2: one for the hash reference, and
815  * one for the pointer returned by this function. */
816 struct obd_export *class_new_export(struct obd_device *obd,
817                                     struct obd_uuid *cluuid)
818 {
819         struct obd_export *export;
820         struct cfs_hash *hash = NULL;
821         int rc = 0;
822
823         OBD_ALLOC_PTR(export);
824         if (!export)
825                 return ERR_PTR(-ENOMEM);
826
827         export->exp_conn_cnt = 0;
828         export->exp_lock_hash = NULL;
829         export->exp_flock_hash = NULL;
830         atomic_set(&export->exp_refcount, 2);
831         atomic_set(&export->exp_rpc_count, 0);
832         atomic_set(&export->exp_cb_count, 0);
833         atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835         INIT_LIST_HEAD(&export->exp_locks_list);
836         spin_lock_init(&export->exp_locks_list_guard);
837 #endif
838         atomic_set(&export->exp_replay_count, 0);
839         export->exp_obd = obd;
840         INIT_LIST_HEAD(&export->exp_outstanding_replies);
841         spin_lock_init(&export->exp_uncommitted_replies_lock);
842         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843         INIT_LIST_HEAD(&export->exp_req_replay_queue);
844         INIT_LIST_HEAD(&export->exp_handle.h_link);
845         INIT_LIST_HEAD(&export->exp_hp_rpcs);
846         class_handle_hash(&export->exp_handle, &export_handle_ops);
847         export->exp_last_request_time = get_seconds();
848         spin_lock_init(&export->exp_lock);
849         spin_lock_init(&export->exp_rpc_lock);
850         INIT_HLIST_NODE(&export->exp_uuid_hash);
851         INIT_HLIST_NODE(&export->exp_nid_hash);
852         spin_lock_init(&export->exp_bl_list_lock);
853         INIT_LIST_HEAD(&export->exp_bl_list);
854
855         export->exp_sp_peer = LUSTRE_SP_ANY;
856         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857         export->exp_client_uuid = *cluuid;
858         obd_init_export(export);
859
860         spin_lock(&obd->obd_dev_lock);
861         /* shouldn't happen, but might race */
862         if (obd->obd_stopping) {
863                 rc = -ENODEV;
864                 goto exit_unlock;
865         }
866
867         hash = cfs_hash_getref(obd->obd_uuid_hash);
868         if (hash == NULL) {
869                 rc = -ENODEV;
870                 goto exit_unlock;
871         }
872         spin_unlock(&obd->obd_dev_lock);
873
874         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876                 if (rc != 0) {
877                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878                                       obd->obd_name, cluuid->uuid, rc);
879                         rc = -EALREADY;
880                         goto exit_err;
881                 }
882         }
883
884         spin_lock(&obd->obd_dev_lock);
885         if (obd->obd_stopping) {
886                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
887                 rc = -ENODEV;
888                 goto exit_unlock;
889         }
890
891         class_incref(obd, "export", export);
892         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893         list_add_tail(&export->exp_obd_chain_timed,
894                           &export->exp_obd->obd_exports_timed);
895         export->exp_obd->obd_num_exports++;
896         spin_unlock(&obd->obd_dev_lock);
897         cfs_hash_putref(hash);
898         return export;
899
900 exit_unlock:
901         spin_unlock(&obd->obd_dev_lock);
902 exit_err:
903         if (hash)
904                 cfs_hash_putref(hash);
905         class_handle_unhash(&export->exp_handle);
906         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907         obd_destroy_export(export);
908         OBD_FREE_PTR(export);
909         return ERR_PTR(rc);
910 }
911 EXPORT_SYMBOL(class_new_export);
912
913 void class_unlink_export(struct obd_export *exp)
914 {
915         class_handle_unhash(&exp->exp_handle);
916
917         spin_lock(&exp->exp_obd->obd_dev_lock);
918         /* delete an uuid-export hashitem from hashtables */
919         if (!hlist_unhashed(&exp->exp_uuid_hash))
920                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921                              &exp->exp_client_uuid,
922                              &exp->exp_uuid_hash);
923
924         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925         list_del_init(&exp->exp_obd_chain_timed);
926         exp->exp_obd->obd_num_exports--;
927         spin_unlock(&exp->exp_obd->obd_dev_lock);
928         class_export_put(exp);
929 }
930 EXPORT_SYMBOL(class_unlink_export);
931
932 /* Import management functions */
933 void class_import_destroy(struct obd_import *imp)
934 {
935         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936                 imp->imp_obd->obd_name);
937
938         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939
940         ptlrpc_put_connection_superhack(imp->imp_connection);
941
942         while (!list_empty(&imp->imp_conn_list)) {
943                 struct obd_import_conn *imp_conn;
944
945                 imp_conn = list_entry(imp->imp_conn_list.next,
946                                           struct obd_import_conn, oic_item);
947                 list_del_init(&imp_conn->oic_item);
948                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949                 OBD_FREE(imp_conn, sizeof(*imp_conn));
950         }
951
952         LASSERT(imp->imp_sec == NULL);
953         class_decref(imp->imp_obd, "import", imp);
954         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
955 }
956
/*
 * portals_handle_ops::hop_addref callback for imports: takes one more
 * reference on the import passed as an untyped pointer.
 */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
961
962 static struct portals_handle_ops import_handle_ops = {
963         .hop_addref = import_handle_addref,
964         .hop_free   = NULL,
965 };
966
967 struct obd_import *class_import_get(struct obd_import *import)
968 {
969         atomic_inc(&import->imp_refcount);
970         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971                atomic_read(&import->imp_refcount),
972                import->imp_obd->obd_name);
973         return import;
974 }
975 EXPORT_SYMBOL(class_import_get);
976
977 void class_import_put(struct obd_import *imp)
978 {
979         LASSERT(list_empty(&imp->imp_zombie_chain));
980         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981
982         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983                atomic_read(&imp->imp_refcount) - 1,
984                imp->imp_obd->obd_name);
985
986         if (atomic_dec_and_test(&imp->imp_refcount)) {
987                 CDEBUG(D_INFO, "final put import %p\n", imp);
988                 obd_zombie_import_add(imp);
989         }
990
991         /* catch possible import put race */
992         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993 }
994 EXPORT_SYMBOL(class_import_put);
995
996 static void init_imp_at(struct imp_at *at) {
997         int i;
998         at_init(&at->iat_net_latency, 0, 0);
999         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000                 /* max service estimates are tracked on the server side, so
1001                    don't use the AT history here, just use the last reported
1002                    val. (But keep hist for proc histogram, worst_ever) */
1003                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004                         AT_FLG_NOHIST);
1005         }
1006 }
1007
1008 struct obd_import *class_new_import(struct obd_device *obd)
1009 {
1010         struct obd_import *imp;
1011
1012         OBD_ALLOC(imp, sizeof(*imp));
1013         if (imp == NULL)
1014                 return NULL;
1015
1016         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018         INIT_LIST_HEAD(&imp->imp_replay_list);
1019         INIT_LIST_HEAD(&imp->imp_sending_list);
1020         INIT_LIST_HEAD(&imp->imp_delayed_list);
1021         INIT_LIST_HEAD(&imp->imp_committed_list);
1022         imp->imp_replay_cursor = &imp->imp_committed_list;
1023         spin_lock_init(&imp->imp_lock);
1024         imp->imp_last_success_conn = 0;
1025         imp->imp_state = LUSTRE_IMP_NEW;
1026         imp->imp_obd = class_incref(obd, "import", imp);
1027         mutex_init(&imp->imp_sec_mutex);
1028         init_waitqueue_head(&imp->imp_recovery_waitq);
1029
1030         atomic_set(&imp->imp_refcount, 2);
1031         atomic_set(&imp->imp_unregistering, 0);
1032         atomic_set(&imp->imp_inflight, 0);
1033         atomic_set(&imp->imp_replay_inflight, 0);
1034         atomic_set(&imp->imp_inval_count, 0);
1035         INIT_LIST_HEAD(&imp->imp_conn_list);
1036         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038         init_imp_at(&imp->imp_at);
1039
1040         /* the default magic is V2, will be used in connect RPC, and
1041          * then adjusted according to the flags in request/reply. */
1042         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043
1044         return imp;
1045 }
1046 EXPORT_SYMBOL(class_new_import);
1047
1048 void class_destroy_import(struct obd_import *import)
1049 {
1050         LASSERT(import != NULL);
1051         LASSERT(import != LP_POISON);
1052
1053         class_handle_unhash(&import->imp_handle);
1054
1055         spin_lock(&import->imp_lock);
1056         import->imp_generation++;
1057         spin_unlock(&import->imp_lock);
1058         class_import_put(import);
1059 }
1060 EXPORT_SYMBOL(class_destroy_import);
1061
1062 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1063
1064 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065 {
1066         spin_lock(&exp->exp_locks_list_guard);
1067
1068         LASSERT(lock->l_exp_refs_nr >= 0);
1069
1070         if (lock->l_exp_refs_target != NULL &&
1071             lock->l_exp_refs_target != exp) {
1072                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073                               exp, lock, lock->l_exp_refs_target);
1074         }
1075         if ((lock->l_exp_refs_nr ++) == 0) {
1076                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077                 lock->l_exp_refs_target = exp;
1078         }
1079         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080                lock, exp, lock->l_exp_refs_nr);
1081         spin_unlock(&exp->exp_locks_list_guard);
1082 }
1083 EXPORT_SYMBOL(__class_export_add_lock_ref);
1084
1085 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 {
1087         spin_lock(&exp->exp_locks_list_guard);
1088         LASSERT(lock->l_exp_refs_nr > 0);
1089         if (lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1091                               lock, lock->l_exp_refs_target, exp);
1092         }
1093         if (-- lock->l_exp_refs_nr == 0) {
1094                 list_del_init(&lock->l_exp_refs_link);
1095                 lock->l_exp_refs_target = NULL;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_del_lock_ref);
1102 #endif
1103
1104 /* A connection defines an export context in which preallocation can
1105    be managed. This releases the export pointer reference, and returns
1106    the export handle, so the export refcount is 1 when this function
1107    returns. */
1108 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1109                   struct obd_uuid *cluuid)
1110 {
1111         struct obd_export *export;
1112         LASSERT(conn != NULL);
1113         LASSERT(obd != NULL);
1114         LASSERT(cluuid != NULL);
1115
1116         export = class_new_export(obd, cluuid);
1117         if (IS_ERR(export))
1118                 return PTR_ERR(export);
1119
1120         conn->cookie = export->exp_handle.h_cookie;
1121         class_export_put(export);
1122
1123         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1124                cluuid->uuid, conn->cookie);
1125         return 0;
1126 }
1127 EXPORT_SYMBOL(class_connect);
1128
1129 /* if export is involved in recovery then clean up related things */
1130 void class_export_recovery_cleanup(struct obd_export *exp)
1131 {
1132         struct obd_device *obd = exp->exp_obd;
1133
1134         spin_lock(&obd->obd_recovery_task_lock);
1135         if (exp->exp_delayed)
1136                 obd->obd_delayed_clients--;
1137         if (obd->obd_recovering) {
1138                 if (exp->exp_in_recovery) {
1139                         spin_lock(&exp->exp_lock);
1140                         exp->exp_in_recovery = 0;
1141                         spin_unlock(&exp->exp_lock);
1142                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1143                         atomic_dec(&obd->obd_connected_clients);
1144                 }
1145
1146                 /* if called during recovery then should update
1147                  * obd_stale_clients counter,
1148                  * lightweight exports are not counted */
1149                 if (exp->exp_failed &&
1150                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1151                         exp->exp_obd->obd_stale_clients++;
1152         }
1153         spin_unlock(&obd->obd_recovery_task_lock);
1154         /** Cleanup req replay fields */
1155         if (exp->exp_req_replay_needed) {
1156                 spin_lock(&exp->exp_lock);
1157                 exp->exp_req_replay_needed = 0;
1158                 spin_unlock(&exp->exp_lock);
1159                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160                 atomic_dec(&obd->obd_req_replay_clients);
1161         }
1162         /** Cleanup lock replay data */
1163         if (exp->exp_lock_replay_needed) {
1164                 spin_lock(&exp->exp_lock);
1165                 exp->exp_lock_replay_needed = 0;
1166                 spin_unlock(&exp->exp_lock);
1167                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168                 atomic_dec(&obd->obd_lock_replay_clients);
1169         }
1170 }
1171
1172 /* This function removes 1-3 references from the export:
1173  * 1 - for export pointer passed
1174  * and if disconnect really need
1175  * 2 - removing from hash
1176  * 3 - in client_unlink_export
1177  * The export pointer passed to this function can destroyed */
1178 int class_disconnect(struct obd_export *export)
1179 {
1180         int already_disconnected;
1181
1182         if (export == NULL) {
1183                 CWARN("attempting to free NULL export %p\n", export);
1184                 return -EINVAL;
1185         }
1186
1187         spin_lock(&export->exp_lock);
1188         already_disconnected = export->exp_disconnected;
1189         export->exp_disconnected = 1;
1190         spin_unlock(&export->exp_lock);
1191
1192         /* class_cleanup(), abort_recovery(), and class_fail_export()
1193          * all end up in here, and if any of them race we shouldn't
1194          * call extra class_export_puts(). */
1195         if (already_disconnected) {
1196                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1197                 goto no_disconn;
1198         }
1199
1200         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1201                export->exp_handle.h_cookie);
1202
1203         if (!hlist_unhashed(&export->exp_nid_hash))
1204                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1205                              &export->exp_connection->c_peer.nid,
1206                              &export->exp_nid_hash);
1207
1208         class_export_recovery_cleanup(export);
1209         class_unlink_export(export);
1210 no_disconn:
1211         class_export_put(export);
1212         return 0;
1213 }
1214 EXPORT_SYMBOL(class_disconnect);
1215
1216 /* Return non-zero for a fully connected export */
1217 int class_connected_export(struct obd_export *exp)
1218 {
1219         if (exp) {
1220                 int connected;
1221                 spin_lock(&exp->exp_lock);
1222                 connected = (exp->exp_conn_cnt > 0);
1223                 spin_unlock(&exp->exp_lock);
1224                 return connected;
1225         }
1226         return 0;
1227 }
1228 EXPORT_SYMBOL(class_connected_export);
1229
1230 static void class_disconnect_export_list(struct list_head *list,
1231                                          enum obd_option flags)
1232 {
1233         int rc;
1234         struct obd_export *exp;
1235
1236         /* It's possible that an export may disconnect itself, but
1237          * nothing else will be added to this list. */
1238         while (!list_empty(list)) {
1239                 exp = list_entry(list->next, struct obd_export,
1240                                      exp_obd_chain);
1241                 /* need for safe call CDEBUG after obd_disconnect */
1242                 class_export_get(exp);
1243
1244                 spin_lock(&exp->exp_lock);
1245                 exp->exp_flags = flags;
1246                 spin_unlock(&exp->exp_lock);
1247
1248                 if (obd_uuid_equals(&exp->exp_client_uuid,
1249                                     &exp->exp_obd->obd_uuid)) {
1250                         CDEBUG(D_HA,
1251                                "exp %p export uuid == obd uuid, don't discon\n",
1252                                exp);
1253                         /* Need to delete this now so we don't end up pointing
1254                          * to work_list later when this export is cleaned up. */
1255                         list_del_init(&exp->exp_obd_chain);
1256                         class_export_put(exp);
1257                         continue;
1258                 }
1259
1260                 class_export_get(exp);
1261                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1262                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1263                        exp, exp->exp_last_request_time);
1264                 /* release one export reference anyway */
1265                 rc = obd_disconnect(exp);
1266
1267                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1268                        obd_export_nid2str(exp), exp, rc);
1269                 class_export_put(exp);
1270         }
1271 }
1272
1273 void class_disconnect_exports(struct obd_device *obd)
1274 {
1275         struct list_head work_list;
1276
1277         /* Move all of the exports from obd_exports to a work list, en masse. */
1278         INIT_LIST_HEAD(&work_list);
1279         spin_lock(&obd->obd_dev_lock);
1280         list_splice_init(&obd->obd_exports, &work_list);
1281         list_splice_init(&obd->obd_delayed_exports, &work_list);
1282         spin_unlock(&obd->obd_dev_lock);
1283
1284         if (!list_empty(&work_list)) {
1285                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1286                        obd->obd_minor, obd);
1287                 class_disconnect_export_list(&work_list,
1288                                              exp_flags_from_obd(obd));
1289         } else
1290                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1291                        obd->obd_minor, obd);
1292 }
1293 EXPORT_SYMBOL(class_disconnect_exports);
1294
1295 /* Remove exports that have not completed recovery.
1296  */
1297 void class_disconnect_stale_exports(struct obd_device *obd,
1298                                     int (*test_export)(struct obd_export *))
1299 {
1300         struct list_head work_list;
1301         struct obd_export *exp, *n;
1302         int evicted = 0;
1303
1304         INIT_LIST_HEAD(&work_list);
1305         spin_lock(&obd->obd_dev_lock);
1306         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1307                                      exp_obd_chain) {
1308                 /* don't count self-export as client */
1309                 if (obd_uuid_equals(&exp->exp_client_uuid,
1310                                     &exp->exp_obd->obd_uuid))
1311                         continue;
1312
1313                 /* don't evict clients which have no slot in last_rcvd
1314                  * (e.g. lightweight connection) */
1315                 if (exp->exp_target_data.ted_lr_idx == -1)
1316                         continue;
1317
1318                 spin_lock(&exp->exp_lock);
1319                 if (exp->exp_failed || test_export(exp)) {
1320                         spin_unlock(&exp->exp_lock);
1321                         continue;
1322                 }
1323                 exp->exp_failed = 1;
1324                 spin_unlock(&exp->exp_lock);
1325
1326                 list_move(&exp->exp_obd_chain, &work_list);
1327                 evicted++;
1328                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1329                        obd->obd_name, exp->exp_client_uuid.uuid,
1330                        exp->exp_connection == NULL ? "<unknown>" :
1331                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1332                 print_export_data(exp, "EVICTING", 0);
1333         }
1334         spin_unlock(&obd->obd_dev_lock);
1335
1336         if (evicted)
1337                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1338                               obd->obd_name, evicted);
1339
1340         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1341                                                  OBD_OPT_ABORT_RECOV);
1342 }
1343 EXPORT_SYMBOL(class_disconnect_stale_exports);
1344
1345 void class_fail_export(struct obd_export *exp)
1346 {
1347         int rc, already_failed;
1348
1349         spin_lock(&exp->exp_lock);
1350         already_failed = exp->exp_failed;
1351         exp->exp_failed = 1;
1352         spin_unlock(&exp->exp_lock);
1353
1354         if (already_failed) {
1355                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1356                        exp, exp->exp_client_uuid.uuid);
1357                 return;
1358         }
1359
1360         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1361                exp, exp->exp_client_uuid.uuid);
1362
1363         if (obd_dump_on_timeout)
1364                 libcfs_debug_dumplog();
1365
1366         /* need for safe call CDEBUG after obd_disconnect */
1367         class_export_get(exp);
1368
1369         /* Most callers into obd_disconnect are removing their own reference
1370          * (request, for example) in addition to the one from the hash table.
1371          * We don't have such a reference here, so make one. */
1372         class_export_get(exp);
1373         rc = obd_disconnect(exp);
1374         if (rc)
1375                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1376         else
1377                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1378                        exp, exp->exp_client_uuid.uuid);
1379         class_export_put(exp);
1380 }
1381 EXPORT_SYMBOL(class_fail_export);
1382
1383 char *obd_export_nid2str(struct obd_export *exp)
1384 {
1385         if (exp->exp_connection != NULL)
1386                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1387
1388         return "(no nid)";
1389 }
1390 EXPORT_SYMBOL(obd_export_nid2str);
1391
1392 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1393 {
1394         struct cfs_hash *nid_hash;
1395         struct obd_export *doomed_exp = NULL;
1396         int exports_evicted = 0;
1397
1398         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1399
1400         spin_lock(&obd->obd_dev_lock);
1401         /* umount has run already, so evict thread should leave
1402          * its task to umount thread now */
1403         if (obd->obd_stopping) {
1404                 spin_unlock(&obd->obd_dev_lock);
1405                 return exports_evicted;
1406         }
1407         nid_hash = obd->obd_nid_hash;
1408         cfs_hash_getref(nid_hash);
1409         spin_unlock(&obd->obd_dev_lock);
1410
1411         do {
1412                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1413                 if (doomed_exp == NULL)
1414                         break;
1415
1416                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1417                          "nid %s found, wanted nid %s, requested nid %s\n",
1418                          obd_export_nid2str(doomed_exp),
1419                          libcfs_nid2str(nid_key), nid);
1420                 LASSERTF(doomed_exp != obd->obd_self_export,
1421                          "self-export is hashed by NID?\n");
1422                 exports_evicted++;
1423                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1424                               obd->obd_name,
1425                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1426                               obd_export_nid2str(doomed_exp));
1427                 class_fail_export(doomed_exp);
1428                 class_export_put(doomed_exp);
1429         } while (1);
1430
1431         cfs_hash_putref(nid_hash);
1432
1433         if (!exports_evicted)
1434                 CDEBUG(D_HA,
1435                        "%s: can't disconnect NID '%s': no exports found\n",
1436                        obd->obd_name, nid);
1437         return exports_evicted;
1438 }
1439 EXPORT_SYMBOL(obd_export_evict_by_nid);
1440
1441 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1442 {
1443         struct cfs_hash *uuid_hash;
1444         struct obd_export *doomed_exp = NULL;
1445         struct obd_uuid doomed_uuid;
1446         int exports_evicted = 0;
1447
1448         spin_lock(&obd->obd_dev_lock);
1449         if (obd->obd_stopping) {
1450                 spin_unlock(&obd->obd_dev_lock);
1451                 return exports_evicted;
1452         }
1453         uuid_hash = obd->obd_uuid_hash;
1454         cfs_hash_getref(uuid_hash);
1455         spin_unlock(&obd->obd_dev_lock);
1456
1457         obd_str2uuid(&doomed_uuid, uuid);
1458         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1459                 CERROR("%s: can't evict myself\n", obd->obd_name);
1460                 cfs_hash_putref(uuid_hash);
1461                 return exports_evicted;
1462         }
1463
1464         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1465
1466         if (doomed_exp == NULL) {
1467                 CERROR("%s: can't disconnect %s: no exports found\n",
1468                        obd->obd_name, uuid);
1469         } else {
1470                 CWARN("%s: evicting %s at administrative request\n",
1471                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1472                 class_fail_export(doomed_exp);
1473                 class_export_put(doomed_exp);
1474                 exports_evicted++;
1475         }
1476         cfs_hash_putref(uuid_hash);
1477
1478         return exports_evicted;
1479 }
1480 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1481
1482 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook called by print_export_data() when lock dumping is
 * requested; it stays NULL until some other code assigns it.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1485 #endif
1486
1487 static void print_export_data(struct obd_export *exp, const char *status,
1488                               int locks)
1489 {
1490         struct ptlrpc_reply_state *rs;
1491         struct ptlrpc_reply_state *first_reply = NULL;
1492         int nreplies = 0;
1493
1494         spin_lock(&exp->exp_lock);
1495         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1496                                 rs_exp_list) {
1497                 if (nreplies == 0)
1498                         first_reply = rs;
1499                 nreplies++;
1500         }
1501         spin_unlock(&exp->exp_lock);
1502
1503         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1504                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1505                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1506                atomic_read(&exp->exp_rpc_count),
1507                atomic_read(&exp->exp_cb_count),
1508                atomic_read(&exp->exp_locks_count),
1509                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1510                nreplies, first_reply, nreplies > 3 ? "..." : "",
1511                exp->exp_last_committed);
1512 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1513         if (locks && class_export_dump_hook != NULL)
1514                 class_export_dump_hook(exp);
1515 #endif
1516 }
1517
1518 void dump_exports(struct obd_device *obd, int locks)
1519 {
1520         struct obd_export *exp;
1521
1522         spin_lock(&obd->obd_dev_lock);
1523         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1524                 print_export_data(exp, "ACTIVE", locks);
1525         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1526                 print_export_data(exp, "UNLINKED", locks);
1527         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1528                 print_export_data(exp, "DELAYED", locks);
1529         spin_unlock(&obd->obd_dev_lock);
1530         spin_lock(&obd_zombie_impexp_lock);
1531         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1532                 print_export_data(exp, "ZOMBIE", locks);
1533         spin_unlock(&obd_zombie_impexp_lock);
1534 }
1535 EXPORT_SYMBOL(dump_exports);
1536
1537 void obd_exports_barrier(struct obd_device *obd)
1538 {
1539         int waited = 2;
1540         LASSERT(list_empty(&obd->obd_exports));
1541         spin_lock(&obd->obd_dev_lock);
1542         while (!list_empty(&obd->obd_unlinked_exports)) {
1543                 spin_unlock(&obd->obd_dev_lock);
1544                 set_current_state(TASK_UNINTERRUPTIBLE);
1545                 schedule_timeout(cfs_time_seconds(waited));
1546                 if (waited > 5 && IS_PO2(waited)) {
1547                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1548                                       obd->obd_name, waited,
1549                                       atomic_read(&obd->obd_refcount));
1550                         dump_exports(obd, 1);
1551                 }
1552                 waited *= 2;
1553                 spin_lock(&obd->obd_dev_lock);
1554         }
1555         spin_unlock(&obd->obd_dev_lock);
1556 }
1557 EXPORT_SYMBOL(obd_exports_barrier);
1558
/*
 * Total number of zombie imports and exports still waiting to be destroyed.
 * Modified and read under obd_zombie_impexp_lock.
 */
static int zombies_count;
1561
1562 /**
1563  * kill zombie imports and exports
1564  */
1565 void obd_zombie_impexp_cull(void)
1566 {
1567         struct obd_import *import;
1568         struct obd_export *export;
1569
1570         do {
1571                 spin_lock(&obd_zombie_impexp_lock);
1572
1573                 import = NULL;
1574                 if (!list_empty(&obd_zombie_imports)) {
1575                         import = list_entry(obd_zombie_imports.next,
1576                                                 struct obd_import,
1577                                                 imp_zombie_chain);
1578                         list_del_init(&import->imp_zombie_chain);
1579                 }
1580
1581                 export = NULL;
1582                 if (!list_empty(&obd_zombie_exports)) {
1583                         export = list_entry(obd_zombie_exports.next,
1584                                                 struct obd_export,
1585                                                 exp_obd_chain);
1586                         list_del_init(&export->exp_obd_chain);
1587                 }
1588
1589                 spin_unlock(&obd_zombie_impexp_lock);
1590
1591                 if (import != NULL) {
1592                         class_import_destroy(import);
1593                         spin_lock(&obd_zombie_impexp_lock);
1594                         zombies_count--;
1595                         spin_unlock(&obd_zombie_impexp_lock);
1596                 }
1597
1598                 if (export != NULL) {
1599                         class_export_destroy(export);
1600                         spin_lock(&obd_zombie_impexp_lock);
1601                         zombies_count--;
1602                         spin_unlock(&obd_zombie_impexp_lock);
1603                 }
1604
1605                 cond_resched();
1606         } while (import != NULL || export != NULL);
1607 }
1608
1609 static struct completion        obd_zombie_start;
1610 static struct completion        obd_zombie_stop;
1611 static unsigned long            obd_zombie_flags;
1612 static wait_queue_head_t                obd_zombie_waitq;
1613 static pid_t                    obd_zombie_pid;
1614
1615 enum {
1616         OBD_ZOMBIE_STOP         = 0x0001,
1617 };
1618
1619 /**
1620  * check for work for kill zombie import/export thread.
1621  */
1622 static int obd_zombie_impexp_check(void *arg)
1623 {
1624         int rc;
1625
1626         spin_lock(&obd_zombie_impexp_lock);
1627         rc = (zombies_count == 0) &&
1628              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1629         spin_unlock(&obd_zombie_impexp_lock);
1630
1631         return rc;
1632 }
1633
1634 /**
1635  * Add export to the obd_zombie thread and notify it.
1636  */
1637 static void obd_zombie_export_add(struct obd_export *exp) {
1638         spin_lock(&exp->exp_obd->obd_dev_lock);
1639         LASSERT(!list_empty(&exp->exp_obd_chain));
1640         list_del_init(&exp->exp_obd_chain);
1641         spin_unlock(&exp->exp_obd->obd_dev_lock);
1642         spin_lock(&obd_zombie_impexp_lock);
1643         zombies_count++;
1644         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1645         spin_unlock(&obd_zombie_impexp_lock);
1646
1647         obd_zombie_impexp_notify();
1648 }
1649
1650 /**
1651  * Add import to the obd_zombie thread and notify it.
1652  */
1653 static void obd_zombie_import_add(struct obd_import *imp) {
1654         LASSERT(imp->imp_sec == NULL);
1655         LASSERT(imp->imp_rq_pool == NULL);
1656         spin_lock(&obd_zombie_impexp_lock);
1657         LASSERT(list_empty(&imp->imp_zombie_chain));
1658         zombies_count++;
1659         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1660         spin_unlock(&obd_zombie_impexp_lock);
1661
1662         obd_zombie_impexp_notify();
1663 }
1664
1665 /**
1666  * notify import/export destroy thread about new zombie.
1667  */
1668 static void obd_zombie_impexp_notify(void)
1669 {
1670         /*
1671          * Make sure obd_zombie_impexp_thread get this notification.
1672          * It is possible this signal only get by obd_zombie_barrier, and
1673          * barrier gulps this notification and sleeps away and hangs ensues
1674          */
1675         wake_up_all(&obd_zombie_waitq);
1676 }
1677
1678 /**
1679  * check whether obd_zombie is idle
1680  */
1681 static int obd_zombie_is_idle(void)
1682 {
1683         int rc;
1684
1685         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1686         spin_lock(&obd_zombie_impexp_lock);
1687         rc = (zombies_count == 0);
1688         spin_unlock(&obd_zombie_impexp_lock);
1689         return rc;
1690 }
1691
1692 /**
1693  * wait when obd_zombie import/export queues become empty
1694  */
1695 void obd_zombie_barrier(void)
1696 {
1697         struct l_wait_info lwi = { 0 };
1698
1699         if (obd_zombie_pid == current_pid())
1700                 /* don't wait for myself */
1701                 return;
1702         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1703 }
1704 EXPORT_SYMBOL(obd_zombie_barrier);
1705
1706
1707 /**
1708  * destroy zombie export/import thread.
1709  */
1710 static int obd_zombie_impexp_thread(void *unused)
1711 {
1712         unshare_fs_struct();
1713         complete(&obd_zombie_start);
1714
1715         obd_zombie_pid = current_pid();
1716
1717         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1718                 struct l_wait_info lwi = { 0 };
1719
1720                 l_wait_event(obd_zombie_waitq,
1721                              !obd_zombie_impexp_check(NULL), &lwi);
1722                 obd_zombie_impexp_cull();
1723
1724                 /*
1725                  * Notify obd_zombie_barrier callers that queues
1726                  * may be empty.
1727                  */
1728                 wake_up(&obd_zombie_waitq);
1729         }
1730
1731         complete(&obd_zombie_stop);
1732
1733         return 0;
1734 }
1735
1736
1737 /**
1738  * start destroy zombie import/export thread
1739  */
1740 int obd_zombie_impexp_init(void)
1741 {
1742         struct task_struct *task;
1743
1744         INIT_LIST_HEAD(&obd_zombie_imports);
1745         INIT_LIST_HEAD(&obd_zombie_exports);
1746         spin_lock_init(&obd_zombie_impexp_lock);
1747         init_completion(&obd_zombie_start);
1748         init_completion(&obd_zombie_stop);
1749         init_waitqueue_head(&obd_zombie_waitq);
1750         obd_zombie_pid = 0;
1751
1752         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1753         if (IS_ERR(task))
1754                 return PTR_ERR(task);
1755
1756         wait_for_completion(&obd_zombie_start);
1757         return 0;
1758 }
1759 /**
1760  * stop destroy zombie import/export thread
1761  */
1762 void obd_zombie_impexp_stop(void)
1763 {
1764         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1765         obd_zombie_impexp_notify();
1766         wait_for_completion(&obd_zombie_stop);
1767 }
1768
1769 /***** Kernel-userspace comm helpers *******/
1770
1771 /* Get length of entire message, including header */
1772 int kuc_len(int payload_len)
1773 {
1774         return sizeof(struct kuc_hdr) + payload_len;
1775 }
1776 EXPORT_SYMBOL(kuc_len);
1777
1778 /* Get a pointer to kuc header, given a ptr to the payload
1779  * @param p Pointer to payload area
1780  * @returns Pointer to kuc header
1781  */
1782 struct kuc_hdr *kuc_ptr(void *p)
1783 {
1784         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1785         LASSERT(lh->kuc_magic == KUC_MAGIC);
1786         return lh;
1787 }
1788 EXPORT_SYMBOL(kuc_ptr);
1789
1790 /* Test if payload is part of kuc message
1791  * @param p Pointer to payload area
1792  * @returns boolean
1793  */
1794 int kuc_ispayload(void *p)
1795 {
1796         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1797
1798         if (kh->kuc_magic == KUC_MAGIC)
1799                 return 1;
1800         else
1801                 return 0;
1802 }
1803 EXPORT_SYMBOL(kuc_ispayload);
1804
1805 /* Alloc space for a message, and fill in header
1806  * @return Pointer to payload area
1807  */
1808 void *kuc_alloc(int payload_len, int transport, int type)
1809 {
1810         struct kuc_hdr *lh;
1811         int len = kuc_len(payload_len);
1812
1813         OBD_ALLOC(lh, len);
1814         if (lh == NULL)
1815                 return ERR_PTR(-ENOMEM);
1816
1817         lh->kuc_magic = KUC_MAGIC;
1818         lh->kuc_transport = transport;
1819         lh->kuc_msgtype = type;
1820         lh->kuc_msglen = len;
1821
1822         return (void *)(lh + 1);
1823 }
1824 EXPORT_SYMBOL(kuc_alloc);
1825
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area
 * @param payload_len Payload size; the freed size is kuc_len(payload_len)
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);