drivers: staging: lustre: Fix space required after that ',' errors
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_ost.h"
44 #include "../include/obd_class.h"
45 #include "../include/lprocfs_status.h"
46
47 extern struct list_head obd_types;
48 spinlock_t obd_types_lock;
49
50 struct kmem_cache *obd_device_cachep;
51 struct kmem_cache *obdo_cachep;
52 EXPORT_SYMBOL(obdo_cachep);
53 struct kmem_cache *import_cachep;
54
55 struct list_head      obd_zombie_imports;
56 struct list_head      obd_zombie_exports;
57 spinlock_t  obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62                               const char *status, int locks);
63
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
65 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66
67 /*
68  * support functions: we could use inter-module communication, but this
69  * is more portable to other OS's
70  */
71 static struct obd_device *obd_device_alloc(void)
72 {
73         struct obd_device *obd;
74
75         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76         if (obd != NULL) {
77                 obd->obd_magic = OBD_DEVICE_MAGIC;
78         }
79         return obd;
80 }
81
82 static void obd_device_free(struct obd_device *obd)
83 {
84         LASSERT(obd != NULL);
85         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87         if (obd->obd_namespace != NULL) {
88                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89                        obd, obd->obd_namespace, obd->obd_force);
90                 LBUG();
91         }
92         lu_ref_fini(&obd->obd_reference);
93         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 }
95
96 struct obd_type *class_search_type(const char *name)
97 {
98         struct list_head *tmp;
99         struct obd_type *type;
100
101         spin_lock(&obd_types_lock);
102         list_for_each(tmp, &obd_types) {
103                 type = list_entry(tmp, struct obd_type, typ_chain);
104                 if (strcmp(type->typ_name, name) == 0) {
105                         spin_unlock(&obd_types_lock);
106                         return type;
107                 }
108         }
109         spin_unlock(&obd_types_lock);
110         return NULL;
111 }
112 EXPORT_SYMBOL(class_search_type);
113
114 struct obd_type *class_get_type(const char *name)
115 {
116         struct obd_type *type = class_search_type(name);
117
118         if (!type) {
119                 const char *modname = name;
120
121                 if (strcmp(modname, "obdfilter") == 0)
122                         modname = "ofd";
123
124                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125                         modname = LUSTRE_OSP_NAME;
126
127                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128                         modname = LUSTRE_MDT_NAME;
129
130                 if (!request_module("%s", modname)) {
131                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132                         type = class_search_type(name);
133                 } else {
134                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135                                            modname);
136                 }
137         }
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143         }
144         return type;
145 }
146 EXPORT_SYMBOL(class_get_type);
147
148 void class_put_type(struct obd_type *type)
149 {
150         LASSERT(type);
151         spin_lock(&type->obd_type_lock);
152         type->typ_refcnt--;
153         module_put(type->typ_dt_ops->o_owner);
154         spin_unlock(&type->obd_type_lock);
155 }
156 EXPORT_SYMBOL(class_put_type);
157
158 #define CLASS_MAX_NAME 1024
159
160 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
161                         struct lprocfs_vars *vars, const char *name,
162                         struct lu_device_type *ldt)
163 {
164         struct obd_type *type;
165         int rc = 0;
166
167         /* sanity check */
168         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
169
170         if (class_search_type(name)) {
171                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
172                 return -EEXIST;
173         }
174
175         rc = -ENOMEM;
176         OBD_ALLOC(type, sizeof(*type));
177         if (type == NULL)
178                 return rc;
179
180         OBD_ALLOC_PTR(type->typ_dt_ops);
181         OBD_ALLOC_PTR(type->typ_md_ops);
182         OBD_ALLOC(type->typ_name, strlen(name) + 1);
183
184         if (type->typ_dt_ops == NULL ||
185             type->typ_md_ops == NULL ||
186             type->typ_name == NULL)
187                 GOTO (failed, rc);
188
189         *(type->typ_dt_ops) = *dt_ops;
190         /* md_ops is optional */
191         if (md_ops)
192                 *(type->typ_md_ops) = *md_ops;
193         strcpy(type->typ_name, name);
194         spin_lock_init(&type->obd_type_lock);
195
196         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
197                                               vars, type);
198         if (IS_ERR(type->typ_procroot)) {
199                 rc = PTR_ERR(type->typ_procroot);
200                 type->typ_procroot = NULL;
201                 GOTO (failed, rc);
202         }
203
204         if (ldt != NULL) {
205                 type->typ_lu = ldt;
206                 rc = lu_device_type_init(ldt);
207                 if (rc != 0)
208                         GOTO (failed, rc);
209         }
210
211         spin_lock(&obd_types_lock);
212         list_add(&type->typ_chain, &obd_types);
213         spin_unlock(&obd_types_lock);
214
215         return 0;
216
217  failed:
218         if (type->typ_name != NULL)
219                 OBD_FREE(type->typ_name, strlen(name) + 1);
220         if (type->typ_md_ops != NULL)
221                 OBD_FREE_PTR(type->typ_md_ops);
222         if (type->typ_dt_ops != NULL)
223                 OBD_FREE_PTR(type->typ_dt_ops);
224         OBD_FREE(type, sizeof(*type));
225         return rc;
226 }
227 EXPORT_SYMBOL(class_register_type);
228
229 int class_unregister_type(const char *name)
230 {
231         struct obd_type *type = class_search_type(name);
232
233         if (!type) {
234                 CERROR("unknown obd type\n");
235                 return -EINVAL;
236         }
237
238         if (type->typ_refcnt) {
239                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240                 /* This is a bad situation, let's make the best of it */
241                 /* Remove ops, but leave the name for debugging */
242                 OBD_FREE_PTR(type->typ_dt_ops);
243                 OBD_FREE_PTR(type->typ_md_ops);
244                 return -EBUSY;
245         }
246
247         if (type->typ_procroot) {
248                 lprocfs_remove(&type->typ_procroot);
249         }
250
251         if (type->typ_lu)
252                 lu_device_type_fini(type->typ_lu);
253
254         spin_lock(&obd_types_lock);
255         list_del(&type->typ_chain);
256         spin_unlock(&obd_types_lock);
257         OBD_FREE(type->typ_name, strlen(name) + 1);
258         if (type->typ_dt_ops != NULL)
259                 OBD_FREE_PTR(type->typ_dt_ops);
260         if (type->typ_md_ops != NULL)
261                 OBD_FREE_PTR(type->typ_md_ops);
262         OBD_FREE(type, sizeof(*type));
263         return 0;
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266
267 /**
268  * Create a new obd device.
269  *
270  * Find an empty slot in ::obd_devs[], create a new obd device in it.
271  *
272  * \param[in] type_name obd device type string.
273  * \param[in] name      obd device name.
274  *
275  * \retval NULL if create fails, otherwise return the obd device
276  *       pointer created.
277  */
278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280         struct obd_device *result = NULL;
281         struct obd_device *newdev;
282         struct obd_type *type = NULL;
283         int i;
284         int new_obd_minor = 0;
285
286         if (strlen(name) >= MAX_OBD_NAME) {
287                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288                 return ERR_PTR(-EINVAL);
289         }
290
291         type = class_get_type(type_name);
292         if (type == NULL){
293                 CERROR("OBD: unknown type: %s\n", type_name);
294                 return ERR_PTR(-ENODEV);
295         }
296
297         newdev = obd_device_alloc();
298         if (newdev == NULL)
299                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
300
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0]='\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
340         }
341
342         if (IS_ERR(result))
343                 GOTO(out, result);
344
345         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346                result->obd_name, result);
347
348         return result;
349 out:
350         obd_device_free(newdev);
351 out_type:
352         class_put_type(type);
353         return result;
354 }
355
356 void class_release_dev(struct obd_device *obd)
357 {
358         struct obd_type *obd_type = obd->obd_type;
359
360         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
361                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
362         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
363                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
364         LASSERT(obd_type != NULL);
365
366         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
367                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
368
369         write_lock(&obd_dev_lock);
370         obd_devs[obd->obd_minor] = NULL;
371         write_unlock(&obd_dev_lock);
372         obd_device_free(obd);
373
374         class_put_type(obd_type);
375 }
376
377 int class_name2dev(const char *name)
378 {
379         int i;
380
381         if (!name)
382                 return -1;
383
384         read_lock(&obd_dev_lock);
385         for (i = 0; i < class_devno_max(); i++) {
386                 struct obd_device *obd = class_num2obd(i);
387
388                 if (obd && strcmp(name, obd->obd_name) == 0) {
389                         /* Make sure we finished attaching before we give
390                            out any references */
391                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
392                         if (obd->obd_attached) {
393                                 read_unlock(&obd_dev_lock);
394                                 return i;
395                         }
396                         break;
397                 }
398         }
399         read_unlock(&obd_dev_lock);
400
401         return -1;
402 }
403 EXPORT_SYMBOL(class_name2dev);
404
405 struct obd_device *class_name2obd(const char *name)
406 {
407         int dev = class_name2dev(name);
408
409         if (dev < 0 || dev > class_devno_max())
410                 return NULL;
411         return class_num2obd(dev);
412 }
413 EXPORT_SYMBOL(class_name2obd);
414
415 int class_uuid2dev(struct obd_uuid *uuid)
416 {
417         int i;
418
419         read_lock(&obd_dev_lock);
420         for (i = 0; i < class_devno_max(); i++) {
421                 struct obd_device *obd = class_num2obd(i);
422
423                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
424                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
425                         read_unlock(&obd_dev_lock);
426                         return i;
427                 }
428         }
429         read_unlock(&obd_dev_lock);
430
431         return -1;
432 }
433 EXPORT_SYMBOL(class_uuid2dev);
434
435 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
436 {
437         int dev = class_uuid2dev(uuid);
438         if (dev < 0)
439                 return NULL;
440         return class_num2obd(dev);
441 }
442 EXPORT_SYMBOL(class_uuid2obd);
443
444 /**
445  * Get obd device from ::obd_devs[]
446  *
447  * \param num [in] array index
448  *
449  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
450  *       otherwise return the obd device there.
451  */
452 struct obd_device *class_num2obd(int num)
453 {
454         struct obd_device *obd = NULL;
455
456         if (num < class_devno_max()) {
457                 obd = obd_devs[num];
458                 if (obd == NULL)
459                         return NULL;
460
461                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
462                          "%p obd_magic %08x != %08x\n",
463                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
464                 LASSERTF(obd->obd_minor == num,
465                          "%p obd_minor %0d != %0d\n",
466                          obd, obd->obd_minor, num);
467         }
468
469         return obd;
470 }
471 EXPORT_SYMBOL(class_num2obd);
472
473 /**
474  * Get obd devices count. Device in any
475  *    state are counted
476  * \retval obd device count
477  */
478 int get_devices_count(void)
479 {
480         int index, max_index = class_devno_max(), dev_count = 0;
481
482         read_lock(&obd_dev_lock);
483         for (index = 0; index <= max_index; index++) {
484                 struct obd_device *obd = class_num2obd(index);
485                 if (obd != NULL)
486                         dev_count++;
487         }
488         read_unlock(&obd_dev_lock);
489
490         return dev_count;
491 }
492 EXPORT_SYMBOL(get_devices_count);
493
494 void class_obd_list(void)
495 {
496         char *status;
497         int i;
498
499         read_lock(&obd_dev_lock);
500         for (i = 0; i < class_devno_max(); i++) {
501                 struct obd_device *obd = class_num2obd(i);
502
503                 if (obd == NULL)
504                         continue;
505                 if (obd->obd_stopping)
506                         status = "ST";
507                 else if (obd->obd_set_up)
508                         status = "UP";
509                 else if (obd->obd_attached)
510                         status = "AT";
511                 else
512                         status = "--";
513                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
514                          i, status, obd->obd_type->typ_name,
515                          obd->obd_name, obd->obd_uuid.uuid,
516                          atomic_read(&obd->obd_refcount));
517         }
518         read_unlock(&obd_dev_lock);
519         return;
520 }
521
522 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
523    specified, then only the client with that uuid is returned,
524    otherwise any client connected to the tgt is returned. */
525 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
526                                           const char * typ_name,
527                                           struct obd_uuid *grp_uuid)
528 {
529         int i;
530
531         read_lock(&obd_dev_lock);
532         for (i = 0; i < class_devno_max(); i++) {
533                 struct obd_device *obd = class_num2obd(i);
534
535                 if (obd == NULL)
536                         continue;
537                 if ((strncmp(obd->obd_type->typ_name, typ_name,
538                              strlen(typ_name)) == 0)) {
539                         if (obd_uuid_equals(tgt_uuid,
540                                             &obd->u.cli.cl_target_uuid) &&
541                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
542                                                          &obd->obd_uuid) : 1)) {
543                                 read_unlock(&obd_dev_lock);
544                                 return obd;
545                         }
546                 }
547         }
548         read_unlock(&obd_dev_lock);
549
550         return NULL;
551 }
552 EXPORT_SYMBOL(class_find_client_obd);
553
554 /* Iterate the obd_device list looking devices have grp_uuid. Start
555    searching at *next, and if a device is found, the next index to look
556    at is saved in *next. If next is NULL, then the first matching device
557    will always be returned. */
558 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
559 {
560         int i;
561
562         if (next == NULL)
563                 i = 0;
564         else if (*next >= 0 && *next < class_devno_max())
565                 i = *next;
566         else
567                 return NULL;
568
569         read_lock(&obd_dev_lock);
570         for (; i < class_devno_max(); i++) {
571                 struct obd_device *obd = class_num2obd(i);
572
573                 if (obd == NULL)
574                         continue;
575                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
576                         if (next != NULL)
577                                 *next = i+1;
578                         read_unlock(&obd_dev_lock);
579                         return obd;
580                 }
581         }
582         read_unlock(&obd_dev_lock);
583
584         return NULL;
585 }
586 EXPORT_SYMBOL(class_devices_in_group);
587
588 /**
589  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
590  * adjust sptlrpc settings accordingly.
591  */
592 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
593 {
594         struct obd_device  *obd;
595         const char       *type;
596         int              i, rc = 0, rc2;
597
598         LASSERT(namelen > 0);
599
600         read_lock(&obd_dev_lock);
601         for (i = 0; i < class_devno_max(); i++) {
602                 obd = class_num2obd(i);
603
604                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
605                         continue;
606
607                 /* only notify mdc, osc, mdt, ost */
608                 type = obd->obd_type->typ_name;
609                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
610                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
611                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
612                     strcmp(type, LUSTRE_OST_NAME) != 0)
613                         continue;
614
615                 if (strncmp(obd->obd_name, fsname, namelen))
616                         continue;
617
618                 class_incref(obd, __func__, obd);
619                 read_unlock(&obd_dev_lock);
620                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
621                                          sizeof(KEY_SPTLRPC_CONF),
622                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
623                 rc = rc ? rc : rc2;
624                 class_decref(obd, __func__, obd);
625                 read_lock(&obd_dev_lock);
626         }
627         read_unlock(&obd_dev_lock);
628         return rc;
629 }
630 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
631
632 void obd_cleanup_caches(void)
633 {
634         if (obd_device_cachep) {
635                 kmem_cache_destroy(obd_device_cachep);
636                 obd_device_cachep = NULL;
637         }
638         if (obdo_cachep) {
639                 kmem_cache_destroy(obdo_cachep);
640                 obdo_cachep = NULL;
641         }
642         if (import_cachep) {
643                 kmem_cache_destroy(import_cachep);
644                 import_cachep = NULL;
645         }
646         if (capa_cachep) {
647                 kmem_cache_destroy(capa_cachep);
648                 capa_cachep = NULL;
649         }
650 }
651
652 int obd_init_caches(void)
653 {
654         LASSERT(obd_device_cachep == NULL);
655         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
656                                                  sizeof(struct obd_device),
657                                                  0, 0, NULL);
658         if (!obd_device_cachep)
659                 GOTO(out, -ENOMEM);
660
661         LASSERT(obdo_cachep == NULL);
662         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
663                                            0, 0, NULL);
664         if (!obdo_cachep)
665                 GOTO(out, -ENOMEM);
666
667         LASSERT(import_cachep == NULL);
668         import_cachep = kmem_cache_create("ll_import_cache",
669                                              sizeof(struct obd_import),
670                                              0, 0, NULL);
671         if (!import_cachep)
672                 GOTO(out, -ENOMEM);
673
674         LASSERT(capa_cachep == NULL);
675         capa_cachep = kmem_cache_create("capa_cache",
676                                            sizeof(struct obd_capa), 0, 0, NULL);
677         if (!capa_cachep)
678                 GOTO(out, -ENOMEM);
679
680         return 0;
681  out:
682         obd_cleanup_caches();
683         return -ENOMEM;
684
685 }
686
687 /* map connection to client */
688 struct obd_export *class_conn2export(struct lustre_handle *conn)
689 {
690         struct obd_export *export;
691
692         if (!conn) {
693                 CDEBUG(D_CACHE, "looking for null handle\n");
694                 return NULL;
695         }
696
697         if (conn->cookie == -1) {  /* this means assign a new connection */
698                 CDEBUG(D_CACHE, "want a new connection\n");
699                 return NULL;
700         }
701
702         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
703         export = class_handle2object(conn->cookie);
704         return export;
705 }
706 EXPORT_SYMBOL(class_conn2export);
707
708 struct obd_device *class_exp2obd(struct obd_export *exp)
709 {
710         if (exp)
711                 return exp->exp_obd;
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_exp2obd);
715
716 struct obd_device *class_conn2obd(struct lustre_handle *conn)
717 {
718         struct obd_export *export;
719         export = class_conn2export(conn);
720         if (export) {
721                 struct obd_device *obd = export->exp_obd;
722                 class_export_put(export);
723                 return obd;
724         }
725         return NULL;
726 }
727 EXPORT_SYMBOL(class_conn2obd);
728
729 struct obd_import *class_exp2cliimp(struct obd_export *exp)
730 {
731         struct obd_device *obd = exp->exp_obd;
732         if (obd == NULL)
733                 return NULL;
734         return obd->u.cli.cl_import;
735 }
736 EXPORT_SYMBOL(class_exp2cliimp);
737
738 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
739 {
740         struct obd_device *obd = class_conn2obd(conn);
741         if (obd == NULL)
742                 return NULL;
743         return obd->u.cli.cl_import;
744 }
745 EXPORT_SYMBOL(class_conn2cliimp);
746
747 /* Export management functions */
748 static void class_export_destroy(struct obd_export *exp)
749 {
750         struct obd_device *obd = exp->exp_obd;
751
752         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753         LASSERT(obd != NULL);
754
755         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756                exp->exp_client_uuid.uuid, obd->obd_name);
757
758         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759         if (exp->exp_connection)
760                 ptlrpc_put_connection_superhack(exp->exp_connection);
761
762         LASSERT(list_empty(&exp->exp_outstanding_replies));
763         LASSERT(list_empty(&exp->exp_uncommitted_replies));
764         LASSERT(list_empty(&exp->exp_req_replay_queue));
765         LASSERT(list_empty(&exp->exp_hp_rpcs));
766         obd_destroy_export(exp);
767         class_decref(obd, "export", exp);
768
769         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770 }
771
772 static void export_handle_addref(void *export)
773 {
774         class_export_get(export);
775 }
776
777 static struct portals_handle_ops export_handle_ops = {
778         .hop_addref = export_handle_addref,
779         .hop_free   = NULL,
780 };
781
782 struct obd_export *class_export_get(struct obd_export *exp)
783 {
784         atomic_inc(&exp->exp_refcount);
785         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
786                atomic_read(&exp->exp_refcount));
787         return exp;
788 }
789 EXPORT_SYMBOL(class_export_get);
790
791 void class_export_put(struct obd_export *exp)
792 {
793         LASSERT(exp != NULL);
794         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
795         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
796                atomic_read(&exp->exp_refcount) - 1);
797
798         if (atomic_dec_and_test(&exp->exp_refcount)) {
799                 LASSERT(!list_empty(&exp->exp_obd_chain));
800                 CDEBUG(D_IOCTL, "final put %p/%s\n",
801                        exp, exp->exp_client_uuid.uuid);
802
803                 /* release nid stat refererence */
804                 lprocfs_exp_cleanup(exp);
805
806                 obd_zombie_export_add(exp);
807         }
808 }
809 EXPORT_SYMBOL(class_export_put);
810
811 /* Creates a new export, adds it to the hash table, and returns a
812  * pointer to it. The refcount is 2: one for the hash reference, and
813  * one for the pointer returned by this function. */
814 struct obd_export *class_new_export(struct obd_device *obd,
815                                     struct obd_uuid *cluuid)
816 {
817         struct obd_export *export;
818         struct cfs_hash *hash = NULL;
819         int rc = 0;
820
821         OBD_ALLOC_PTR(export);
822         if (!export)
823                 return ERR_PTR(-ENOMEM);
824
825         export->exp_conn_cnt = 0;
826         export->exp_lock_hash = NULL;
827         export->exp_flock_hash = NULL;
828         atomic_set(&export->exp_refcount, 2);
829         atomic_set(&export->exp_rpc_count, 0);
830         atomic_set(&export->exp_cb_count, 0);
831         atomic_set(&export->exp_locks_count, 0);
832 #if LUSTRE_TRACKS_LOCK_EXP_REFS
833         INIT_LIST_HEAD(&export->exp_locks_list);
834         spin_lock_init(&export->exp_locks_list_guard);
835 #endif
836         atomic_set(&export->exp_replay_count, 0);
837         export->exp_obd = obd;
838         INIT_LIST_HEAD(&export->exp_outstanding_replies);
839         spin_lock_init(&export->exp_uncommitted_replies_lock);
840         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
841         INIT_LIST_HEAD(&export->exp_req_replay_queue);
842         INIT_LIST_HEAD(&export->exp_handle.h_link);
843         INIT_LIST_HEAD(&export->exp_hp_rpcs);
844         class_handle_hash(&export->exp_handle, &export_handle_ops);
845         export->exp_last_request_time = get_seconds();
846         spin_lock_init(&export->exp_lock);
847         spin_lock_init(&export->exp_rpc_lock);
848         INIT_HLIST_NODE(&export->exp_uuid_hash);
849         INIT_HLIST_NODE(&export->exp_nid_hash);
850         spin_lock_init(&export->exp_bl_list_lock);
851         INIT_LIST_HEAD(&export->exp_bl_list);
852
853         export->exp_sp_peer = LUSTRE_SP_ANY;
854         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
855         export->exp_client_uuid = *cluuid;
856         obd_init_export(export);
857
858         spin_lock(&obd->obd_dev_lock);
859         /* shouldn't happen, but might race */
860         if (obd->obd_stopping)
861                 GOTO(exit_unlock, rc = -ENODEV);
862
863         hash = cfs_hash_getref(obd->obd_uuid_hash);
864         if (hash == NULL)
865                 GOTO(exit_unlock, rc = -ENODEV);
866         spin_unlock(&obd->obd_dev_lock);
867
868         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
869                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
870                 if (rc != 0) {
871                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
872                                       obd->obd_name, cluuid->uuid, rc);
873                         GOTO(exit_err, rc = -EALREADY);
874                 }
875         }
876
877         spin_lock(&obd->obd_dev_lock);
878         if (obd->obd_stopping) {
879                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
880                 GOTO(exit_unlock, rc = -ENODEV);
881         }
882
883         class_incref(obd, "export", export);
884         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
885         list_add_tail(&export->exp_obd_chain_timed,
886                           &export->exp_obd->obd_exports_timed);
887         export->exp_obd->obd_num_exports++;
888         spin_unlock(&obd->obd_dev_lock);
889         cfs_hash_putref(hash);
890         return export;
891
892 exit_unlock:
893         spin_unlock(&obd->obd_dev_lock);
894 exit_err:
895         if (hash)
896                 cfs_hash_putref(hash);
897         class_handle_unhash(&export->exp_handle);
898         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
899         obd_destroy_export(export);
900         OBD_FREE_PTR(export);
901         return ERR_PTR(rc);
902 }
903 EXPORT_SYMBOL(class_new_export);
904
905 void class_unlink_export(struct obd_export *exp)
906 {
907         class_handle_unhash(&exp->exp_handle);
908
909         spin_lock(&exp->exp_obd->obd_dev_lock);
910         /* delete an uuid-export hashitem from hashtables */
911         if (!hlist_unhashed(&exp->exp_uuid_hash))
912                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
913                              &exp->exp_client_uuid,
914                              &exp->exp_uuid_hash);
915
916         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
917         list_del_init(&exp->exp_obd_chain_timed);
918         exp->exp_obd->obd_num_exports--;
919         spin_unlock(&exp->exp_obd->obd_dev_lock);
920         class_export_put(exp);
921 }
922 EXPORT_SYMBOL(class_unlink_export);
923
924 /* Import management functions */
925 void class_import_destroy(struct obd_import *imp)
926 {
927         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
928                 imp->imp_obd->obd_name);
929
930         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
931
932         ptlrpc_put_connection_superhack(imp->imp_connection);
933
934         while (!list_empty(&imp->imp_conn_list)) {
935                 struct obd_import_conn *imp_conn;
936
937                 imp_conn = list_entry(imp->imp_conn_list.next,
938                                           struct obd_import_conn, oic_item);
939                 list_del_init(&imp_conn->oic_item);
940                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
941                 OBD_FREE(imp_conn, sizeof(*imp_conn));
942         }
943
944         LASSERT(imp->imp_sec == NULL);
945         class_decref(imp->imp_obd, "import", imp);
946         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
947 }
948
/* hop_addref callback for import handles: take one reference on the import */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
953
954 static struct portals_handle_ops import_handle_ops = {
955         .hop_addref = import_handle_addref,
956         .hop_free   = NULL,
957 };
958
959 struct obd_import *class_import_get(struct obd_import *import)
960 {
961         atomic_inc(&import->imp_refcount);
962         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
963                atomic_read(&import->imp_refcount),
964                import->imp_obd->obd_name);
965         return import;
966 }
967 EXPORT_SYMBOL(class_import_get);
968
969 void class_import_put(struct obd_import *imp)
970 {
971         LASSERT(list_empty(&imp->imp_zombie_chain));
972         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
973
974         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
975                atomic_read(&imp->imp_refcount) - 1,
976                imp->imp_obd->obd_name);
977
978         if (atomic_dec_and_test(&imp->imp_refcount)) {
979                 CDEBUG(D_INFO, "final put import %p\n", imp);
980                 obd_zombie_import_add(imp);
981         }
982
983         /* catch possible import put race */
984         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
985 }
986 EXPORT_SYMBOL(class_import_put);
987
988 static void init_imp_at(struct imp_at *at) {
989         int i;
990         at_init(&at->iat_net_latency, 0, 0);
991         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
992                 /* max service estimates are tracked on the server side, so
993                    don't use the AT history here, just use the last reported
994                    val. (But keep hist for proc histogram, worst_ever) */
995                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
996                         AT_FLG_NOHIST);
997         }
998 }
999
1000 struct obd_import *class_new_import(struct obd_device *obd)
1001 {
1002         struct obd_import *imp;
1003
1004         OBD_ALLOC(imp, sizeof(*imp));
1005         if (imp == NULL)
1006                 return NULL;
1007
1008         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1009         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1010         INIT_LIST_HEAD(&imp->imp_replay_list);
1011         INIT_LIST_HEAD(&imp->imp_sending_list);
1012         INIT_LIST_HEAD(&imp->imp_delayed_list);
1013         INIT_LIST_HEAD(&imp->imp_committed_list);
1014         imp->imp_replay_cursor = &imp->imp_committed_list;
1015         spin_lock_init(&imp->imp_lock);
1016         imp->imp_last_success_conn = 0;
1017         imp->imp_state = LUSTRE_IMP_NEW;
1018         imp->imp_obd = class_incref(obd, "import", imp);
1019         mutex_init(&imp->imp_sec_mutex);
1020         init_waitqueue_head(&imp->imp_recovery_waitq);
1021
1022         atomic_set(&imp->imp_refcount, 2);
1023         atomic_set(&imp->imp_unregistering, 0);
1024         atomic_set(&imp->imp_inflight, 0);
1025         atomic_set(&imp->imp_replay_inflight, 0);
1026         atomic_set(&imp->imp_inval_count, 0);
1027         INIT_LIST_HEAD(&imp->imp_conn_list);
1028         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1029         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1030         init_imp_at(&imp->imp_at);
1031
1032         /* the default magic is V2, will be used in connect RPC, and
1033          * then adjusted according to the flags in request/reply. */
1034         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1035
1036         return imp;
1037 }
1038 EXPORT_SYMBOL(class_new_import);
1039
1040 void class_destroy_import(struct obd_import *import)
1041 {
1042         LASSERT(import != NULL);
1043         LASSERT(import != LP_POISON);
1044
1045         class_handle_unhash(&import->imp_handle);
1046
1047         spin_lock(&import->imp_lock);
1048         import->imp_generation++;
1049         spin_unlock(&import->imp_lock);
1050         class_import_put(import);
1051 }
1052 EXPORT_SYMBOL(class_destroy_import);
1053
1054 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1055
1056 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1057 {
1058         spin_lock(&exp->exp_locks_list_guard);
1059
1060         LASSERT(lock->l_exp_refs_nr >= 0);
1061
1062         if (lock->l_exp_refs_target != NULL &&
1063             lock->l_exp_refs_target != exp) {
1064                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1065                               exp, lock, lock->l_exp_refs_target);
1066         }
1067         if ((lock->l_exp_refs_nr ++) == 0) {
1068                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1069                 lock->l_exp_refs_target = exp;
1070         }
1071         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1072                lock, exp, lock->l_exp_refs_nr);
1073         spin_unlock(&exp->exp_locks_list_guard);
1074 }
1075 EXPORT_SYMBOL(__class_export_add_lock_ref);
1076
1077 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1078 {
1079         spin_lock(&exp->exp_locks_list_guard);
1080         LASSERT(lock->l_exp_refs_nr > 0);
1081         if (lock->l_exp_refs_target != exp) {
1082                 LCONSOLE_WARN("lock %p, "
1083                               "mismatching export pointers: %p, %p\n",
1084                               lock, lock->l_exp_refs_target, exp);
1085         }
1086         if (-- lock->l_exp_refs_nr == 0) {
1087                 list_del_init(&lock->l_exp_refs_link);
1088                 lock->l_exp_refs_target = NULL;
1089         }
1090         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1091                lock, exp, lock->l_exp_refs_nr);
1092         spin_unlock(&exp->exp_locks_list_guard);
1093 }
1094 EXPORT_SYMBOL(__class_export_del_lock_ref);
1095 #endif
1096
1097 /* A connection defines an export context in which preallocation can
1098    be managed. This releases the export pointer reference, and returns
1099    the export handle, so the export refcount is 1 when this function
1100    returns. */
1101 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1102                   struct obd_uuid *cluuid)
1103 {
1104         struct obd_export *export;
1105         LASSERT(conn != NULL);
1106         LASSERT(obd != NULL);
1107         LASSERT(cluuid != NULL);
1108
1109         export = class_new_export(obd, cluuid);
1110         if (IS_ERR(export))
1111                 return PTR_ERR(export);
1112
1113         conn->cookie = export->exp_handle.h_cookie;
1114         class_export_put(export);
1115
1116         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1117                cluuid->uuid, conn->cookie);
1118         return 0;
1119 }
1120 EXPORT_SYMBOL(class_connect);
1121
1122 /* if export is involved in recovery then clean up related things */
1123 void class_export_recovery_cleanup(struct obd_export *exp)
1124 {
1125         struct obd_device *obd = exp->exp_obd;
1126
1127         spin_lock(&obd->obd_recovery_task_lock);
1128         if (exp->exp_delayed)
1129                 obd->obd_delayed_clients--;
1130         if (obd->obd_recovering) {
1131                 if (exp->exp_in_recovery) {
1132                         spin_lock(&exp->exp_lock);
1133                         exp->exp_in_recovery = 0;
1134                         spin_unlock(&exp->exp_lock);
1135                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1136                         atomic_dec(&obd->obd_connected_clients);
1137                 }
1138
1139                 /* if called during recovery then should update
1140                  * obd_stale_clients counter,
1141                  * lightweight exports are not counted */
1142                 if (exp->exp_failed &&
1143                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1144                         exp->exp_obd->obd_stale_clients++;
1145         }
1146         spin_unlock(&obd->obd_recovery_task_lock);
1147         /** Cleanup req replay fields */
1148         if (exp->exp_req_replay_needed) {
1149                 spin_lock(&exp->exp_lock);
1150                 exp->exp_req_replay_needed = 0;
1151                 spin_unlock(&exp->exp_lock);
1152                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1153                 atomic_dec(&obd->obd_req_replay_clients);
1154         }
1155         /** Cleanup lock replay data */
1156         if (exp->exp_lock_replay_needed) {
1157                 spin_lock(&exp->exp_lock);
1158                 exp->exp_lock_replay_needed = 0;
1159                 spin_unlock(&exp->exp_lock);
1160                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1161                 atomic_dec(&obd->obd_lock_replay_clients);
1162         }
1163 }
1164
1165 /* This function removes 1-3 references from the export:
1166  * 1 - for export pointer passed
1167  * and if disconnect really need
1168  * 2 - removing from hash
1169  * 3 - in client_unlink_export
1170  * The export pointer passed to this function can destroyed */
1171 int class_disconnect(struct obd_export *export)
1172 {
1173         int already_disconnected;
1174
1175         if (export == NULL) {
1176                 CWARN("attempting to free NULL export %p\n", export);
1177                 return -EINVAL;
1178         }
1179
1180         spin_lock(&export->exp_lock);
1181         already_disconnected = export->exp_disconnected;
1182         export->exp_disconnected = 1;
1183         spin_unlock(&export->exp_lock);
1184
1185         /* class_cleanup(), abort_recovery(), and class_fail_export()
1186          * all end up in here, and if any of them race we shouldn't
1187          * call extra class_export_puts(). */
1188         if (already_disconnected) {
1189                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1190                 GOTO(no_disconn, already_disconnected);
1191         }
1192
1193         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1194                export->exp_handle.h_cookie);
1195
1196         if (!hlist_unhashed(&export->exp_nid_hash))
1197                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1198                              &export->exp_connection->c_peer.nid,
1199                              &export->exp_nid_hash);
1200
1201         class_export_recovery_cleanup(export);
1202         class_unlink_export(export);
1203 no_disconn:
1204         class_export_put(export);
1205         return 0;
1206 }
1207 EXPORT_SYMBOL(class_disconnect);
1208
1209 /* Return non-zero for a fully connected export */
1210 int class_connected_export(struct obd_export *exp)
1211 {
1212         if (exp) {
1213                 int connected;
1214                 spin_lock(&exp->exp_lock);
1215                 connected = (exp->exp_conn_cnt > 0);
1216                 spin_unlock(&exp->exp_lock);
1217                 return connected;
1218         }
1219         return 0;
1220 }
1221 EXPORT_SYMBOL(class_connected_export);
1222
1223 static void class_disconnect_export_list(struct list_head *list,
1224                                          enum obd_option flags)
1225 {
1226         int rc;
1227         struct obd_export *exp;
1228
1229         /* It's possible that an export may disconnect itself, but
1230          * nothing else will be added to this list. */
1231         while (!list_empty(list)) {
1232                 exp = list_entry(list->next, struct obd_export,
1233                                      exp_obd_chain);
1234                 /* need for safe call CDEBUG after obd_disconnect */
1235                 class_export_get(exp);
1236
1237                 spin_lock(&exp->exp_lock);
1238                 exp->exp_flags = flags;
1239                 spin_unlock(&exp->exp_lock);
1240
1241                 if (obd_uuid_equals(&exp->exp_client_uuid,
1242                                     &exp->exp_obd->obd_uuid)) {
1243                         CDEBUG(D_HA,
1244                                "exp %p export uuid == obd uuid, don't discon\n",
1245                                exp);
1246                         /* Need to delete this now so we don't end up pointing
1247                          * to work_list later when this export is cleaned up. */
1248                         list_del_init(&exp->exp_obd_chain);
1249                         class_export_put(exp);
1250                         continue;
1251                 }
1252
1253                 class_export_get(exp);
1254                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1255                        "last request at "CFS_TIME_T"\n",
1256                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1257                        exp, exp->exp_last_request_time);
1258                 /* release one export reference anyway */
1259                 rc = obd_disconnect(exp);
1260
1261                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1262                        obd_export_nid2str(exp), exp, rc);
1263                 class_export_put(exp);
1264         }
1265 }
1266
1267 void class_disconnect_exports(struct obd_device *obd)
1268 {
1269         struct list_head work_list;
1270
1271         /* Move all of the exports from obd_exports to a work list, en masse. */
1272         INIT_LIST_HEAD(&work_list);
1273         spin_lock(&obd->obd_dev_lock);
1274         list_splice_init(&obd->obd_exports, &work_list);
1275         list_splice_init(&obd->obd_delayed_exports, &work_list);
1276         spin_unlock(&obd->obd_dev_lock);
1277
1278         if (!list_empty(&work_list)) {
1279                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1280                        "disconnecting them\n", obd->obd_minor, obd);
1281                 class_disconnect_export_list(&work_list,
1282                                              exp_flags_from_obd(obd));
1283         } else
1284                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1285                        obd->obd_minor, obd);
1286 }
1287 EXPORT_SYMBOL(class_disconnect_exports);
1288
1289 /* Remove exports that have not completed recovery.
1290  */
1291 void class_disconnect_stale_exports(struct obd_device *obd,
1292                                     int (*test_export)(struct obd_export *))
1293 {
1294         struct list_head work_list;
1295         struct obd_export *exp, *n;
1296         int evicted = 0;
1297
1298         INIT_LIST_HEAD(&work_list);
1299         spin_lock(&obd->obd_dev_lock);
1300         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1301                                      exp_obd_chain) {
1302                 /* don't count self-export as client */
1303                 if (obd_uuid_equals(&exp->exp_client_uuid,
1304                                     &exp->exp_obd->obd_uuid))
1305                         continue;
1306
1307                 /* don't evict clients which have no slot in last_rcvd
1308                  * (e.g. lightweight connection) */
1309                 if (exp->exp_target_data.ted_lr_idx == -1)
1310                         continue;
1311
1312                 spin_lock(&exp->exp_lock);
1313                 if (exp->exp_failed || test_export(exp)) {
1314                         spin_unlock(&exp->exp_lock);
1315                         continue;
1316                 }
1317                 exp->exp_failed = 1;
1318                 spin_unlock(&exp->exp_lock);
1319
1320                 list_move(&exp->exp_obd_chain, &work_list);
1321                 evicted++;
1322                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1323                        obd->obd_name, exp->exp_client_uuid.uuid,
1324                        exp->exp_connection == NULL ? "<unknown>" :
1325                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1326                 print_export_data(exp, "EVICTING", 0);
1327         }
1328         spin_unlock(&obd->obd_dev_lock);
1329
1330         if (evicted)
1331                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332                               obd->obd_name, evicted);
1333
1334         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1335                                                  OBD_OPT_ABORT_RECOV);
1336 }
1337 EXPORT_SYMBOL(class_disconnect_stale_exports);
1338
1339 void class_fail_export(struct obd_export *exp)
1340 {
1341         int rc, already_failed;
1342
1343         spin_lock(&exp->exp_lock);
1344         already_failed = exp->exp_failed;
1345         exp->exp_failed = 1;
1346         spin_unlock(&exp->exp_lock);
1347
1348         if (already_failed) {
1349                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1350                        exp, exp->exp_client_uuid.uuid);
1351                 return;
1352         }
1353
1354         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1355                exp, exp->exp_client_uuid.uuid);
1356
1357         if (obd_dump_on_timeout)
1358                 libcfs_debug_dumplog();
1359
1360         /* need for safe call CDEBUG after obd_disconnect */
1361         class_export_get(exp);
1362
1363         /* Most callers into obd_disconnect are removing their own reference
1364          * (request, for example) in addition to the one from the hash table.
1365          * We don't have such a reference here, so make one. */
1366         class_export_get(exp);
1367         rc = obd_disconnect(exp);
1368         if (rc)
1369                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1370         else
1371                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1372                        exp, exp->exp_client_uuid.uuid);
1373         class_export_put(exp);
1374 }
1375 EXPORT_SYMBOL(class_fail_export);
1376
1377 char *obd_export_nid2str(struct obd_export *exp)
1378 {
1379         if (exp->exp_connection != NULL)
1380                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1381
1382         return "(no nid)";
1383 }
1384 EXPORT_SYMBOL(obd_export_nid2str);
1385
1386 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1387 {
1388         struct cfs_hash *nid_hash;
1389         struct obd_export *doomed_exp = NULL;
1390         int exports_evicted = 0;
1391
1392         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1393
1394         spin_lock(&obd->obd_dev_lock);
1395         /* umount has run already, so evict thread should leave
1396          * its task to umount thread now */
1397         if (obd->obd_stopping) {
1398                 spin_unlock(&obd->obd_dev_lock);
1399                 return exports_evicted;
1400         }
1401         nid_hash = obd->obd_nid_hash;
1402         cfs_hash_getref(nid_hash);
1403         spin_unlock(&obd->obd_dev_lock);
1404
1405         do {
1406                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1407                 if (doomed_exp == NULL)
1408                         break;
1409
1410                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1411                          "nid %s found, wanted nid %s, requested nid %s\n",
1412                          obd_export_nid2str(doomed_exp),
1413                          libcfs_nid2str(nid_key), nid);
1414                 LASSERTF(doomed_exp != obd->obd_self_export,
1415                          "self-export is hashed by NID?\n");
1416                 exports_evicted++;
1417                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1418                               "request\n", obd->obd_name,
1419                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1420                               obd_export_nid2str(doomed_exp));
1421                 class_fail_export(doomed_exp);
1422                 class_export_put(doomed_exp);
1423         } while (1);
1424
1425         cfs_hash_putref(nid_hash);
1426
1427         if (!exports_evicted)
1428                 CDEBUG(D_HA,
1429                        "%s: can't disconnect NID '%s': no exports found\n",
1430                        obd->obd_name, nid);
1431         return exports_evicted;
1432 }
1433 EXPORT_SYMBOL(obd_export_evict_by_nid);
1434
1435 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1436 {
1437         struct cfs_hash *uuid_hash;
1438         struct obd_export *doomed_exp = NULL;
1439         struct obd_uuid doomed_uuid;
1440         int exports_evicted = 0;
1441
1442         spin_lock(&obd->obd_dev_lock);
1443         if (obd->obd_stopping) {
1444                 spin_unlock(&obd->obd_dev_lock);
1445                 return exports_evicted;
1446         }
1447         uuid_hash = obd->obd_uuid_hash;
1448         cfs_hash_getref(uuid_hash);
1449         spin_unlock(&obd->obd_dev_lock);
1450
1451         obd_str2uuid(&doomed_uuid, uuid);
1452         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1453                 CERROR("%s: can't evict myself\n", obd->obd_name);
1454                 cfs_hash_putref(uuid_hash);
1455                 return exports_evicted;
1456         }
1457
1458         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1459
1460         if (doomed_exp == NULL) {
1461                 CERROR("%s: can't disconnect %s: no exports found\n",
1462                        obd->obd_name, uuid);
1463         } else {
1464                 CWARN("%s: evicting %s at administrative request\n",
1465                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1466                 class_fail_export(doomed_exp);
1467                 class_export_put(doomed_exp);
1468                 exports_evicted++;
1469         }
1470         cfs_hash_putref(uuid_hash);
1471
1472         return exports_evicted;
1473 }
1474 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1475
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback, NULL until something installs it; print_export_data()
 * calls it for an export when lock dumping was requested.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1480
