drivers: staging: lustre: Fix space required after that ',' errors
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/pagemap.h>
42 #include <linux/mm.h>
43 #include <asm/div64.h>
44 #include <linux/seq_file.h>
45 #include <linux/namei.h>
46 #include <asm/uaccess.h>
47
48 #include "../include/lustre/lustre_idl.h"
49 #include "../include/obd_support.h"
50 #include "../include/lustre_lib.h"
51 #include "../include/lustre_net.h"
52 #include "../include/obd_class.h"
53 #include "../include/lprocfs_status.h"
54 #include "../include/lustre_lite.h"
55 #include "../include/lustre_fid.h"
56 #include "lmv_internal.h"
57
58 static void lmv_activate_target(struct lmv_obd *lmv,
59                                 struct lmv_tgt_desc *tgt,
60                                 int activate)
61 {
62         if (tgt->ltd_active == activate)
63                 return;
64
65         tgt->ltd_active = activate;
66         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc    *uninitialized_var(tgt);
80         struct obd_device      *obd;
81         int                  i;
82         int                  rc = 0;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89                 tgt = lmv->tgts[i];
90                 if (tgt == NULL || tgt->ltd_exp == NULL)
91                         continue;
92
93                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
94                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95
96                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97                         break;
98         }
99
100         if (i == lmv->desc.ld_tgt_count)
101                 GOTO(out_lmv_lock, rc = -EINVAL);
102
103         obd = class_exp2obd(tgt->ltd_exp);
104         if (obd == NULL)
105                 GOTO(out_lmv_lock, rc = -ENOTCONN);
106
107         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
108                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
109                obd->obd_type->typ_name, i);
110         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
111
112         if (tgt->ltd_active == activate) {
113                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
114                        activate ? "" : "in");
115                 GOTO(out_lmv_lock, rc);
116         }
117
118         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
119                activate ? "" : "in");
120         lmv_activate_target(lmv, tgt, activate);
121
122  out_lmv_lock:
123         spin_unlock(&lmv->lmv_lock);
124         return rc;
125 }
126
127 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
128 {
129         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
130
131         return obd_get_uuid(lmv->tgts[0]->ltd_exp);
132 }
133
134 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
135                       enum obd_notify_event ev, void *data)
136 {
137         struct obd_connect_data *conn_data;
138         struct lmv_obd    *lmv = &obd->u.lmv;
139         struct obd_uuid  *uuid;
140         int                   rc = 0;
141
142         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
143                 CERROR("unexpected notification of %s %s!\n",
144                        watched->obd_type->typ_name,
145                        watched->obd_name);
146                 return -EINVAL;
147         }
148
149         uuid = &watched->u.cli.cl_target_uuid;
150         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
151                 /*
152                  * Set MDC as active before notifying the observer, so the
153                  * observer can use the MDC normally.
154                  */
155                 rc = lmv_set_mdc_active(lmv, uuid,
156                                         ev == OBD_NOTIFY_ACTIVE);
157                 if (rc) {
158                         CERROR("%sactivation of %s failed: %d\n",
159                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
160                                uuid->uuid, rc);
161                         return rc;
162                 }
163         } else if (ev == OBD_NOTIFY_OCD) {
164                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
165                 /*
166                  * XXX: Make sure that ocd_connect_flags from all targets are
167                  * the same. Otherwise one of MDTs runs wrong version or
168                  * something like this.  --umka
169                  */
170                 obd->obd_self_export->exp_connect_data = *conn_data;
171         }
172 #if 0
173         else if (ev == OBD_NOTIFY_DISCON) {
174                 /*
175                  * For disconnect event, flush fld cache for failout MDS case.
176                  */
177                 fld_client_flush(&lmv->lmv_fld);
178         }
179 #endif
180         /*
181          * Pass the notification up the chain.
182          */
183         if (obd->obd_observer)
184                 rc = obd_notify(obd->obd_observer, watched, ev, data);
185
186         return rc;
187 }
188
189 /**
190  * This is fake connect function. Its purpose is to initialize lmv and say
191  * caller that everything is okay. Real connection will be performed later.
192  */
193 static int lmv_connect(const struct lu_env *env,
194                        struct obd_export **exp, struct obd_device *obd,
195                        struct obd_uuid *cluuid, struct obd_connect_data *data,
196                        void *localdata)
197 {
198         struct proc_dir_entry *lmv_proc_dir;
199         struct lmv_obd  *lmv = &obd->u.lmv;
200         struct lustre_handle  conn = { 0 };
201         int                 rc = 0;
202
203         /*
204          * We don't want to actually do the underlying connections more than
205          * once, so keep track.
206          */
207         lmv->refcount++;
208         if (lmv->refcount > 1) {
209                 *exp = NULL;
210                 return 0;
211         }
212
213         rc = class_connect(&conn, obd, cluuid);
214         if (rc) {
215                 CERROR("class_connection() returned %d\n", rc);
216                 return rc;
217         }
218
219         *exp = class_conn2export(&conn);
220         class_export_get(*exp);
221
222         lmv->exp = *exp;
223         lmv->connected = 0;
224         lmv->cluuid = *cluuid;
225
226         if (data)
227                 lmv->conn_data = *data;
228
229         if (obd->obd_proc_private != NULL) {
230                 lmv_proc_dir = obd->obd_proc_private;
231         } else {
232                 lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
233                                                 NULL, NULL);
234                 if (IS_ERR(lmv_proc_dir)) {
235                         CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
236                                obd->obd_type->typ_name, obd->obd_name);
237                         lmv_proc_dir = NULL;
238                 }
239                 obd->obd_proc_private = lmv_proc_dir;
240         }
241
242         /*
243          * All real clients should perform actual connection right away, because
244          * it is possible, that LMV will not have opportunity to connect targets
245          * and MDC stuff will be called directly, for instance while reading
246          * ../mdc/../kbytesfree procfs file, etc.
247          */
248         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
249                 rc = lmv_check_connect(obd);
250
251         if (rc && lmv_proc_dir) {
252                 lprocfs_remove(&lmv_proc_dir);
253                 obd->obd_proc_private = NULL;
254         }
255
256         return rc;
257 }
258
259 static void lmv_set_timeouts(struct obd_device *obd)
260 {
261         struct lmv_tgt_desc   *tgt;
262         struct lmv_obd  *lmv;
263         int                 i;
264
265         lmv = &obd->u.lmv;
266         if (lmv->server_timeout == 0)
267                 return;
268
269         if (lmv->connected == 0)
270                 return;
271
272         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
273                 tgt = lmv->tgts[i];
274                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
275                         continue;
276
277                 obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
278                                    KEY_INTERMDS, 0, NULL, NULL);
279         }
280 }
281
282 static int lmv_init_ea_size(struct obd_export *exp, int easize,
283                             int def_easize, int cookiesize, int def_cookiesize)
284 {
285         struct obd_device   *obd = exp->exp_obd;
286         struct lmv_obd      *lmv = &obd->u.lmv;
287         int               i;
288         int               rc = 0;
289         int               change = 0;
290
291         if (lmv->max_easize < easize) {
292                 lmv->max_easize = easize;
293                 change = 1;
294         }
295         if (lmv->max_def_easize < def_easize) {
296                 lmv->max_def_easize = def_easize;
297                 change = 1;
298         }
299         if (lmv->max_cookiesize < cookiesize) {
300                 lmv->max_cookiesize = cookiesize;
301                 change = 1;
302         }
303         if (lmv->max_def_cookiesize < def_cookiesize) {
304                 lmv->max_def_cookiesize = def_cookiesize;
305                 change = 1;
306         }
307         if (change == 0)
308                 return 0;
309
310         if (lmv->connected == 0)
311                 return 0;
312
313         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
314                 if (lmv->tgts[i] == NULL ||
315                     lmv->tgts[i]->ltd_exp == NULL ||
316                     lmv->tgts[i]->ltd_active == 0) {
317                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
318                         continue;
319                 }
320
321                 rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
322                                      cookiesize, def_cookiesize);
323                 if (rc) {
324                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
325                                " rc = %d.\n", obd->obd_name, i, rc);
326                         break;
327                 }
328         }
329         return rc;
330 }
331
332 #define MAX_STRING_SIZE 128
333
334 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
335 {
336         struct proc_dir_entry   *lmv_proc_dir;
337         struct lmv_obd    *lmv = &obd->u.lmv;
338         struct obd_uuid  *cluuid = &lmv->cluuid;
339         struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
340         struct obd_device       *mdc_obd;
341         struct obd_export       *mdc_exp;
342         struct lu_fld_target     target;
343         int                   rc;
344
345         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
346                                         &obd->obd_uuid);
347         if (!mdc_obd) {
348                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
349                 return -EINVAL;
350         }
351
352         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
353                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
354                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
355                 cluuid->uuid);
356
357         if (!mdc_obd->obd_set_up) {
358                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
359                 return -EINVAL;
360         }
361
362         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
363                          &lmv->conn_data, NULL);
364         if (rc) {
365                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
366                 return rc;
367         }
368
369         /*
370          * Init fid sequence client for this mdc and add new fld target.
371          */
372         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
373         if (rc)
374                 return rc;
375
376         target.ft_srv = NULL;
377         target.ft_exp = mdc_exp;
378         target.ft_idx = tgt->ltd_idx;
379
380         fld_client_add_target(&lmv->lmv_fld, &target);
381
382         rc = obd_register_observer(mdc_obd, obd);
383         if (rc) {
384                 obd_disconnect(mdc_exp);
385                 CERROR("target %s register_observer error %d\n",
386                        tgt->ltd_uuid.uuid, rc);
387                 return rc;
388         }
389
390         if (obd->obd_observer) {
391                 /*
392                  * Tell the observer about the new target.
393                  */
394                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
395                                 OBD_NOTIFY_ACTIVE,
396                                 (void *)(tgt - lmv->tgts[0]));
397                 if (rc) {
398                         obd_disconnect(mdc_exp);
399                         return rc;
400                 }
401         }
402
403         tgt->ltd_active = 1;
404         tgt->ltd_exp = mdc_exp;
405         lmv->desc.ld_active_tgt_count++;
406
407         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize,
408                         lmv->max_cookiesize, lmv->max_def_cookiesize);
409
410         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
411                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
412                 atomic_read(&obd->obd_refcount));
413
414         lmv_proc_dir = obd->obd_proc_private;
415         if (lmv_proc_dir) {
416                 struct proc_dir_entry *mdc_symlink;
417
418                 LASSERT(mdc_obd->obd_type != NULL);
419                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
420                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
421                                                   lmv_proc_dir,
422                                                   "../../../%s/%s",
423                                                   mdc_obd->obd_type->typ_name,
424                                                   mdc_obd->obd_name);
425                 if (mdc_symlink == NULL) {
426                         CERROR("Could not register LMV target "
427                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
428                                obd->obd_type->typ_name, obd->obd_name,
429                                mdc_obd->obd_name);
430                         lprocfs_remove(&lmv_proc_dir);
431                         obd->obd_proc_private = NULL;
432                 }
433         }
434         return 0;
435 }
436
437 static void lmv_del_target(struct lmv_obd *lmv, int index)
438 {
439         if (lmv->tgts[index] == NULL)
440                 return;
441
442         OBD_FREE_PTR(lmv->tgts[index]);
443         lmv->tgts[index] = NULL;
444         return;
445 }
446
447 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
448                            __u32 index, int gen)
449 {
450         struct lmv_obd      *lmv = &obd->u.lmv;
451         struct lmv_tgt_desc *tgt;
452         int               rc = 0;
453
454         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
455
456         lmv_init_lock(lmv);
457
458         if (lmv->desc.ld_tgt_count == 0) {
459                 struct obd_device *mdc_obd;
460
461                 mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
462                                                 &obd->obd_uuid);
463                 if (!mdc_obd) {
464                         lmv_init_unlock(lmv);
465                         CERROR("%s: Target %s not attached: rc = %d\n",
466                                obd->obd_name, uuidp->uuid, -EINVAL);
467                         return -EINVAL;
468                 }
469         }
470
471         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
472                 tgt = lmv->tgts[index];
473                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
474                        " rc = %d\n", obd->obd_name,
475                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
476                 lmv_init_unlock(lmv);
477                 return -EEXIST;
478         }
479
480         if (index >= lmv->tgts_size) {
481                 /* We need to reallocate the lmv target array. */
482                 struct lmv_tgt_desc **newtgts, **old = NULL;
483                 __u32 newsize = 1;
484                 __u32 oldsize = 0;
485
486                 while (newsize < index + 1)
487                         newsize = newsize << 1;
488                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
489                 if (newtgts == NULL) {
490                         lmv_init_unlock(lmv);
491                         return -ENOMEM;
492                 }
493
494                 if (lmv->tgts_size) {
495                         memcpy(newtgts, lmv->tgts,
496                                sizeof(*newtgts) * lmv->tgts_size);
497                         old = lmv->tgts;
498                         oldsize = lmv->tgts_size;
499                 }
500
501                 lmv->tgts = newtgts;
502                 lmv->tgts_size = newsize;
503                 smp_rmb();
504                 if (old)
505                         OBD_FREE(old, sizeof(*old) * oldsize);
506
507                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
508                        lmv->tgts_size);
509         }
510
511         OBD_ALLOC_PTR(tgt);
512         if (!tgt) {
513                 lmv_init_unlock(lmv);
514                 return -ENOMEM;
515         }
516
517         mutex_init(&tgt->ltd_fid_mutex);
518         tgt->ltd_idx = index;
519         tgt->ltd_uuid = *uuidp;
520         tgt->ltd_active = 0;
521         lmv->tgts[index] = tgt;
522         if (index >= lmv->desc.ld_tgt_count)
523                 lmv->desc.ld_tgt_count = index + 1;
524
525         if (lmv->connected) {
526                 rc = lmv_connect_mdc(obd, tgt);
527                 if (rc) {
528                         spin_lock(&lmv->lmv_lock);
529                         lmv->desc.ld_tgt_count--;
530                         memset(tgt, 0, sizeof(*tgt));
531                         spin_unlock(&lmv->lmv_lock);
532                 } else {
533                         int easize = sizeof(struct lmv_stripe_md) +
534                                 lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
535                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
536                 }
537         }
538
539         lmv_init_unlock(lmv);
540         return rc;
541 }
542
543 int lmv_check_connect(struct obd_device *obd)
544 {
545         struct lmv_obd       *lmv = &obd->u.lmv;
546         struct lmv_tgt_desc  *tgt;
547         int                i;
548         int                rc;
549         int                easize;
550
551         if (lmv->connected)
552                 return 0;
553
554         lmv_init_lock(lmv);
555         if (lmv->connected) {
556                 lmv_init_unlock(lmv);
557                 return 0;
558         }
559
560         if (lmv->desc.ld_tgt_count == 0) {
561                 lmv_init_unlock(lmv);
562                 CERROR("%s: no targets configured.\n", obd->obd_name);
563                 return -EINVAL;
564         }
565
566         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
567                lmv->cluuid.uuid, obd->obd_name);
568
569         LASSERT(lmv->tgts != NULL);
570
571         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
572                 tgt = lmv->tgts[i];
573                 if (tgt == NULL)
574                         continue;
575                 rc = lmv_connect_mdc(obd, tgt);
576                 if (rc)
577                         GOTO(out_disc, rc);
578         }
579
580         lmv_set_timeouts(obd);
581         class_export_put(lmv->exp);
582         lmv->connected = 1;
583         easize = lmv_get_easize(lmv);
584         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0, 0);
585         lmv_init_unlock(lmv);
586         return 0;
587
588  out_disc:
589         while (i-- > 0) {
590                 int rc2;
591                 tgt = lmv->tgts[i];
592                 if (tgt == NULL)
593                         continue;
594                 tgt->ltd_active = 0;
595                 if (tgt->ltd_exp) {
596                         --lmv->desc.ld_active_tgt_count;
597                         rc2 = obd_disconnect(tgt->ltd_exp);
598                         if (rc2) {
599                                 CERROR("LMV target %s disconnect on "
600                                        "MDC idx %d: error %d\n",
601                                        tgt->ltd_uuid.uuid, i, rc2);
602                         }
603                 }
604         }
605         class_disconnect(lmv->exp);
606         lmv_init_unlock(lmv);
607         return rc;
608 }
609
610 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
611 {
612         struct proc_dir_entry  *lmv_proc_dir;
613         struct lmv_obd   *lmv = &obd->u.lmv;
614         struct obd_device      *mdc_obd;
615         int                  rc;
616
617         LASSERT(tgt != NULL);
618         LASSERT(obd != NULL);
619
620         mdc_obd = class_exp2obd(tgt->ltd_exp);
621
622         if (mdc_obd) {
623                 mdc_obd->obd_force = obd->obd_force;
624                 mdc_obd->obd_fail = obd->obd_fail;
625                 mdc_obd->obd_no_recov = obd->obd_no_recov;
626         }
627
628         lmv_proc_dir = obd->obd_proc_private;
629         if (lmv_proc_dir)
630                 lprocfs_remove_proc_entry(mdc_obd->obd_name, lmv_proc_dir);
631
632         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
633         if (rc)
634                 CERROR("Can't finalize fids factory\n");
635
636         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
637                tgt->ltd_exp->exp_obd->obd_name,
638                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
639
640         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
641         rc = obd_disconnect(tgt->ltd_exp);
642         if (rc) {
643                 if (tgt->ltd_active) {
644                         CERROR("Target %s disconnect error %d\n",
645                                tgt->ltd_uuid.uuid, rc);
646                 }
647         }
648
649         lmv_activate_target(lmv, tgt, 0);
650         tgt->ltd_exp = NULL;
651         return 0;
652 }
653
654 static int lmv_disconnect(struct obd_export *exp)
655 {
656         struct obd_device     *obd = class_exp2obd(exp);
657         struct lmv_obd  *lmv = &obd->u.lmv;
658         int                 rc;
659         int                 i;
660
661         if (!lmv->tgts)
662                 goto out_local;
663
664         /*
665          * Only disconnect the underlying layers on the final disconnect.
666          */
667         lmv->refcount--;
668         if (lmv->refcount != 0)
669                 goto out_local;
670
671         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
672                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
673                         continue;
674
675                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
676         }
677
678         if (obd->obd_proc_private)
679                 lprocfs_remove((struct proc_dir_entry **)&obd->obd_proc_private);
680         else
681                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
682                        obd->obd_type->typ_name, obd->obd_name);
683
684 out_local:
685         /*
686          * This is the case when no real connection is established by
687          * lmv_check_connect().
688          */
689         if (!lmv->connected)
690                 class_export_put(exp);
691         rc = class_disconnect(exp);
692         if (lmv->refcount == 0)
693                 lmv->connected = 0;
694         return rc;
695 }
696
697 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
698 {
699         struct obd_device       *obddev = class_exp2obd(exp);
700         struct lmv_obd          *lmv = &obddev->u.lmv;
701         struct getinfo_fid2path *gf;
702         struct lmv_tgt_desc     *tgt;
703         struct getinfo_fid2path *remote_gf = NULL;
704         int                     remote_gf_size = 0;
705         int                     rc;
706
707         gf = (struct getinfo_fid2path *)karg;
708         tgt = lmv_find_target(lmv, &gf->gf_fid);
709         if (IS_ERR(tgt))
710                 return PTR_ERR(tgt);
711
712 repeat_fid2path:
713         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
714         if (rc != 0 && rc != -EREMOTE)
715                 GOTO(out_fid2path, rc);
716
717         /* If remote_gf != NULL, it means just building the
718          * path on the remote MDT, copy this path segment to gf */
719         if (remote_gf != NULL) {
720                 struct getinfo_fid2path *ori_gf;
721                 char *ptr;
722
723                 ori_gf = (struct getinfo_fid2path *)karg;
724                 if (strlen(ori_gf->gf_path) +
725                     strlen(gf->gf_path) > ori_gf->gf_pathlen)
726                         GOTO(out_fid2path, rc = -EOVERFLOW);
727
728                 ptr = ori_gf->gf_path;
729
730                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
731                         strlen(ori_gf->gf_path));
732
733                 strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
734                 ptr += strlen(gf->gf_path);
735                 *ptr = '/';
736         }
737
738         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
739                tgt->ltd_exp->exp_obd->obd_name,
740                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
741                gf->gf_linkno);
742
743         if (rc == 0)
744                 GOTO(out_fid2path, rc);
745
746         /* sigh, has to go to another MDT to do path building further */
747         if (remote_gf == NULL) {
748                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
749                 OBD_ALLOC(remote_gf, remote_gf_size);
750                 if (remote_gf == NULL)
751                         GOTO(out_fid2path, rc = -ENOMEM);
752                 remote_gf->gf_pathlen = PATH_MAX;
753         }
754
755         if (!fid_is_sane(&gf->gf_fid)) {
756                 CERROR("%s: invalid FID "DFID": rc = %d\n",
757                        tgt->ltd_exp->exp_obd->obd_name,
758                        PFID(&gf->gf_fid), -EINVAL);
759                 GOTO(out_fid2path, rc = -EINVAL);
760         }
761
762         tgt = lmv_find_target(lmv, &gf->gf_fid);
763         if (IS_ERR(tgt))
764                 GOTO(out_fid2path, rc = -EINVAL);
765
766         remote_gf->gf_fid = gf->gf_fid;
767         remote_gf->gf_recno = -1;
768         remote_gf->gf_linkno = -1;
769         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
770         gf = remote_gf;
771         goto repeat_fid2path;
772
773 out_fid2path:
774         if (remote_gf != NULL)
775                 OBD_FREE(remote_gf, remote_gf_size);
776         return rc;
777 }
778
779 static int lmv_hsm_req_count(struct lmv_obd *lmv,
780                              const struct hsm_user_request *hur,
781                              const struct lmv_tgt_desc *tgt_mds)
782 {
783         int                     i, nr = 0;
784         struct lmv_tgt_desc    *curr_tgt;
785
786         /* count how many requests must be sent to the given target */
787         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
788                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
789                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
790                         nr++;
791         }
792         return nr;
793 }
794
795 static void lmv_hsm_req_build(struct lmv_obd *lmv,
796                               struct hsm_user_request *hur_in,
797                               const struct lmv_tgt_desc *tgt_mds,
798                               struct hsm_user_request *hur_out)
799 {
800         int                     i, nr_out;
801         struct lmv_tgt_desc    *curr_tgt;
802
803         /* build the hsm_user_request for the given target */
804         hur_out->hur_request = hur_in->hur_request;
805         nr_out = 0;
806         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
807                 curr_tgt = lmv_find_target(lmv,
808                                         &hur_in->hur_user_item[i].hui_fid);
809                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
810                         hur_out->hur_user_item[nr_out] =
811                                 hur_in->hur_user_item[i];
812                         nr_out++;
813                 }
814         }
815         hur_out->hur_request.hr_itemcount = nr_out;
816         memcpy(hur_data(hur_out), hur_data(hur_in),
817                hur_in->hur_request.hr_data_len);
818 }
819
820 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
821                                  struct lustre_kernelcomm *lk, void *uarg)
822 {
823         int     i, rc = 0;
824
825         /* unregister request (call from llapi_hsm_copytool_fini) */
826         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
827                 /* best effort: try to clean as much as possible
828                  * (continue on error) */
829                 obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
830         }
831
832         /* Whatever the result, remove copytool from kuc groups.
833          * Unreached coordinators will get EPIPE on next requests
834          * and will unregister automatically.
835          */
836         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
837         return rc;
838 }
839
840 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
841                                struct lustre_kernelcomm *lk, void *uarg)
842 {
843         struct file     *filp;
844         int              i, j, err;
845         int              rc = 0;
846         bool             any_set = false;
847
848         /* All or nothing: try to register to all MDS.
849          * In case of failure, unregister from previous MDS,
850          * except if it because of inactive target. */
851         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
852                 err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
853                                    len, lk, uarg);
854                 if (err) {
855                         if (lmv->tgts[i]->ltd_active) {
856                                 /* permanent error */
857                                 CERROR("error: iocontrol MDC %s on MDT"
858                                        "idx %d cmd %x: err = %d\n",
859                                         lmv->tgts[i]->ltd_uuid.uuid,
860                                         i, cmd, err);
861                                 rc = err;
862                                 lk->lk_flags |= LK_FLG_STOP;
863                                 /* unregister from previous MDS */
864                                 for (j = 0; j < i; j++)
865                                         obd_iocontrol(cmd,
866                                                   lmv->tgts[j]->ltd_exp,
867                                                   len, lk, uarg);
868                                 return rc;
869                         }
870                         /* else: transient error.
871                          * kuc will register to the missing MDT
872                          * when it is back */
873                 } else {
874                         any_set = true;
875                 }
876         }
877
878         if (!any_set)
879                 /* no registration done: return error */
880                 return -ENOTCONN;
881
882         /* at least one registration done, with no failure */
883         filp = fget(lk->lk_wfd);
884         if (filp == NULL) {
885                 return -EBADF;
886         }
887         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data);
888         if (rc != 0 && filp != NULL)
889                 fput(filp);
890         return rc;
891 }
892
893
894
895
896 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
897                          int len, void *karg, void *uarg)
898 {
899         struct obd_device    *obddev = class_exp2obd(exp);
900         struct lmv_obd       *lmv = &obddev->u.lmv;
901         int                i = 0;
902         int                rc = 0;
903         int                set = 0;
904         int                count = lmv->desc.ld_tgt_count;
905
906         if (count == 0)
907                 return -ENOTTY;
908
909         switch (cmd) {
910         case IOC_OBD_STATFS: {
911                 struct obd_ioctl_data *data = karg;
912                 struct obd_device *mdc_obd;
913                 struct obd_statfs stat_buf = {0};
914                 __u32 index;
915
916                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
917                 if ((index >= count))
918                         return -ENODEV;
919
920                 if (lmv->tgts[index] == NULL ||
921                     lmv->tgts[index]->ltd_active == 0)
922                         return -ENODATA;
923
924                 mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
925                 if (!mdc_obd)
926                         return -EINVAL;
927
928                 /* copy UUID */
929                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
930                                      min((int) data->ioc_plen2,
931                                          (int) sizeof(struct obd_uuid))))
932                         return -EFAULT;
933
934                 rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
935                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
936                                 0);
937                 if (rc)
938                         return rc;
939                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
940                                      min((int) data->ioc_plen1,
941                                          (int) sizeof(stat_buf))))
942                         return -EFAULT;
943                 break;
944         }
945         case OBD_IOC_QUOTACTL: {
946                 struct if_quotactl *qctl = karg;
947                 struct lmv_tgt_desc *tgt = NULL;
948                 struct obd_quotactl *oqctl;
949
950                 if (qctl->qc_valid == QC_MDTIDX) {
951                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
952                                 return -EINVAL;
953
954                         tgt = lmv->tgts[qctl->qc_idx];
955                         if (tgt == NULL || tgt->ltd_exp == NULL)
956                                 return -EINVAL;
957                 } else if (qctl->qc_valid == QC_UUID) {
958                         for (i = 0; i < count; i++) {
959                                 tgt = lmv->tgts[i];
960                                 if (tgt == NULL)
961                                         continue;
962                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
963                                                      &qctl->obd_uuid))
964                                         continue;
965
966                                 if (tgt->ltd_exp == NULL)
967                                         return -EINVAL;
968
969                                 break;
970                         }
971                 } else {
972                         return -EINVAL;
973                 }
974
975                 if (i >= count)
976                         return -EAGAIN;
977
978                 LASSERT(tgt && tgt->ltd_exp);
979                 OBD_ALLOC_PTR(oqctl);
980                 if (!oqctl)
981                         return -ENOMEM;
982
983                 QCTL_COPY(oqctl, qctl);
984                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
985                 if (rc == 0) {
986                         QCTL_COPY(qctl, oqctl);
987                         qctl->qc_valid = QC_MDTIDX;
988                         qctl->obd_uuid = tgt->ltd_uuid;
989                 }
990                 OBD_FREE_PTR(oqctl);
991                 break;
992         }
993         case OBD_IOC_CHANGELOG_SEND:
994         case OBD_IOC_CHANGELOG_CLEAR: {
995                 struct ioc_changelog *icc = karg;
996
997                 if (icc->icc_mdtindex >= count)
998                         return -ENODEV;
999
1000                 if (lmv->tgts[icc->icc_mdtindex] == NULL ||
1001                     lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
1002                     lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
1003                         return -ENODEV;
1004                 rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
1005                                    sizeof(*icc), icc, NULL);
1006                 break;
1007         }
1008         case LL_IOC_GET_CONNECT_FLAGS: {
1009                 if (lmv->tgts[0] == NULL)
1010                         return -ENODATA;
1011                 rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
1012                 break;
1013         }
1014         case OBD_IOC_FID2PATH: {
1015                 rc = lmv_fid2path(exp, len, karg, uarg);
1016                 break;
1017         }
1018         case LL_IOC_HSM_STATE_GET:
1019         case LL_IOC_HSM_STATE_SET:
1020         case LL_IOC_HSM_ACTION: {
1021                 struct md_op_data       *op_data = karg;
1022                 struct lmv_tgt_desc     *tgt;
1023
1024                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1025                 if (IS_ERR(tgt))
1026                                 return PTR_ERR(tgt);
1027
1028                 if (tgt->ltd_exp == NULL)
1029                                 return -EINVAL;
1030
1031                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1032                 break;
1033         }
1034         case LL_IOC_HSM_PROGRESS: {
1035                 const struct hsm_progress_kernel *hpk = karg;
1036                 struct lmv_tgt_desc     *tgt;
1037
1038                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1039                 if (IS_ERR(tgt))
1040                         return PTR_ERR(tgt);
1041                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1042                 break;
1043         }
1044         case LL_IOC_HSM_REQUEST: {
1045                 struct hsm_user_request *hur = karg;
1046                 struct lmv_tgt_desc     *tgt;
1047                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1048
1049                 if (reqcount == 0)
1050                         return 0;
1051
1052                 /* if the request is about a single fid
1053                  * or if there is a single MDS, no need to split
1054                  * the request. */
1055                 if (reqcount == 1 || count == 1) {
1056                         tgt = lmv_find_target(lmv,
1057                                               &hur->hur_user_item[0].hui_fid);
1058                         if (IS_ERR(tgt))
1059                                 return PTR_ERR(tgt);
1060                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1061                 } else {
1062                         /* split fid list to their respective MDS */
1063                         for (i = 0; i < count; i++) {
1064                                 unsigned int            nr, reqlen;
1065                                 int                     rc1;
1066                                 struct hsm_user_request *req;
1067
1068                                 nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1069                                 if (nr == 0) /* nothing for this MDS */
1070                                         continue;
1071
1072                                 /* build a request with fids for this MDS */
1073                                 reqlen = offsetof(typeof(*hur),
1074                                                   hur_user_item[nr])
1075                                          + hur->hur_request.hr_data_len;
1076                                 OBD_ALLOC_LARGE(req, reqlen);
1077                                 if (req == NULL)
1078                                         return -ENOMEM;
1079
1080                                 lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1081
1082                                 rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1083                                                     reqlen, req, uarg);
1084                                 if (rc1 != 0 && rc == 0)
1085                                         rc = rc1;
1086                                 OBD_FREE_LARGE(req, reqlen);
1087                         }
1088                 }
1089                 break;
1090         }
1091         case LL_IOC_LOV_SWAP_LAYOUTS: {
1092                 struct md_op_data       *op_data = karg;
1093                 struct lmv_tgt_desc     *tgt1, *tgt2;
1094
1095                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1096                 if (IS_ERR(tgt1))
1097                         return PTR_ERR(tgt1);
1098
1099                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1100                 if (IS_ERR(tgt2))
1101                         return PTR_ERR(tgt2);
1102
1103                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1104                         return -EINVAL;
1105
1106                 /* only files on same MDT can have their layouts swapped */
1107                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1108                         return -EPERM;
1109
1110                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1111                 break;
1112         }
1113         case LL_IOC_HSM_CT_START: {
1114                 struct lustre_kernelcomm *lk = karg;
1115                 if (lk->lk_flags & LK_FLG_STOP)
1116                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1117                 else
1118                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1119                 break;
1120         }
1121         default:
1122                 for (i = 0; i < count; i++) {
1123                         struct obd_device *mdc_obd;
1124                         int err;
1125
1126                         if (lmv->tgts[i] == NULL ||
1127                             lmv->tgts[i]->ltd_exp == NULL)
1128                                 continue;
1129                         /* ll_umount_begin() sets force flag but for lmv, not
1130                          * mdc. Let's pass it through */
1131                         mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1132                         mdc_obd->obd_force = obddev->obd_force;
1133                         err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1134                                             karg, uarg);
1135                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1136                                 return err;
1137                         } else if (err) {
1138                                 if (lmv->tgts[i]->ltd_active) {
1139                                         CERROR("error: iocontrol MDC %s on MDT"
1140                                                "idx %d cmd %x: err = %d\n",
1141                                                 lmv->tgts[i]->ltd_uuid.uuid,
1142                                                 i, cmd, err);
1143                                         if (!rc)
1144                                                 rc = err;
1145                                 }
1146                         } else
1147                                 set = 1;
1148                 }
1149                 if (!set && !rc)
1150                         rc = -EIO;
1151         }
1152         return rc;
1153 }
1154
1155 #if 0
1156 static int lmv_all_chars_policy(int count, const char *name,
1157                                 int len)
1158 {
1159         unsigned int c = 0;
1160
1161         while (len > 0)
1162                 c += name[--len];
1163         c = c % count;
1164         return c;
1165 }
1166
1167 static int lmv_nid_policy(struct lmv_obd *lmv)
1168 {
1169         struct obd_import *imp;
1170         __u32         id;
1171
1172         /*
1173          * XXX: To get nid we assume that underlying obd device is mdc.
1174          */
1175         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1176         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1177         return id % lmv->desc.ld_tgt_count;
1178 }
1179
1180 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1181                           placement_policy_t placement)
1182 {
1183         switch (placement) {
1184         case PLACEMENT_CHAR_POLICY:
1185                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1186                                             op_data->op_name,
1187                                             op_data->op_namelen);
1188         case PLACEMENT_NID_POLICY:
1189                 return lmv_nid_policy(lmv);
1190
1191         default:
1192                 break;
1193         }
1194
1195         CERROR("Unsupported placement policy %x\n", placement);
1196         return -EINVAL;
1197 }
1198 #endif
1199
1200 /**
1201  * This is _inode_ placement policy function (not name).
1202  */
1203 static int lmv_placement_policy(struct obd_device *obd,
1204                                 struct md_op_data *op_data, u32 *mds)
1205 {
1206         struct lmv_obd    *lmv = &obd->u.lmv;
1207
1208         LASSERT(mds != NULL);
1209
1210         if (lmv->desc.ld_tgt_count == 1) {
1211                 *mds = 0;
1212                 return 0;
1213         }
1214
1215         /**
1216          * If stripe_offset is provided during setdirstripe
1217          * (setdirstripe -i xx), xx MDS will be chosen.
1218          */
1219         if (op_data->op_cli_flags & CLI_SET_MEA) {
1220                 struct lmv_user_md *lum;
1221
1222                 lum = (struct lmv_user_md *)op_data->op_data;
1223                 if (lum->lum_type == LMV_STRIPE_TYPE &&
1224                     lum->lum_stripe_offset != -1) {
1225                         if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1226                                 CERROR("%s: Stripe_offset %d > MDT count %d:"
1227                                        " rc = %d\n", obd->obd_name,
1228                                        lum->lum_stripe_offset,
1229                                        lmv->desc.ld_tgt_count, -ERANGE);
1230                                 return -ERANGE;
1231                         }
1232                         *mds = lum->lum_stripe_offset;
1233                         return 0;
1234                 }
1235         }
1236
1237         /* Allocate new fid on target according to operation type and parent
1238          * home mds. */
1239         *mds = op_data->op_mds;
1240         return 0;
1241 }
1242
1243 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1244 {
1245         struct lmv_tgt_desc     *tgt;
1246         int                      rc;
1247
1248         tgt = lmv_get_target(lmv, mds);
1249         if (IS_ERR(tgt))
1250                 return PTR_ERR(tgt);
1251
1252         /*
1253          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1254          * on server that seq in new allocated fid is not yet known.
1255          */
1256         mutex_lock(&tgt->ltd_fid_mutex);
1257
1258         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1259                 GOTO(out, rc = -ENODEV);
1260
1261         /*
1262          * Asking underlaying tgt layer to allocate new fid.
1263          */
1264         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1265         if (rc > 0) {
1266                 LASSERT(fid_is_sane(fid));
1267                 rc = 0;
1268         }
1269
1270 out:
1271         mutex_unlock(&tgt->ltd_fid_mutex);
1272         return rc;
1273 }
1274
1275 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1276                   struct md_op_data *op_data)
1277 {
1278         struct obd_device     *obd = class_exp2obd(exp);
1279         struct lmv_obd  *lmv = &obd->u.lmv;
1280         u32                    mds = 0;
1281         int                 rc;
1282
1283         LASSERT(op_data != NULL);
1284         LASSERT(fid != NULL);
1285
1286         rc = lmv_placement_policy(obd, op_data, &mds);
1287         if (rc) {
1288                 CERROR("Can't get target for allocating fid, "
1289                        "rc %d\n", rc);
1290                 return rc;
1291         }
1292
1293         rc = __lmv_fid_alloc(lmv, fid, mds);
1294         if (rc) {
1295                 CERROR("Can't alloc new fid, rc %d\n", rc);
1296                 return rc;
1297         }
1298
1299         return rc;
1300 }
1301
1302 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1303 {
1304         struct lmv_obd       *lmv = &obd->u.lmv;
1305         struct lprocfs_static_vars  lvars;
1306         struct lmv_desc     *desc;
1307         int                      rc;
1308
1309         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1310                 CERROR("LMV setup requires a descriptor\n");
1311                 return -EINVAL;
1312         }
1313
1314         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1315         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1316                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1317                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1318                 return -EINVAL;
1319         }
1320
1321         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1322         if (lmv->tgts == NULL)
1323                 return -ENOMEM;
1324         lmv->tgts_size = 32;
1325
1326         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1327         lmv->desc.ld_tgt_count = 0;
1328         lmv->desc.ld_active_tgt_count = 0;
1329         lmv->max_cookiesize = 0;
1330         lmv->max_def_easize = 0;
1331         lmv->max_easize = 0;
1332         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1333
1334         spin_lock_init(&lmv->lmv_lock);
1335         mutex_init(&lmv->init_mutex);
1336
1337         lprocfs_lmv_init_vars(&lvars);
1338
1339         lprocfs_obd_setup(obd, lvars.obd_vars);
1340 #if defined (CONFIG_PROC_FS)
1341         {
1342                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1343                                         0444, &lmv_proc_target_fops, obd);
1344                 if (rc)
1345                         CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1346                                obd->obd_name, rc);
1347        }
1348 #endif
1349         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1350                              LUSTRE_CLI_FLD_HASH_DHT);
1351         if (rc) {
1352                 CERROR("Can't init FLD, err %d\n", rc);
1353                 GOTO(out, rc);
1354         }
1355
1356         return 0;
1357
1358 out:
1359         return rc;
1360 }
1361
1362 static int lmv_cleanup(struct obd_device *obd)
1363 {
1364         struct lmv_obd   *lmv = &obd->u.lmv;
1365
1366         fld_client_fini(&lmv->lmv_fld);
1367         if (lmv->tgts != NULL) {
1368                 int i;
1369                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1370                         if (lmv->tgts[i] == NULL)
1371                                 continue;
1372                         lmv_del_target(lmv, i);
1373                 }
1374                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1375                 lmv->tgts_size = 0;
1376         }
1377         return 0;
1378 }
1379
1380 static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
1381 {
1382         struct lustre_cfg       *lcfg = buf;
1383         struct obd_uuid         obd_uuid;
1384         int                     gen;
1385         __u32                   index;
1386         int                     rc;
1387
1388         switch (lcfg->lcfg_command) {
1389         case LCFG_ADD_MDC:
1390                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1391                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1392                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1393                         GOTO(out, rc = -EINVAL);
1394
1395                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1396
1397                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1398                         GOTO(out, rc = -EINVAL);
1399                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1400                         GOTO(out, rc = -EINVAL);
1401                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1402                 GOTO(out, rc);
1403         default:
1404                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1405                 GOTO(out, rc = -EINVAL);
1406         }
1407 out:
1408         return rc;
1409 }
1410
1411 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1412                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1413 {
1414         struct obd_device     *obd = class_exp2obd(exp);
1415         struct lmv_obd  *lmv = &obd->u.lmv;
1416         struct obd_statfs     *temp;
1417         int                 rc = 0;
1418         int                 i;
1419
1420         rc = lmv_check_connect(obd);
1421         if (rc)
1422                 return rc;
1423
1424         OBD_ALLOC(temp, sizeof(*temp));
1425         if (temp == NULL)
1426                 return -ENOMEM;
1427
1428         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1429                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1430                         continue;
1431
1432                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1433                                 max_age, flags);
1434                 if (rc) {
1435                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1436                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1437                                rc);
1438                         GOTO(out_free_temp, rc);
1439                 }
1440
1441                 if (i == 0) {
1442                         *osfs = *temp;
1443                         /* If the statfs is from mount, it will needs
1444                          * retrieve necessary information from MDT0.
1445                          * i.e. mount does not need the merged osfs
1446                          * from all of MDT.
1447                          * And also clients can be mounted as long as
1448                          * MDT0 is in service*/
1449                         if (flags & OBD_STATFS_FOR_MDT0)
1450                                 GOTO(out_free_temp, rc);
1451                 } else {
1452                         osfs->os_bavail += temp->os_bavail;
1453                         osfs->os_blocks += temp->os_blocks;
1454                         osfs->os_ffree += temp->os_ffree;
1455                         osfs->os_files += temp->os_files;
1456                 }
1457         }
1458
1459 out_free_temp:
1460         OBD_FREE(temp, sizeof(*temp));
1461         return rc;
1462 }
1463
1464 static int lmv_getstatus(struct obd_export *exp,
1465                          struct lu_fid *fid,
1466                          struct obd_capa **pc)
1467 {
1468         struct obd_device    *obd = exp->exp_obd;
1469         struct lmv_obd       *lmv = &obd->u.lmv;
1470         int                rc;
1471
1472         rc = lmv_check_connect(obd);
1473         if (rc)
1474                 return rc;
1475
1476         rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1477         return rc;
1478 }
1479
1480 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1481                         struct obd_capa *oc, u64 valid, const char *name,
1482                         const char *input, int input_size, int output_size,
1483                         int flags, struct ptlrpc_request **request)
1484 {
1485         struct obd_device      *obd = exp->exp_obd;
1486         struct lmv_obd   *lmv = &obd->u.lmv;
1487         struct lmv_tgt_desc    *tgt;
1488         int                  rc;
1489
1490         rc = lmv_check_connect(obd);
1491         if (rc)
1492                 return rc;
1493
1494         tgt = lmv_find_target(lmv, fid);
1495         if (IS_ERR(tgt))
1496                 return PTR_ERR(tgt);
1497
1498         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1499                          input_size, output_size, flags, request);
1500
1501         return rc;
1502 }
1503
1504 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1505                         struct obd_capa *oc, u64 valid, const char *name,
1506                         const char *input, int input_size, int output_size,
1507                         int flags, __u32 suppgid,
1508                         struct ptlrpc_request **request)
1509 {
1510         struct obd_device      *obd = exp->exp_obd;
1511         struct lmv_obd   *lmv = &obd->u.lmv;
1512         struct lmv_tgt_desc    *tgt;
1513         int                  rc;
1514
1515         rc = lmv_check_connect(obd);
1516         if (rc)
1517                 return rc;
1518
1519         tgt = lmv_find_target(lmv, fid);
1520         if (IS_ERR(tgt))
1521                 return PTR_ERR(tgt);
1522
1523         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1524                          input_size, output_size, flags, suppgid,
1525                          request);
1526
1527         return rc;
1528 }
1529
1530 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1531                        struct ptlrpc_request **request)
1532 {
1533         struct obd_device       *obd = exp->exp_obd;
1534         struct lmv_obd    *lmv = &obd->u.lmv;
1535         struct lmv_tgt_desc     *tgt;
1536         int                   rc;
1537
1538         rc = lmv_check_connect(obd);
1539         if (rc)
1540                 return rc;
1541
1542         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1543         if (IS_ERR(tgt))
1544                 return PTR_ERR(tgt);
1545
1546         if (op_data->op_flags & MF_GET_MDT_IDX) {
1547                 op_data->op_mds = tgt->ltd_idx;
1548                 return 0;
1549         }
1550
1551         rc = md_getattr(tgt->ltd_exp, op_data, request);
1552
1553         return rc;
1554 }
1555
1556 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1557 {
1558         struct obd_device   *obd = exp->exp_obd;
1559         struct lmv_obd      *lmv = &obd->u.lmv;
1560         int               i;
1561         int               rc;
1562
1563         rc = lmv_check_connect(obd);
1564         if (rc)
1565                 return rc;
1566
1567         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1568
1569         /*
1570          * With DNE every object can have two locks in different namespaces:
1571          * lookup lock in space of MDT storing direntry and update/open lock in
1572          * space of MDT storing inode.
1573          */
1574         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1575                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1576                         continue;
1577                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1578         }
1579
1580         return 0;
1581 }
1582
1583 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1584                            ldlm_iterator_t it, void *data)
1585 {
1586         struct obd_device   *obd = exp->exp_obd;
1587         struct lmv_obd      *lmv = &obd->u.lmv;
1588         int               i;
1589         int               rc;
1590
1591         rc = lmv_check_connect(obd);
1592         if (rc)
1593                 return rc;
1594
1595         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1596
1597         /*
1598          * With DNE every object can have two locks in different namespaces:
1599          * lookup lock in space of MDT storing direntry and update/open lock in
1600          * space of MDT storing inode.
1601          */
1602         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1603                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1604                         continue;
1605                 rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1606                 if (rc)
1607                         return rc;
1608         }
1609
1610         return rc;
1611 }
1612
1613
1614 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1615                      struct md_open_data *mod, struct ptlrpc_request **request)
1616 {
1617         struct obd_device     *obd = exp->exp_obd;
1618         struct lmv_obd  *lmv = &obd->u.lmv;
1619         struct lmv_tgt_desc   *tgt;
1620         int                 rc;
1621
1622         rc = lmv_check_connect(obd);
1623         if (rc)
1624                 return rc;
1625
1626         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1627         if (IS_ERR(tgt))
1628                 return PTR_ERR(tgt);
1629
1630         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1631         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1632         return rc;
1633 }
1634
1635 struct lmv_tgt_desc
1636 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1637                 struct lu_fid *fid)
1638 {
1639         struct lmv_tgt_desc *tgt;
1640
1641         tgt = lmv_find_target(lmv, fid);
1642         if (IS_ERR(tgt))
1643                 return tgt;
1644
1645         op_data->op_mds = tgt->ltd_idx;
1646
1647         return tgt;
1648 }
1649
1650 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1651                const void *data, int datalen, int mode, __u32 uid,
1652                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1653                struct ptlrpc_request **request)
1654 {
1655         struct obd_device       *obd = exp->exp_obd;
1656         struct lmv_obd    *lmv = &obd->u.lmv;
1657         struct lmv_tgt_desc     *tgt;
1658         int                   rc;
1659
1660         rc = lmv_check_connect(obd);
1661         if (rc)
1662                 return rc;
1663
1664         if (!lmv->desc.ld_active_tgt_count)
1665                 return -EIO;
1666
1667         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1668         if (IS_ERR(tgt))
1669                 return PTR_ERR(tgt);
1670
1671         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1672         if (rc)
1673                 return rc;
1674
1675         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1676                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1677                op_data->op_mds);
1678
1679         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1680         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1681                        cap_effective, rdev, request);
1682
1683         if (rc == 0) {
1684                 if (*request == NULL)
1685                         return rc;
1686                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1687         }
1688         return rc;
1689 }
1690
1691 static int lmv_done_writing(struct obd_export *exp,
1692                             struct md_op_data *op_data,
1693                             struct md_open_data *mod)
1694 {
1695         struct obd_device     *obd = exp->exp_obd;
1696         struct lmv_obd  *lmv = &obd->u.lmv;
1697         struct lmv_tgt_desc   *tgt;
1698         int                 rc;
1699
1700         rc = lmv_check_connect(obd);
1701         if (rc)
1702                 return rc;
1703
1704         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1705         if (IS_ERR(tgt))
1706                 return PTR_ERR(tgt);
1707
1708         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1709         return rc;
1710 }
1711
1712 static int
1713 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1714                    struct lookup_intent *it, struct md_op_data *op_data,
1715                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1716                    __u64 extra_lock_flags)
1717 {
1718         struct ptlrpc_request      *req = it->d.lustre.it_data;
1719         struct obd_device         *obd = exp->exp_obd;
1720         struct lmv_obd       *lmv = &obd->u.lmv;
1721         struct lustre_handle    plock;
1722         struct lmv_tgt_desc     *tgt;
1723         struct md_op_data         *rdata;
1724         struct lu_fid          fid1;
1725         struct mdt_body     *body;
1726         int                      rc = 0;
1727         int                      pmode;
1728
1729         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1730         LASSERT(body != NULL);
1731
1732         if (!(body->valid & OBD_MD_MDS))
1733                 return 0;
1734
1735         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1736                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1737
1738         /*
1739          * We got LOOKUP lock, but we really need attrs.
1740          */
1741         pmode = it->d.lustre.it_lock_mode;
1742         LASSERT(pmode != 0);
1743         memcpy(&plock, lockh, sizeof(plock));
1744         it->d.lustre.it_lock_mode = 0;
1745         it->d.lustre.it_data = NULL;
1746         fid1 = body->fid1;
1747
1748         ptlrpc_req_finished(req);
1749
1750         tgt = lmv_find_target(lmv, &fid1);
1751         if (IS_ERR(tgt))
1752                 GOTO(out, rc = PTR_ERR(tgt));
1753
1754         OBD_ALLOC_PTR(rdata);
1755         if (rdata == NULL)
1756                 GOTO(out, rc = -ENOMEM);
1757
1758         rdata->op_fid1 = fid1;
1759         rdata->op_bias = MDS_CROSS_REF;
1760
1761         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1762                         lmm, lmmsize, NULL, extra_lock_flags);
1763         OBD_FREE_PTR(rdata);
1764 out:
1765         ldlm_lock_decref(&plock, pmode);
1766         return rc;
1767 }
1768
1769 static int
1770 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1771             struct lookup_intent *it, struct md_op_data *op_data,
1772             struct lustre_handle *lockh, void *lmm, int lmmsize,
1773             struct ptlrpc_request **req, __u64 extra_lock_flags)
1774 {
1775         struct obd_device       *obd = exp->exp_obd;
1776         struct lmv_obd     *lmv = &obd->u.lmv;
1777         struct lmv_tgt_desc      *tgt;
1778         int                    rc;
1779
1780         rc = lmv_check_connect(obd);
1781         if (rc)
1782                 return rc;
1783
1784         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1785                LL_IT2STR(it), PFID(&op_data->op_fid1));
1786
1787         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1788         if (IS_ERR(tgt))
1789                 return PTR_ERR(tgt);
1790
1791         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1792                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1793
1794         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1795                         lmm, lmmsize, req, extra_lock_flags);
1796
1797         if (rc == 0 && it && it->it_op == IT_OPEN) {
1798                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1799                                         lmm, lmmsize, extra_lock_flags);
1800         }
1801         return rc;
1802 }
1803
1804 static int
1805 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1806                  struct ptlrpc_request **request)
1807 {
1808         struct ptlrpc_request   *req = NULL;
1809         struct obd_device       *obd = exp->exp_obd;
1810         struct lmv_obd    *lmv = &obd->u.lmv;
1811         struct lmv_tgt_desc     *tgt;
1812         struct mdt_body  *body;
1813         int                   rc;
1814
1815         rc = lmv_check_connect(obd);
1816         if (rc)
1817                 return rc;
1818
1819         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1820         if (IS_ERR(tgt))
1821                 return PTR_ERR(tgt);
1822
1823         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1824                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1825                tgt->ltd_idx);
1826
1827         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1828         if (rc != 0)
1829                 return rc;
1830
1831         body = req_capsule_server_get(&(*request)->rq_pill,
1832                                       &RMF_MDT_BODY);
1833         LASSERT(body != NULL);
1834
1835         if (body->valid & OBD_MD_MDS) {
1836                 struct lu_fid rid = body->fid1;
1837                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1838                        PFID(&rid));
1839
1840                 tgt = lmv_find_target(lmv, &rid);
1841                 if (IS_ERR(tgt)) {
1842                         ptlrpc_req_finished(*request);
1843                         return PTR_ERR(tgt);
1844                 }
1845
1846                 op_data->op_fid1 = rid;
1847                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1848                 op_data->op_namelen = 0;
1849                 op_data->op_name = NULL;
1850                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1851                 ptlrpc_req_finished(*request);
1852                 *request = req;
1853         }
1854
1855         return rc;
1856 }
1857
1858 #define md_op_data_fid(op_data, fl)                  \
1859         (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1860          fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1861          fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1862          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1863          NULL)
1864
1865 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1866                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1867 {
1868         struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1869         struct obd_device      *obd = exp->exp_obd;
1870         struct lmv_obd   *lmv = &obd->u.lmv;
1871         struct lmv_tgt_desc    *tgt;
1872         ldlm_policy_data_t      policy = {{0}};
1873         int                  rc = 0;
1874
1875         if (!fid_is_sane(fid))
1876                 return 0;
1877
1878         tgt = lmv_find_target(lmv, fid);
1879         if (IS_ERR(tgt))
1880                 return PTR_ERR(tgt);
1881
1882         if (tgt->ltd_idx != op_tgt) {
1883                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1884                 policy.l_inodebits.bits = bits;
1885                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1886                                       mode, LCF_ASYNC, NULL);
1887         } else {
1888                 CDEBUG(D_INODE,
1889                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1890                        op_tgt, PFID(fid));
1891                 op_data->op_flags |= flag;
1892                 rc = 0;
1893         }
1894
1895         return rc;
1896 }
1897
1898 /*
1899  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1900  * op_data->op_fid2
1901  */
1902 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1903                     struct ptlrpc_request **request)
1904 {
1905         struct obd_device       *obd = exp->exp_obd;
1906         struct lmv_obd    *lmv = &obd->u.lmv;
1907         struct lmv_tgt_desc     *tgt;
1908         int                   rc;
1909
1910         rc = lmv_check_connect(obd);
1911         if (rc)
1912                 return rc;
1913
1914         LASSERT(op_data->op_namelen != 0);
1915
1916         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1917                PFID(&op_data->op_fid2), op_data->op_namelen,
1918                op_data->op_name, PFID(&op_data->op_fid1));
1919
1920         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1921         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1922         op_data->op_cap = cfs_curproc_cap_pack();
1923         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1924         if (IS_ERR(tgt))
1925                 return PTR_ERR(tgt);
1926
1927         /*
1928          * Cancel UPDATE lock on child (fid1).
1929          */
1930         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1931         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1932                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1933         if (rc != 0)
1934                 return rc;
1935
1936         rc = md_link(tgt->ltd_exp, op_data, request);
1937
1938         return rc;
1939 }
1940
1941 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1942                       const char *old, int oldlen, const char *new, int newlen,
1943                       struct ptlrpc_request **request)
1944 {
1945         struct obd_device       *obd = exp->exp_obd;
1946         struct lmv_obd    *lmv = &obd->u.lmv;
1947         struct lmv_tgt_desc     *src_tgt;
1948         struct lmv_tgt_desc     *tgt_tgt;
1949         int                     rc;
1950
1951         LASSERT(oldlen != 0);
1952
1953         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1954                oldlen, old, PFID(&op_data->op_fid1),
1955                newlen, new, PFID(&op_data->op_fid2));
1956
1957         rc = lmv_check_connect(obd);
1958         if (rc)
1959                 return rc;
1960
1961         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1962         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1963         op_data->op_cap = cfs_curproc_cap_pack();
1964         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1965         if (IS_ERR(src_tgt))
1966                 return PTR_ERR(src_tgt);
1967
1968         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1969         if (IS_ERR(tgt_tgt))
1970                 return PTR_ERR(tgt_tgt);
1971         /*
1972          * LOOKUP lock on src child (fid3) should also be cancelled for
1973          * src_tgt in mdc_rename.
1974          */
1975         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1976
1977         /*
1978          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1979          * own target.
1980          */
1981         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1982                               LCK_EX, MDS_INODELOCK_UPDATE,
1983                               MF_MDC_CANCEL_FID2);
1984
1985         /*
1986          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1987          */
1988         if (rc == 0) {
1989                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1990                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1991                                       MF_MDC_CANCEL_FID4);
1992         }
1993
1994         /*
1995          * Cancel all the locks on tgt child (fid4).
1996          */
1997         if (rc == 0)
1998                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1999                                       LCK_EX, MDS_INODELOCK_FULL,
2000                                       MF_MDC_CANCEL_FID4);
2001
2002         if (rc == 0)
2003                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
2004                                new, newlen, request);
2005         return rc;
2006 }
2007
2008 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2009                        void *ea, int ealen, void *ea2, int ea2len,
2010                        struct ptlrpc_request **request,
2011                        struct md_open_data **mod)
2012 {
2013         struct obd_device       *obd = exp->exp_obd;
2014         struct lmv_obd    *lmv = &obd->u.lmv;
2015         struct lmv_tgt_desc     *tgt;
2016         int                   rc = 0;
2017
2018         rc = lmv_check_connect(obd);
2019         if (rc)
2020                 return rc;
2021
2022         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2023                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2024
2025         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2026         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2027         if (IS_ERR(tgt))
2028                 return PTR_ERR(tgt);
2029
2030         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2031                         ea2len, request, mod);
2032
2033         return rc;
2034 }
2035
2036 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2037                     struct obd_capa *oc, struct ptlrpc_request **request)
2038 {
2039         struct obd_device        *obd = exp->exp_obd;
2040         struct lmv_obd      *lmv = &obd->u.lmv;
2041         struct lmv_tgt_desc       *tgt;
2042         int                     rc;
2043
2044         rc = lmv_check_connect(obd);
2045         if (rc)
2046                 return rc;
2047
2048         tgt = lmv_find_target(lmv, fid);
2049         if (IS_ERR(tgt))
2050                 return PTR_ERR(tgt);
2051
2052         rc = md_sync(tgt->ltd_exp, fid, oc, request);
2053         return rc;
2054 }
2055
2056 /*
2057  * Adjust a set of pages, each page containing an array of lu_dirpages,
2058  * so that each page can be used as a single logical lu_dirpage.
2059  *
2060  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2061  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2062  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2063  * value is used as a cookie to request the next lu_dirpage in a
2064  * directory listing that spans multiple pages (two in this example):
2065  *   ________
2066  *  |   |
2067  * .|--------v-------   -----.
2068  * |s|e|f|p|ent|ent| ... |ent|
2069  * '--|--------------   -----'   Each CFS_PAGE contains a single
2070  *    '------.             lu_dirpage.
2071  * .---------v-------   -----.
2072  * |s|e|f|p|ent| 0 | ... | 0 |
2073  * '-----------------   -----'
2074  *
2075  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2076  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2077  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2078  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2079  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2080  * in general e0==s1, e1==s2, etc.):
2081  *
2082  * .--------------------   -----.
2083  * |s0|e0|f0|p|ent|ent| ... |ent|
2084  * |---v----------------   -----|
2085  * |s1|e1|f1|p|ent|ent| ... |ent|
2086  * |---v----------------   -----|  Here, each CFS_PAGE contains
2087  *           ...                 multiple lu_dirpages.
2088  * |---v----------------   -----|
2089  * |s'|e'|f'|p|ent|ent| ... |ent|
2090  * '---|----------------   -----'
2091  *     v
2092  * .----------------------------.
2093  * |    next CFS_PAGE       |
2094  *
2095  * This structure is transformed into a single logical lu_dirpage as follows:
2096  *
2097  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2098  *   labeled 'next CFS_PAGE'.
2099  *
2100  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2101  *   a hash collision with the next page exists.
2102  *
2103  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2104  *   to the first entry of the next lu_dirpage.
2105  */
2106 #if PAGE_CACHE_SIZE > LU_PAGE_SIZE
2107 static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
2108 {
2109         int i;
2110
2111         for (i = 0; i < ncfspgs; i++) {
2112                 struct lu_dirpage       *dp = kmap(pages[i]);
2113                 struct lu_dirpage       *first = dp;
2114                 struct lu_dirent        *end_dirent = NULL;
2115                 struct lu_dirent        *ent;
2116                 __u64                   hash_end = dp->ldp_hash_end;
2117                 __u32                   flags = dp->ldp_flags;
2118
2119                 while (--nlupgs > 0) {
2120                         ent = lu_dirent_start(dp);
2121                         for (end_dirent = ent; ent != NULL;
2122                              end_dirent = ent, ent = lu_dirent_next(ent));
2123
2124                         /* Advance dp to next lu_dirpage. */
2125                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2126
2127                         /* Check if we've reached the end of the CFS_PAGE. */
2128                         if (!((unsigned long)dp & ~CFS_PAGE_MASK))
2129                                 break;
2130
2131                         /* Save the hash and flags of this lu_dirpage. */
2132                         hash_end = dp->ldp_hash_end;
2133                         flags = dp->ldp_flags;
2134
2135                         /* Check if lu_dirpage contains no entries. */
2136                         if (!end_dirent)
2137                                 break;
2138
2139                         /* Enlarge the end entry lde_reclen from 0 to
2140                          * first entry of next lu_dirpage. */
2141                         LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
2142                         end_dirent->lde_reclen =
2143                                 cpu_to_le16((char *)(dp->ldp_entries) -
2144                                             (char *)end_dirent);
2145                 }
2146
2147                 first->ldp_hash_end = hash_end;
2148                 first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
2149                 first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
2150
2151                 kunmap(pages[i]);
2152         }
2153         LASSERTF(nlupgs == 0, "left = %d", nlupgs);
2154 }
2155 #else
2156 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
2157 #endif  /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2158
2159 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2160                         struct page **pages, struct ptlrpc_request **request)
2161 {
2162         struct obd_device       *obd = exp->exp_obd;
2163         struct lmv_obd          *lmv = &obd->u.lmv;
2164         __u64                   offset = op_data->op_offset;
2165         int                     rc;
2166         int                     ncfspgs; /* pages read in PAGE_CACHE_SIZE */
2167         int                     nlupgs; /* pages read in LU_PAGE_SIZE */
2168         struct lmv_tgt_desc     *tgt;
2169
2170         rc = lmv_check_connect(obd);
2171         if (rc)
2172                 return rc;
2173
2174         CDEBUG(D_INODE, "READPAGE at %#llx from "DFID"\n",
2175                offset, PFID(&op_data->op_fid1));
2176
2177         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2178         if (IS_ERR(tgt))
2179                 return PTR_ERR(tgt);
2180
2181         rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2182         if (rc != 0)
2183                 return rc;
2184
2185         ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_CACHE_SIZE - 1)
2186                  >> PAGE_CACHE_SHIFT;
2187         nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2188         LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2189         LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2190
2191         CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2192                op_data->op_npages);
2193
2194         lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2195
2196         return rc;
2197 }
2198
2199 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2200                       struct ptlrpc_request **request)
2201 {
2202         struct obd_device       *obd = exp->exp_obd;
2203         struct lmv_obd    *lmv = &obd->u.lmv;
2204         struct lmv_tgt_desc     *tgt = NULL;
2205         struct mdt_body         *body;
2206         int                  rc;
2207
2208         rc = lmv_check_connect(obd);
2209         if (rc)
2210                 return rc;
2211 retry:
2212         /* Send unlink requests to the MDT where the child is located */
2213         if (likely(!fid_is_zero(&op_data->op_fid2)))
2214                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2215         else
2216                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2217         if (IS_ERR(tgt))
2218                 return PTR_ERR(tgt);
2219
2220         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2221         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2222         op_data->op_cap = cfs_curproc_cap_pack();
2223
2224         /*
2225          * If child's fid is given, cancel unused locks for it if it is from
2226          * another export than parent.
2227          *
2228          * LOOKUP lock for child (fid3) should also be cancelled on parent
2229          * tgt_tgt in mdc_unlink().
2230          */
2231         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2232
2233         /*
2234          * Cancel FULL locks on child (fid3).
2235          */
2236         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2237                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2238
2239         if (rc != 0)
2240                 return rc;
2241
2242         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2243                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2244
2245         rc = md_unlink(tgt->ltd_exp, op_data, request);
2246         if (rc != 0 && rc != -EREMOTE)
2247                 return rc;
2248
2249         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2250         if (body == NULL)
2251                 return -EPROTO;
2252
2253         /* Not cross-ref case, just get out of here. */
2254         if (likely(!(body->valid & OBD_MD_MDS)))
2255                 return 0;
2256
2257         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2258                exp->exp_obd->obd_name, PFID(&body->fid1));
2259
2260         /* This is a remote object, try remote MDT, Note: it may
2261          * try more than 1 time here, Considering following case
2262          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2263          * 1. Initially A does not know where remote1 is, it send
2264          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2265          *    resend unlink RPC to MDT1 (retry 1st time).
2266          *
2267          * 2. During the unlink RPC in flight,
2268          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2269          *    and create new remote1, but on MDT0
2270          *
2271          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2272          *    /mnt/lustre, then lookup get fid of remote1, and find
2273          *    it is remote dir again, and replay -EREMOTE again.
2274          *
2275          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2276          *
2277          * In theory, it might try unlimited time here, but it should
2278          * be very rare case.  */
2279         op_data->op_fid2 = body->fid1;
2280         ptlrpc_req_finished(*request);
2281         *request = NULL;
2282
2283         goto retry;
2284 }
2285
2286 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2287 {
2288         struct lmv_obd *lmv = &obd->u.lmv;
2289         int rc = 0;
2290
2291         switch (stage) {
2292         case OBD_CLEANUP_EARLY:
2293                 /* XXX: here should be calling obd_precleanup() down to
2294                  * stack. */
2295                 break;
2296         case OBD_CLEANUP_EXPORTS:
2297                 fld_client_proc_fini(&lmv->lmv_fld);
2298                 lprocfs_obd_cleanup(obd);
2299                 break;
2300         default:
2301                 break;
2302         }
2303         return rc;
2304 }
2305
2306 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2307                         __u32 keylen, void *key, __u32 *vallen, void *val,
2308                         struct lov_stripe_md *lsm)
2309 {
2310         struct obd_device       *obd;
2311         struct lmv_obd    *lmv;
2312         int                   rc = 0;
2313
2314         obd = class_exp2obd(exp);
2315         if (obd == NULL) {
2316                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2317                        exp->exp_handle.h_cookie);
2318                 return -EINVAL;
2319         }
2320
2321         lmv = &obd->u.lmv;
2322         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2323                 struct lmv_tgt_desc *tgt;
2324                 int i;
2325
2326                 rc = lmv_check_connect(obd);
2327                 if (rc)
2328                         return rc;
2329
2330                 LASSERT(*vallen == sizeof(__u32));
2331                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2332                         tgt = lmv->tgts[i];
2333                         /*
2334                          * All tgts should be connected when this gets called.
2335                          */
2336                         if (tgt == NULL || tgt->ltd_exp == NULL)
2337                                 continue;
2338
2339                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2340                                           vallen, val, NULL))
2341                                 return 0;
2342                 }
2343                 return -EINVAL;
2344         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2345                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2346                    KEY_IS(KEY_MAX_COOKIESIZE) ||
2347                    KEY_IS(KEY_DEFAULT_COOKIESIZE) ||
2348                    KEY_IS(KEY_CONN_DATA)) {
2349                 rc = lmv_check_connect(obd);
2350                 if (rc)
2351                         return rc;
2352
2353                 /*
2354                  * Forwarding this request to first MDS, it should know LOV
2355                  * desc.
2356                  */
2357                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2358                                   vallen, val, NULL);
2359                 if (!rc && KEY_IS(KEY_CONN_DATA))
2360                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2361                 return rc;
2362         } else if (KEY_IS(KEY_TGT_COUNT)) {
2363                 *((int *)val) = lmv->desc.ld_tgt_count;
2364                 return 0;
2365         }
2366
2367         CDEBUG(D_IOCTL, "Invalid key\n");
2368         return -EINVAL;
2369 }
2370
2371 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2372                        u32 keylen, void *key, u32 vallen,
2373                        void *val, struct ptlrpc_request_set *set)
2374 {
2375         struct lmv_tgt_desc    *tgt;
2376         struct obd_device      *obd;
2377         struct lmv_obd   *lmv;
2378         int rc = 0;
2379
2380         obd = class_exp2obd(exp);
2381         if (obd == NULL) {
2382                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2383                        exp->exp_handle.h_cookie);
2384                 return -EINVAL;
2385         }
2386         lmv = &obd->u.lmv;
2387
2388         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2389                 int i, err = 0;
2390
2391                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2392                         tgt = lmv->tgts[i];
2393
2394                         if (tgt == NULL || tgt->ltd_exp == NULL)
2395                                 continue;
2396
2397                         err = obd_set_info_async(env, tgt->ltd_exp,
2398                                                  keylen, key, vallen, val, set);
2399                         if (err && rc == 0)
2400                                 rc = err;
2401                 }
2402
2403                 return rc;
2404         }
2405
2406         return -EINVAL;
2407 }
2408
2409 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2410                struct lov_stripe_md *lsm)
2411 {
2412         struct obd_device        *obd = class_exp2obd(exp);
2413         struct lmv_obd      *lmv = &obd->u.lmv;
2414         struct lmv_stripe_md      *meap;
2415         struct lmv_stripe_md      *lsmp;
2416         int                     mea_size;
2417         int                     i;
2418
2419         mea_size = lmv_get_easize(lmv);
2420         if (!lmmp)
2421                 return mea_size;
2422
2423         if (*lmmp && !lsm) {
2424                 OBD_FREE_LARGE(*lmmp, mea_size);
2425                 *lmmp = NULL;
2426                 return 0;
2427         }
2428
2429         if (*lmmp == NULL) {
2430                 OBD_ALLOC_LARGE(*lmmp, mea_size);
2431                 if (*lmmp == NULL)
2432                         return -ENOMEM;
2433         }
2434
2435         if (!lsm)
2436                 return mea_size;
2437
2438         lsmp = (struct lmv_stripe_md *)lsm;
2439         meap = (struct lmv_stripe_md *)*lmmp;
2440
2441         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2442             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2443                 return -EINVAL;
2444
2445         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2446         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2447         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2448
2449         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2450                 meap->mea_ids[i] = lsmp->mea_ids[i];
2451                 fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2452         }
2453
2454         return mea_size;
2455 }
2456
2457 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2458                  struct lov_mds_md *lmm, int lmm_size)
2459 {
2460         struct obd_device         *obd = class_exp2obd(exp);
2461         struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2462         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2463         struct lmv_obd       *lmv = &obd->u.lmv;
2464         int                      mea_size;
2465         int                      i;
2466         __u32                  magic;
2467
2468         mea_size = lmv_get_easize(lmv);
2469         if (lsmp == NULL)
2470                 return mea_size;
2471
2472         if (*lsmp != NULL && lmm == NULL) {
2473                 OBD_FREE_LARGE(*tmea, mea_size);
2474                 *lsmp = NULL;
2475                 return 0;
2476         }
2477
2478         LASSERT(mea_size == lmm_size);
2479
2480         OBD_ALLOC_LARGE(*tmea, mea_size);
2481         if (*tmea == NULL)
2482                 return -ENOMEM;
2483
2484         if (!lmm)
2485                 return mea_size;
2486
2487         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2488             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2489             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT) {
2490                 magic = le32_to_cpu(mea->mea_magic);
2491         } else {
2492                 /*
2493                  * Old mea is not handled here.
2494                  */
2495                 CERROR("Old not supportable EA is found\n");
2496                 LBUG();
2497         }
2498
2499         (*tmea)->mea_magic = magic;
2500         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2501         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2502
2503         for (i = 0; i < (*tmea)->mea_count; i++) {
2504                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2505                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2506         }
2507         return mea_size;
2508 }
2509
2510 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2511                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2512                              ldlm_cancel_flags_t flags, void *opaque)
2513 {
2514         struct obd_device       *obd = exp->exp_obd;
2515         struct lmv_obd    *lmv = &obd->u.lmv;
2516         int                   rc = 0;
2517         int                   err;
2518         int                   i;
2519
2520         LASSERT(fid != NULL);
2521
2522         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2523                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
2524                     lmv->tgts[i]->ltd_active == 0)
2525                         continue;
2526
2527                 err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
2528                                        policy, mode, flags, opaque);
2529                 if (!rc)
2530                         rc = err;
2531         }
2532         return rc;
2533 }
2534
2535 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2536                       __u64 *bits)
2537 {
2538         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2539         int                   rc;
2540
2541         rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
2542         return rc;
2543 }
2544
2545 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2546                            const struct lu_fid *fid, ldlm_type_t type,
2547                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2548                            struct lustre_handle *lockh)
2549 {
2550         struct obd_device       *obd = exp->exp_obd;
2551         struct lmv_obd    *lmv = &obd->u.lmv;
2552         ldlm_mode_t           rc;
2553         int                   i;
2554
2555         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2556
2557         /*
2558          * With CMD every object can have two locks in different namespaces:
2559          * lookup lock in space of mds storing direntry and update/open lock in
2560          * space of mds storing inode. Thus we check all targets, not only that
2561          * one fid was created in.
2562          */
2563         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2564                 if (lmv->tgts[i] == NULL ||
2565                     lmv->tgts[i]->ltd_exp == NULL ||
2566                     lmv->tgts[i]->ltd_active == 0)
2567                         continue;
2568
2569                 rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
2570                                    type, policy, mode, lockh);
2571                 if (rc)
2572                         return rc;
2573         }
2574
2575         return 0;
2576 }
2577
2578 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2579                       struct obd_export *dt_exp, struct obd_export *md_exp,
2580                       struct lustre_md *md)
2581 {
2582         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2583
2584         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2585 }
2586
2587 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2588 {
2589         struct obd_device       *obd = exp->exp_obd;
2590         struct lmv_obd    *lmv = &obd->u.lmv;
2591
2592         if (md->mea)
2593                 obd_free_memmd(exp, (void *)&md->mea);
2594         return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2595 }
2596
2597 int lmv_set_open_replay_data(struct obd_export *exp,
2598                              struct obd_client_handle *och,
2599                              struct lookup_intent *it)
2600 {
2601         struct obd_device       *obd = exp->exp_obd;
2602         struct lmv_obd    *lmv = &obd->u.lmv;
2603         struct lmv_tgt_desc     *tgt;
2604
2605         tgt = lmv_find_target(lmv, &och->och_fid);
2606         if (IS_ERR(tgt))
2607                 return PTR_ERR(tgt);
2608
2609         return md_set_open_replay_data(tgt->ltd_exp, och, it);
2610 }
2611
2612 int lmv_clear_open_replay_data(struct obd_export *exp,
2613                                struct obd_client_handle *och)
2614 {
2615         struct obd_device       *obd = exp->exp_obd;
2616         struct lmv_obd    *lmv = &obd->u.lmv;
2617         struct lmv_tgt_desc     *tgt;
2618
2619         tgt = lmv_find_target(lmv, &och->och_fid);
2620         if (IS_ERR(tgt))
2621                 return PTR_ERR(tgt);
2622
2623         return md_clear_open_replay_data(tgt->ltd_exp, och);
2624 }
2625
2626 static int lmv_get_remote_perm(struct obd_export *exp,
2627                                const struct lu_fid *fid,
2628                                struct obd_capa *oc, __u32 suppgid,
2629                                struct ptlrpc_request **request)
2630 {
2631         struct obd_device       *obd = exp->exp_obd;
2632         struct lmv_obd    *lmv = &obd->u.lmv;
2633         struct lmv_tgt_desc     *tgt;
2634         int                   rc;
2635
2636         rc = lmv_check_connect(obd);
2637         if (rc)
2638                 return rc;
2639
2640         tgt = lmv_find_target(lmv, fid);
2641         if (IS_ERR(tgt))
2642                 return PTR_ERR(tgt);
2643
2644         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2645         return rc;
2646 }
2647
2648 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2649                           renew_capa_cb_t cb)
2650 {
2651         struct obd_device       *obd = exp->exp_obd;
2652         struct lmv_obd    *lmv = &obd->u.lmv;
2653         struct lmv_tgt_desc     *tgt;
2654         int                   rc;
2655
2656         rc = lmv_check_connect(obd);
2657         if (rc)
2658                 return rc;
2659
2660         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2661         if (IS_ERR(tgt))
2662                 return PTR_ERR(tgt);
2663
2664         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2665         return rc;
2666 }
2667
2668 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2669                     const struct req_msg_field *field, struct obd_capa **oc)
2670 {
2671         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2672
2673         return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
2674 }
2675
2676 int lmv_intent_getattr_async(struct obd_export *exp,
2677                              struct md_enqueue_info *minfo,
2678                              struct ldlm_enqueue_info *einfo)
2679 {
2680         struct md_op_data       *op_data = &minfo->mi_data;
2681         struct obd_device       *obd = exp->exp_obd;
2682         struct lmv_obd    *lmv = &obd->u.lmv;
2683         struct lmv_tgt_desc     *tgt = NULL;
2684         int                   rc;
2685
2686         rc = lmv_check_connect(obd);
2687         if (rc)
2688                 return rc;
2689
2690         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2691         if (IS_ERR(tgt))
2692                 return PTR_ERR(tgt);
2693
2694         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2695         return rc;
2696 }
2697
2698 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2699                         struct lu_fid *fid, __u64 *bits)
2700 {
2701         struct obd_device       *obd = exp->exp_obd;
2702         struct lmv_obd    *lmv = &obd->u.lmv;
2703         struct lmv_tgt_desc     *tgt;
2704         int                   rc;
2705
2706         rc = lmv_check_connect(obd);
2707         if (rc)
2708                 return rc;
2709
2710         tgt = lmv_find_target(lmv, fid);
2711         if (IS_ERR(tgt))
2712                 return PTR_ERR(tgt);
2713
2714         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2715         return rc;
2716 }
2717
2718 /**
2719  * For lmv, only need to send request to master MDT, and the master MDT will
2720  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2721  * we directly fetch data from the slave MDTs.
2722  */
2723 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2724                  struct obd_quotactl *oqctl)
2725 {
2726         struct obd_device   *obd = class_exp2obd(exp);
2727         struct lmv_obd      *lmv = &obd->u.lmv;
2728         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2729         int               rc = 0, i;
2730         __u64           curspace, curinodes;
2731
2732         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2733                 CERROR("master lmv inactive\n");
2734                 return -EIO;
2735         }
2736
2737         if (oqctl->qc_cmd != Q_GETOQUOTA) {
2738                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2739                 return rc;
2740         }
2741
2742         curspace = curinodes = 0;
2743         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2744                 int err;
2745                 tgt = lmv->tgts[i];
2746
2747                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
2748                         continue;
2749                 if (!tgt->ltd_active) {
2750                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2751                         continue;
2752                 }
2753
2754                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2755                 if (err) {
2756                         CERROR("getquota on mdt %d failed. %d\n", i, err);
2757                         if (!rc)
2758                                 rc = err;
2759                 } else {
2760                         curspace += oqctl->qc_dqblk.dqb_curspace;
2761                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
2762                 }
2763         }
2764         oqctl->qc_dqblk.dqb_curspace = curspace;
2765         oqctl->qc_dqblk.dqb_curinodes = curinodes;
2766
2767         return rc;
2768 }
2769
2770 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2771                    struct obd_quotactl *oqctl)
2772 {
2773         struct obd_device   *obd = class_exp2obd(exp);
2774         struct lmv_obd      *lmv = &obd->u.lmv;
2775         struct lmv_tgt_desc *tgt;
2776         int               i, rc = 0;
2777
2778         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2779                 int err;
2780                 tgt = lmv->tgts[i];
2781                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
2782                         CERROR("lmv idx %d inactive\n", i);
2783                         return -EIO;
2784                 }
2785
2786                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
2787                 if (err && !rc)
2788                         rc = err;
2789         }
2790
2791         return rc;
2792 }
2793
2794 struct obd_ops lmv_obd_ops = {
2795         .o_owner                = THIS_MODULE,
2796         .o_setup                = lmv_setup,
2797         .o_cleanup            = lmv_cleanup,
2798         .o_precleanup      = lmv_precleanup,
2799         .o_process_config       = lmv_process_config,
2800         .o_connect            = lmv_connect,
2801         .o_disconnect      = lmv_disconnect,
2802         .o_statfs              = lmv_statfs,
2803         .o_get_info          = lmv_get_info,
2804         .o_set_info_async       = lmv_set_info_async,
2805         .o_packmd              = lmv_packmd,
2806         .o_unpackmd          = lmv_unpackmd,
2807         .o_notify              = lmv_notify,
2808         .o_get_uuid          = lmv_get_uuid,
2809         .o_iocontrol        = lmv_iocontrol,
2810         .o_quotacheck      = lmv_quotacheck,
2811         .o_quotactl          = lmv_quotactl
2812 };
2813
2814 struct md_ops lmv_md_ops = {
2815         .m_getstatus        = lmv_getstatus,
2816         .m_null_inode           = lmv_null_inode,
2817         .m_find_cbdata    = lmv_find_cbdata,
2818         .m_close                = lmv_close,
2819         .m_create              = lmv_create,
2820         .m_done_writing  = lmv_done_writing,
2821         .m_enqueue            = lmv_enqueue,
2822         .m_getattr            = lmv_getattr,
2823         .m_getxattr          = lmv_getxattr,
2824         .m_getattr_name  = lmv_getattr_name,
2825         .m_intent_lock    = lmv_intent_lock,
2826         .m_link          = lmv_link,
2827         .m_rename              = lmv_rename,
2828         .m_setattr            = lmv_setattr,
2829         .m_setxattr          = lmv_setxattr,
2830         .m_sync          = lmv_sync,
2831         .m_readpage          = lmv_readpage,
2832         .m_unlink              = lmv_unlink,
2833         .m_init_ea_size  = lmv_init_ea_size,
2834         .m_cancel_unused        = lmv_cancel_unused,
2835         .m_set_lock_data        = lmv_set_lock_data,
2836         .m_lock_match      = lmv_lock_match,
2837         .m_get_lustre_md        = lmv_get_lustre_md,
2838         .m_free_lustre_md       = lmv_free_lustre_md,
2839         .m_set_open_replay_data = lmv_set_open_replay_data,
2840         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2841         .m_renew_capa      = lmv_renew_capa,
2842         .m_unpack_capa    = lmv_unpack_capa,
2843         .m_get_remote_perm      = lmv_get_remote_perm,
2844         .m_intent_getattr_async = lmv_intent_getattr_async,
2845         .m_revalidate_lock      = lmv_revalidate_lock
2846 };
2847
2848 int __init lmv_init(void)
2849 {
2850         struct lprocfs_static_vars lvars;
2851         int                     rc;
2852
2853         lprocfs_lmv_init_vars(&lvars);
2854
2855         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2856                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2857         return rc;
2858 }
2859
2860 static void lmv_exit(void)
2861 {
2862         class_unregister_type(LUSTRE_LMV_NAME);
2863 }
2864
2865 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2866 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2867 MODULE_LICENSE("GPL");
2868
2869 module_init(lmv_init);
2870 module_exit(lmv_exit);