1481 static void print_export_data(struct obd_export *exp, const char *status,
1482                               int locks)
1483 {
1484         struct ptlrpc_reply_state *rs;
1485         struct ptlrpc_reply_state *first_reply = NULL;
1486         int nreplies = 0;
1487
1488         spin_lock(&exp->exp_lock);
1489         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1490                                 rs_exp_list) {
1491                 if (nreplies == 0)
1492                         first_reply = rs;
1493                 nreplies++;
1494         }
1495         spin_unlock(&exp->exp_lock);
1496
1497         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1498                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1499                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1500                atomic_read(&exp->exp_rpc_count),
1501                atomic_read(&exp->exp_cb_count),
1502                atomic_read(&exp->exp_locks_count),
1503                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1504                nreplies, first_reply, nreplies > 3 ? "..." : "",
1505                exp->exp_last_committed);
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507         if (locks && class_export_dump_hook != NULL)
1508                 class_export_dump_hook(exp);
1509 #endif
1510 }
1511
1512 void dump_exports(struct obd_device *obd, int locks)
1513 {
1514         struct obd_export *exp;
1515
1516         spin_lock(&obd->obd_dev_lock);
1517         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1518                 print_export_data(exp, "ACTIVE", locks);
1519         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1520                 print_export_data(exp, "UNLINKED", locks);
1521         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1522                 print_export_data(exp, "DELAYED", locks);
1523         spin_unlock(&obd->obd_dev_lock);
1524         spin_lock(&obd_zombie_impexp_lock);
1525         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1526                 print_export_data(exp, "ZOMBIE", locks);
1527         spin_unlock(&obd_zombie_impexp_lock);
1528 }
1529 EXPORT_SYMBOL(dump_exports);
1530
1531 void obd_exports_barrier(struct obd_device *obd)
1532 {
1533         int waited = 2;
1534         LASSERT(list_empty(&obd->obd_exports));
1535         spin_lock(&obd->obd_dev_lock);
1536         while (!list_empty(&obd->obd_unlinked_exports)) {
1537                 spin_unlock(&obd->obd_dev_lock);
1538                 set_current_state(TASK_UNINTERRUPTIBLE);
1539                 schedule_timeout(cfs_time_seconds(waited));
1540                 if (waited > 5 && IS_PO2(waited)) {
1541                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1542                                       "more than %d seconds. "
1543                                       "The obd refcount = %d. Is it stuck?\n",
1544                                       obd->obd_name, waited,
1545                                       atomic_read(&obd->obd_refcount));
1546                         dump_exports(obd, 1);
1547                 }
1548                 waited *= 2;
1549                 spin_lock(&obd->obd_dev_lock);
1550         }
1551         spin_unlock(&obd->obd_dev_lock);
1552 }
1553 EXPORT_SYMBOL(obd_exports_barrier);
1554
1555 /* Total amount of zombies to be destroyed */
1556 static int zombies_count = 0;
1557
1558 /**
1559  * kill zombie imports and exports
1560  */
1561 void obd_zombie_impexp_cull(void)
1562 {
1563         struct obd_import *import;
1564         struct obd_export *export;
1565
1566         do {
1567                 spin_lock(&obd_zombie_impexp_lock);
1568
1569                 import = NULL;
1570                 if (!list_empty(&obd_zombie_imports)) {
1571                         import = list_entry(obd_zombie_imports.next,
1572                                                 struct obd_import,
1573                                                 imp_zombie_chain);
1574                         list_del_init(&import->imp_zombie_chain);
1575                 }
1576
1577                 export = NULL;
1578                 if (!list_empty(&obd_zombie_exports)) {
1579                         export = list_entry(obd_zombie_exports.next,
1580                                                 struct obd_export,
1581                                                 exp_obd_chain);
1582                         list_del_init(&export->exp_obd_chain);
1583                 }
1584
1585                 spin_unlock(&obd_zombie_impexp_lock);
1586
1587                 if (import != NULL) {
1588                         class_import_destroy(import);
1589                         spin_lock(&obd_zombie_impexp_lock);
1590                         zombies_count--;
1591                         spin_unlock(&obd_zombie_impexp_lock);
1592                 }
1593
1594                 if (export != NULL) {
1595                         class_export_destroy(export);
1596                         spin_lock(&obd_zombie_impexp_lock);
1597                         zombies_count--;
1598                         spin_unlock(&obd_zombie_impexp_lock);
1599                 }
1600
1601                 cond_resched();
1602         } while (import != NULL || export != NULL);
1603 }
1604
1605 static struct completion        obd_zombie_start;
1606 static struct completion        obd_zombie_stop;
1607 static unsigned long            obd_zombie_flags;
1608 static wait_queue_head_t                obd_zombie_waitq;
1609 static pid_t                    obd_zombie_pid;
1610
1611 enum {
1612         OBD_ZOMBIE_STOP         = 0x0001,
1613 };
1614
1615 /**
1616  * check for work for kill zombie import/export thread.
1617  */
1618 static int obd_zombie_impexp_check(void *arg)
1619 {
1620         int rc;
1621
1622         spin_lock(&obd_zombie_impexp_lock);
1623         rc = (zombies_count == 0) &&
1624              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1625         spin_unlock(&obd_zombie_impexp_lock);
1626
1627         return rc;
1628 }
1629
1630 /**
1631  * Add export to the obd_zombie thread and notify it.
1632  */
1633 static void obd_zombie_export_add(struct obd_export *exp) {
1634         spin_lock(&exp->exp_obd->obd_dev_lock);
1635         LASSERT(!list_empty(&exp->exp_obd_chain));
1636         list_del_init(&exp->exp_obd_chain);
1637         spin_unlock(&exp->exp_obd->obd_dev_lock);
1638         spin_lock(&obd_zombie_impexp_lock);
1639         zombies_count++;
1640         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1641         spin_unlock(&obd_zombie_impexp_lock);
1642
1643         obd_zombie_impexp_notify();
1644 }
1645
1646 /**
1647  * Add import to the obd_zombie thread and notify it.
1648  */
1649 static void obd_zombie_import_add(struct obd_import *imp) {
1650         LASSERT(imp->imp_sec == NULL);
1651         LASSERT(imp->imp_rq_pool == NULL);
1652         spin_lock(&obd_zombie_impexp_lock);
1653         LASSERT(list_empty(&imp->imp_zombie_chain));
1654         zombies_count++;
1655         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1656         spin_unlock(&obd_zombie_impexp_lock);
1657
1658         obd_zombie_impexp_notify();
1659 }
1660
1661 /**
1662  * notify import/export destroy thread about new zombie.
1663  */
1664 static void obd_zombie_impexp_notify(void)
1665 {
1666         /*
1667          * Make sure obd_zombie_impexp_thread get this notification.
1668          * It is possible this signal only get by obd_zombie_barrier, and
1669          * barrier gulps this notification and sleeps away and hangs ensues
1670          */
1671         wake_up_all(&obd_zombie_waitq);
1672 }
1673
1674 /**
1675  * check whether obd_zombie is idle
1676  */
1677 static int obd_zombie_is_idle(void)
1678 {
1679         int rc;
1680
1681         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1682         spin_lock(&obd_zombie_impexp_lock);
1683         rc = (zombies_count == 0);
1684         spin_unlock(&obd_zombie_impexp_lock);
1685         return rc;
1686 }
1687
1688 /**
1689  * wait when obd_zombie import/export queues become empty
1690  */
1691 void obd_zombie_barrier(void)
1692 {
1693         struct l_wait_info lwi = { 0 };
1694
1695         if (obd_zombie_pid == current_pid())
1696                 /* don't wait for myself */
1697                 return;
1698         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1699 }
1700 EXPORT_SYMBOL(obd_zombie_barrier);
1701
1702
1703 /**
1704  * destroy zombie export/import thread.
1705  */
1706 static int obd_zombie_impexp_thread(void *unused)
1707 {
1708         unshare_fs_struct();
1709         complete(&obd_zombie_start);
1710
1711         obd_zombie_pid = current_pid();
1712
1713         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1714                 struct l_wait_info lwi = { 0 };
1715
1716                 l_wait_event(obd_zombie_waitq,
1717                              !obd_zombie_impexp_check(NULL), &lwi);
1718                 obd_zombie_impexp_cull();
1719
1720                 /*
1721                  * Notify obd_zombie_barrier callers that queues
1722                  * may be empty.
1723                  */
1724                 wake_up(&obd_zombie_waitq);
1725         }
1726
1727         complete(&obd_zombie_stop);
1728
1729         return 0;
1730 }
1731
1732
1733 /**
1734  * start destroy zombie import/export thread
1735  */
1736 int obd_zombie_impexp_init(void)
1737 {
1738         struct task_struct *task;
1739
1740         INIT_LIST_HEAD(&obd_zombie_imports);
1741         INIT_LIST_HEAD(&obd_zombie_exports);
1742         spin_lock_init(&obd_zombie_impexp_lock);
1743         init_completion(&obd_zombie_start);
1744         init_completion(&obd_zombie_stop);
1745         init_waitqueue_head(&obd_zombie_waitq);
1746         obd_zombie_pid = 0;
1747
1748         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1749         if (IS_ERR(task))
1750                 return PTR_ERR(task);
1751
1752         wait_for_completion(&obd_zombie_start);
1753         return 0;
1754 }
1755 /**
1756  * stop destroy zombie import/export thread
1757  */
1758 void obd_zombie_impexp_stop(void)
1759 {
1760         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1761         obd_zombie_impexp_notify();
1762         wait_for_completion(&obd_zombie_stop);
1763 }
1764
1765 /***** Kernel-userspace comm helpers *******/
1766
1767 /* Get length of entire message, including header */
1768 int kuc_len(int payload_len)
1769 {
1770         return sizeof(struct kuc_hdr) + payload_len;
1771 }
1772 EXPORT_SYMBOL(kuc_len);
1773
1774 /* Get a pointer to kuc header, given a ptr to the payload
1775  * @param p Pointer to payload area
1776  * @returns Pointer to kuc header
1777  */
1778 struct kuc_hdr * kuc_ptr(void *p)
1779 {
1780         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1781         LASSERT(lh->kuc_magic == KUC_MAGIC);
1782         return lh;
1783 }
1784 EXPORT_SYMBOL(kuc_ptr);
1785
1786 /* Test if payload is part of kuc message
1787  * @param p Pointer to payload area
1788  * @returns boolean
1789  */
1790 int kuc_ispayload(void *p)
1791 {
1792         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1793
1794         if (kh->kuc_magic == KUC_MAGIC)
1795                 return 1;
1796         else
1797                 return 0;
1798 }
1799 EXPORT_SYMBOL(kuc_ispayload);
1800
1801 /* Alloc space for a message, and fill in header
1802  * @return Pointer to payload area
1803  */
1804 void *kuc_alloc(int payload_len, int transport, int type)
1805 {
1806         struct kuc_hdr *lh;
1807         int len = kuc_len(payload_len);
1808
1809         OBD_ALLOC(lh, len);
1810         if (lh == NULL)
1811                 return ERR_PTR(-ENOMEM);
1812
1813         lh->kuc_magic = KUC_MAGIC;
1814         lh->kuc_transport = transport;
1815         lh->kuc_msgtype = type;
1816         lh->kuc_msglen = len;
1817
1818         return (void *)(lh + 1);
1819 }
1820 EXPORT_SYMBOL(kuc_alloc);
1821
/* Free a message obtained from kuc_alloc()
 * @param p Pointer to payload area
 * @param payload_len Payload length the message was allocated with
 */
inline void kuc_free(void *p, int payload_len)
{
        /* Kept in a local: OBD_FREE is a macro and may need an lvalue. */
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);