1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <linux/libcfs/libcfs.h>
40
41
42 #include <lustre_dlm.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47
48 #include <lustre_ha.h>
49 #include <lprocfs_status.h>
50 #include <lustre_log.h>
51 #include <lustre_debug.h>
52 #include <lustre_param.h>
53 #include <lustre_fid.h>
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
57 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
58 static int brw_interpret(const struct lu_env *env,
59                          struct ptlrpc_request *req, void *data, int rc);
60 int osc_cleanup(struct obd_device *obd);
61
62 /* Pack OSC object metadata for disk storage (LE byte order). */
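/*
 * Calling convention: with a NULL @lmmp only the required buffer size is
 * returned; with *lmmp set and @lsm NULL the previously packed buffer is
 * freed; otherwise *lmmp is allocated if necessary and the object id from
 * @lsm is copied into it in little-endian order.
 */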
63 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
64                       struct lov_stripe_md *lsm)
65 {
66         int lmm_size;
67
68         lmm_size = sizeof(**lmmp);
69         if (lmmp == NULL)
70                 return lmm_size;
71
72         if (*lmmp != NULL && lsm == NULL) {
73                 OBD_FREE(*lmmp, lmm_size);
74                 *lmmp = NULL;
75                 return 0;
76         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
77                 return -EBADF;
78         }
79
80         if (*lmmp == NULL) {
81                 OBD_ALLOC(*lmmp, lmm_size);
82                 if (*lmmp == NULL)
83                         return -ENOMEM;
84         }
85
86         if (lsm)
87                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
88
89         return lmm_size;
90 }
91
92 /* Unpack OSC object metadata from disk storage (LE byte order). */
93 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
94                         struct lov_mds_md *lmm, int lmm_bytes)
95 {
96         int lsm_size;
97         struct obd_import *imp = class_exp2cliimp(exp);
98
99         if (lmm != NULL) {
100                 if (lmm_bytes < sizeof(*lmm)) {
101                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
102                                exp->exp_obd->obd_name, lmm_bytes,
103                                (int)sizeof(*lmm));
104                         return -EINVAL;
105                 }
106                 /* XXX LOV_MAGIC etc check? */
107
108                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
109                         CERROR("%s: zero lmm_object_id: rc = %d\n",
110                                exp->exp_obd->obd_name, -EINVAL);
111                         return -EINVAL;
112                 }
113         }
114
115         lsm_size = lov_stripe_md_size(1);
116         if (lsmp == NULL)
117                 return lsm_size;
118
119         if (*lsmp != NULL && lmm == NULL) {
120                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
121                 OBD_FREE(*lsmp, lsm_size);
122                 *lsmp = NULL;
123                 return 0;
124         }
125
126         if (*lsmp == NULL) {
127                 OBD_ALLOC(*lsmp, lsm_size);
128                 if (unlikely(*lsmp == NULL))
129                         return -ENOMEM;
130                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
132                         OBD_FREE(*lsmp, lsm_size);
133                         return -ENOMEM;
134                 }
135                 loi_init((*lsmp)->lsm_oinfo[0]);
136         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
137                 return -EBADF;
138         }
139
140         if (lmm != NULL)
141                 /* XXX zero *lsmp? */
142                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
143
144         if (imp != NULL &&
145             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
146                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
147         else
148                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
149
150         return lsm_size;
151 }
152
153 static inline void osc_pack_capa(struct ptlrpc_request *req,
154                                  struct ost_body *body, void *capa)
155 {
156         struct obd_capa *oc = (struct obd_capa *)capa;
157         struct lustre_capa *c;
158
159         if (!capa)
160                 return;
161
162         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
163         LASSERT(c);
164         capa_cpy(c, oc);
165         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
166         DEBUG_CAPA(D_SEC, c, "pack");
167 }
168
169 static inline void osc_pack_req_body(struct ptlrpc_request *req,
170                                      struct obd_info *oinfo)
171 {
172         struct ost_body *body;
173
174         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
175         LASSERT(body);
176
177         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
178                              oinfo->oi_oa);
179         osc_pack_capa(req, body, oinfo->oi_capa);
180 }
181
182 static inline void osc_set_capa_size(struct ptlrpc_request *req,
183                                      const struct req_msg_field *field,
184                                      struct obd_capa *oc)
185 {
186         if (oc == NULL)
187                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
188         else
189                 /* it is already calculated as sizeof struct obd_capa */
190                 ;
191 }
192
193 static int osc_getattr_interpret(const struct lu_env *env,
194                                  struct ptlrpc_request *req,
195                                  struct osc_async_args *aa, int rc)
196 {
197         struct ost_body *body;
198
199         if (rc != 0)
200                 GOTO(out, rc);
201
202         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
203         if (body) {
204                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
205                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
206                                      aa->aa_oi->oi_oa, &body->oa);
207
208                 /* This should really be sent by the OST */
209                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
210                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
211         } else {
212                 CDEBUG(D_INFO, "can't unpack ost_body\n");
213                 rc = -EPROTO;
214                 aa->aa_oi->oi_oa->o_valid = 0;
215         }
216 out:
217         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
218         return rc;
219 }
220
221 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
222                              struct ptlrpc_request_set *set)
223 {
224         struct ptlrpc_request *req;
225         struct osc_async_args *aa;
226         int                 rc;
227
228         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
229         if (req == NULL)
230                 return -ENOMEM;
231
232         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
233         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
234         if (rc) {
235                 ptlrpc_request_free(req);
236                 return rc;
237         }
238
239         osc_pack_req_body(req, oinfo);
240
241         ptlrpc_request_set_replen(req);
242         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
243
244         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
245         aa = ptlrpc_req_async_args(req);
246         aa->aa_oi = oinfo;
247
248         ptlrpc_set_add_req(set, req);
249         return 0;
250 }
251
252 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
253                        struct obd_info *oinfo)
254 {
255         struct ptlrpc_request *req;
256         struct ost_body       *body;
257         int                 rc;
258
259         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
260         if (req == NULL)
261                 return -ENOMEM;
262
263         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
264         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
265         if (rc) {
266                 ptlrpc_request_free(req);
267                 return rc;
268         }
269
270         osc_pack_req_body(req, oinfo);
271
272         ptlrpc_request_set_replen(req);
273
274         rc = ptlrpc_queue_wait(req);
275         if (rc)
276                 GOTO(out, rc);
277
278         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
279         if (body == NULL)
280                 GOTO(out, rc = -EPROTO);
281
282         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
283         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
284                              &body->oa);
285
286         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
287         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
288
289  out:
290         ptlrpc_req_finished(req);
291         return rc;
292 }
293
294 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
295                        struct obd_info *oinfo, struct obd_trans_info *oti)
296 {
297         struct ptlrpc_request *req;
298         struct ost_body       *body;
299         int                 rc;
300
301         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
302
303         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
304         if (req == NULL)
305                 return -ENOMEM;
306
307         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
308         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
309         if (rc) {
310                 ptlrpc_request_free(req);
311                 return rc;
312         }
313
314         osc_pack_req_body(req, oinfo);
315
316         ptlrpc_request_set_replen(req);
317
318         rc = ptlrpc_queue_wait(req);
319         if (rc)
320                 GOTO(out, rc);
321
322         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
323         if (body == NULL)
324                 GOTO(out, rc = -EPROTO);
325
326         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
327                              &body->oa);
328
329 out:
330         ptlrpc_req_finished(req);
331         return rc;
332 }
333
334 static int osc_setattr_interpret(const struct lu_env *env,
335                                  struct ptlrpc_request *req,
336                                  struct osc_setattr_args *sa, int rc)
337 {
338         struct ost_body *body;
339
340         if (rc != 0)
341                 GOTO(out, rc);
342
343         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344         if (body == NULL)
345                 GOTO(out, rc = -EPROTO);
346
347         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
348                              &body->oa);
349 out:
350         rc = sa->sa_upcall(sa->sa_cookie, rc);
351         return rc;
352 }
353
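/*
 * Generic async setattr.  With a NULL @rqset the request is simply handed
 * to ptlrpcd and the reply is ignored; otherwise osc_setattr_interpret()
 * refreshes @oinfo->oi_oa from the reply and calls @upcall with @cookie,
 * and the request goes either to ptlrpcd (PTLRPCD_SET) or to the caller's
 * request set.  A typical caller (sketch, not taken from this file):
 *
 *      rc = osc_setattr_async_base(exp, &oinfo, NULL, my_upcall, my_cookie,
 *                                  PTLRPCD_SET);
 *
 * with the final result delivered through my_upcall(my_cookie, rc).
 */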
354 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
355                            struct obd_trans_info *oti,
356                            obd_enqueue_update_f upcall, void *cookie,
357                            struct ptlrpc_request_set *rqset)
358 {
359         struct ptlrpc_request   *req;
360         struct osc_setattr_args *sa;
361         int                   rc;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
364         if (req == NULL)
365                 return -ENOMEM;
366
367         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
368         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
369         if (rc) {
370                 ptlrpc_request_free(req);
371                 return rc;
372         }
373
374         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
375                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
376
377         osc_pack_req_body(req, oinfo);
378
379         ptlrpc_request_set_replen(req);
380
381         /* do mds to ost setattr asynchronously */
382         if (!rqset) {
383                 /* Do not wait for response. */
384                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
385         } else {
386                 req->rq_interpret_reply =
387                         (ptlrpc_interpterer_t)osc_setattr_interpret;
388
389                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
390                 sa = ptlrpc_req_async_args(req);
391                 sa->sa_oa = oinfo->oi_oa;
392                 sa->sa_upcall = upcall;
393                 sa->sa_cookie = cookie;
394
395                 if (rqset == PTLRPCD_SET)
396                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
397                 else
398                         ptlrpc_set_add_req(rqset, req);
399         }
400
401         return 0;
402 }
403
404 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
405                              struct obd_trans_info *oti,
406                              struct ptlrpc_request_set *rqset)
407 {
408         return osc_setattr_async_base(exp, oinfo, oti,
409                                       oinfo->oi_cb_up, oinfo, rqset);
410 }
411
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
414 {
415         struct ptlrpc_request *req;
416         struct ost_body       *body;
417         struct lov_stripe_md  *lsm;
418         int                 rc;
419
420         LASSERT(oa);
421         LASSERT(ea);
422
423         lsm = *ea;
424         if (!lsm) {
425                 rc = obd_alloc_memmd(exp, &lsm);
426                 if (rc < 0)
427                         return rc;
428         }
429
430         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
431         if (req == NULL)
432                 GOTO(out, rc = -ENOMEM);
433
434         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
435         if (rc) {
436                 ptlrpc_request_free(req);
437                 GOTO(out, rc);
438         }
439
440         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441         LASSERT(body);
442
443         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
444
445         ptlrpc_request_set_replen(req);
446
447         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448             oa->o_flags == OBD_FL_DELORPHAN) {
449                 DEBUG_REQ(D_HA, req,
450                           "delorphan from OST integration");
451                 /* Don't resend the delorphan req */
452                 req->rq_no_resend = req->rq_no_delay = 1;
453         }
454
455         rc = ptlrpc_queue_wait(req);
456         if (rc)
457                 GOTO(out_req, rc);
458
459         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
460         if (body == NULL)
461                 GOTO(out_req, rc = -EPROTO);
462
463         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
464         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
465
466         oa->o_blksize = cli_brw_size(exp->exp_obd);
467         oa->o_valid |= OBD_MD_FLBLKSZ;
468
469         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470          * have valid lsm_oinfo data structs, so don't go touching that.
471          * This needs to be fixed in a big way.
472          */
473         lsm->lsm_oi = oa->o_oi;
474         *ea = lsm;
475
476         if (oti != NULL) {
477                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
478
479                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
480                         if (!oti->oti_logcookies)
481                                 oti_alloc_cookies(oti, 1);
482                         *oti->oti_logcookies = oa->o_lcookie;
483                 }
484         }
485
486         CDEBUG(D_HA, "transno: "LPD64"\n",
487                lustre_msg_get_transno(req->rq_repmsg));
488 out_req:
489         ptlrpc_req_finished(req);
490 out:
491         if (rc && !*ea)
492                 obd_free_memmd(exp, &lsm);
493         return rc;
494 }
495
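/*
 * Send an OST_PUNCH (truncate) request.  The extent to punch is already
 * encoded in oi_oa->o_size/o_blocks by the caller (see osc_punch() below);
 * completion is reported through @upcall/@cookie via osc_setattr_interpret().
 */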
496 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
497                    obd_enqueue_update_f upcall, void *cookie,
498                    struct ptlrpc_request_set *rqset)
499 {
500         struct ptlrpc_request   *req;
501         struct osc_setattr_args *sa;
502         struct ost_body  *body;
503         int                   rc;
504
505         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
506         if (req == NULL)
507                 return -ENOMEM;
508
509         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
510         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
511         if (rc) {
512                 ptlrpc_request_free(req);
513                 return rc;
514         }
515         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
516         ptlrpc_at_set_req_timeout(req);
517
518         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
519         LASSERT(body);
520         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
521                              oinfo->oi_oa);
522         osc_pack_capa(req, body, oinfo->oi_capa);
523
524         ptlrpc_request_set_replen(req);
525
526         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
527         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
528         sa = ptlrpc_req_async_args(req);
529         sa->sa_oa     = oinfo->oi_oa;
530         sa->sa_upcall = upcall;
531         sa->sa_cookie = cookie;
532         if (rqset == PTLRPCD_SET)
533                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
534         else
535                 ptlrpc_set_add_req(rqset, req);
536
537         return 0;
538 }
539
540 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
541                      struct obd_info *oinfo, struct obd_trans_info *oti,
542                      struct ptlrpc_request_set *rqset)
543 {
544         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
545         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
546         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
547         return osc_punch_base(exp, oinfo,
548                               oinfo->oi_cb_up, oinfo, rqset);
549 }
550
551 static int osc_sync_interpret(const struct lu_env *env,
552                               struct ptlrpc_request *req,
553                               void *arg, int rc)
554 {
555         struct osc_fsync_args *fa = arg;
556         struct ost_body *body;
557
558         if (rc)
559                 GOTO(out, rc);
560
561         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
562         if (body == NULL) {
563                 CERROR("can't unpack ost_body\n");
564                 GOTO(out, rc = -EPROTO);
565         }
566
567         *fa->fa_oi->oi_oa = body->oa;
568 out:
569         rc = fa->fa_upcall(fa->fa_cookie, rc);
570         return rc;
571 }
572
573 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
574                   obd_enqueue_update_f upcall, void *cookie,
575                   struct ptlrpc_request_set *rqset)
576 {
577         struct ptlrpc_request *req;
578         struct ost_body       *body;
579         struct osc_fsync_args *fa;
580         int                 rc;
581
582         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
583         if (req == NULL)
584                 return -ENOMEM;
585
586         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
587         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588         if (rc) {
589                 ptlrpc_request_free(req);
590                 return rc;
591         }
592
593         /* overload the size and blocks fields in the oa with start/end */
594         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595         LASSERT(body);
596         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
597                              oinfo->oi_oa);
598         osc_pack_capa(req, body, oinfo->oi_capa);
599
600         ptlrpc_request_set_replen(req);
601         req->rq_interpret_reply = osc_sync_interpret;
602
603         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
604         fa = ptlrpc_req_async_args(req);
605         fa->fa_oi = oinfo;
606         fa->fa_upcall = upcall;
607         fa->fa_cookie = cookie;
608
609         if (rqset == PTLRPCD_SET)
610                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
611         else
612                 ptlrpc_set_add_req(rqset, req);
613
614         return 0;
615 }
616
617 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
618                     struct obd_info *oinfo, obd_size start, obd_size end,
619                     struct ptlrpc_request_set *set)
620 {
621         if (!oinfo->oi_oa) {
622                 CDEBUG(D_INFO, "oa NULL\n");
623                 return -EINVAL;
624         }
625
626         oinfo->oi_oa->o_size = start;
627         oinfo->oi_oa->o_blocks = end;
628         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
629
630         return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
631 }
632
633 /* Find and locally cancel locks matching @mode in the resource derived from
634  * @oa. Found locks are added to the @cancels list. Returns the number of
635  * locks added to the @cancels list. */
636 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
637                                    struct list_head *cancels,
638                                    ldlm_mode_t mode, int lock_flags)
639 {
640         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
641         struct ldlm_res_id res_id;
642         struct ldlm_resource *res;
643         int count;
644
645         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
646          * export) but disabled through procfs (flag in NS).
647          *
648          * This distinguishes it from the case when ELC is not supported at all,
649          * where we still want to cancel locks in advance and just cancel them
650          * locally, without sending any RPC. */
651         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
652                 return 0;
653
654         ostid_build_res_name(&oa->o_oi, &res_id);
655         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
656         if (res == NULL)
657                 return 0;
658
659         LDLM_RESOURCE_ADDREF(res);
660         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
661                                            lock_flags, 0, NULL);
662         LDLM_RESOURCE_DELREF(res);
663         ldlm_resource_putref(res);
664         return count;
665 }
666
667 static int osc_destroy_interpret(const struct lu_env *env,
668                                  struct ptlrpc_request *req, void *data,
669                                  int rc)
670 {
671         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672
673         atomic_dec(&cli->cl_destroy_in_flight);
674         wake_up(&cli->cl_destroy_waitq);
675         return 0;
676 }
677
678 static int osc_can_send_destroy(struct client_obd *cli)
679 {
680         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
681             cli->cl_max_rpcs_in_flight) {
682                 /* The destroy request can be sent */
683                 return 1;
684         }
685         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
686             cli->cl_max_rpcs_in_flight) {
687                 /*
688                  * The counter has been modified between the two atomic
689                  * operations.
690                  */
691                 wake_up(&cli->cl_destroy_waitq);
692         }
693         return 0;
694 }
695
696 int osc_create(const struct lu_env *env, struct obd_export *exp,
697                struct obdo *oa, struct lov_stripe_md **ea,
698                struct obd_trans_info *oti)
699 {
700         int rc = 0;
701
702         LASSERT(oa);
703         LASSERT(ea);
704         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
705
706         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
707             oa->o_flags == OBD_FL_RECREATE_OBJS) {
708                 return osc_real_create(exp, oa, ea, oti);
709         }
710
711         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
712                 return osc_real_create(exp, oa, ea, oti);
713
714         /* we should not get here anymore */
715         LBUG();
716
717         return rc;
718 }
719
720 /* Destroy requests can always be async on the client, and we don't even really
721  * care about the return code since the client cannot do anything at all about
722  * a destroy failure.
723  * When the MDS is unlinking a filename, it saves the file objects into a
724  * recovery llog, and these object records are cancelled when the OST reports
725  * they were destroyed and sync'd to disk (i.e. transaction committed).
726  * If the client dies, or the OST is down when the object should be destroyed,
727  * the records are not cancelled, and when the OST reconnects to the MDS next,
728  * it will retrieve the llog unlink logs and then send the log cancellation
729  * cookies to the MDS after committing destroy transactions. */
730 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
731                        struct obdo *oa, struct lov_stripe_md *ea,
732                        struct obd_trans_info *oti, struct obd_export *md_export,
733                        void *capa)
734 {
735         struct client_obd     *cli = &exp->exp_obd->u.cli;
736         struct ptlrpc_request *req;
737         struct ost_body       *body;
738         LIST_HEAD(cancels);
739         int rc, count;
740
741         if (!oa) {
742                 CDEBUG(D_INFO, "oa NULL\n");
743                 return -EINVAL;
744         }
745
746         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
747                                         LDLM_FL_DISCARD_DATA);
748
749         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
750         if (req == NULL) {
751                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
752                 return -ENOMEM;
753         }
754
755         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
756         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
757                                0, &cancels, count);
758         if (rc) {
759                 ptlrpc_request_free(req);
760                 return rc;
761         }
762
763         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
764         ptlrpc_at_set_req_timeout(req);
765
766         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
767                 oa->o_lcookie = *oti->oti_logcookies;
768         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
769         LASSERT(body);
770         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
771
772         osc_pack_capa(req, body, (struct obd_capa *)capa);
773         ptlrpc_request_set_replen(req);
774
775         /* If osc_destroy is for destroying an unlink orphan sent from the MDT
776          * to the OST, it must not be blocked here, because the request may be
777          * issued by a ptlrpcd thread, and it is not good to block a ptlrpcd
778          * thread (b=16006). */
779         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
780                 req->rq_interpret_reply = osc_destroy_interpret;
781                 if (!osc_can_send_destroy(cli)) {
782                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
783                                                           NULL);
784
785                         /*
786                          * Wait until the number of on-going destroy RPCs drops
787                          * under max_rpc_in_flight
788                          */
789                         l_wait_event_exclusive(cli->cl_destroy_waitq,
790                                                osc_can_send_destroy(cli), &lwi);
791                 }
792         }
793
794         /* Do not wait for response */
795         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
796         return 0;
797 }
798
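/*
 * Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client_obd counters, under cl_loi_list_lock,
 * so that BRW and grant-shrink requests report the client's cache and grant
 * state back to the OST.
 */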
799 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
800                                 long writing_bytes)
801 {
802         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
803
804         LASSERT(!(oa->o_valid & bits));
805
806         oa->o_valid |= bits;
807         client_obd_list_lock(&cli->cl_loi_list_lock);
808         oa->o_dirty = cli->cl_dirty;
809         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
810                      cli->cl_dirty_max)) {
811                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
812                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
813                 oa->o_undirty = 0;
814         } else if (unlikely(atomic_read(&obd_dirty_pages) -
815                             atomic_read(&obd_dirty_transit_pages) >
816                             (long)(obd_max_dirty_pages + 1))) {
817                  /* The atomic_read() and the atomic_inc() are not covered
818                  * by a lock, thus they may safely race and trip this
819                  * CERROR() unless we add in a small fudge factor (+1). */
820                 CERROR("dirty %d - %d > system dirty_max %d\n",
821                        atomic_read(&obd_dirty_pages),
822                        atomic_read(&obd_dirty_transit_pages),
823                        obd_max_dirty_pages);
824                 oa->o_undirty = 0;
825         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
826                 CERROR("dirty %lu - dirty_max %lu too big???\n",
827                        cli->cl_dirty, cli->cl_dirty_max);
828                 oa->o_undirty = 0;
829         } else {
830                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
831                                       PAGE_CACHE_SHIFT)*
832                                      (cli->cl_max_rpcs_in_flight + 1);
833                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
834         }
835         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
836         oa->o_dropped = cli->cl_lost_grant;
837         cli->cl_lost_grant = 0;
838         client_obd_list_unlock(&cli->cl_loi_list_lock);
839         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
840                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
841
842 }
843
844 void osc_update_next_shrink(struct client_obd *cli)
845 {
846         cli->cl_next_shrink_grant =
847                 cfs_time_shift(cli->cl_grant_shrink_interval);
848         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
849                cli->cl_next_shrink_grant);
850 }
851
852 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
853 {
854         client_obd_list_lock(&cli->cl_loi_list_lock);
855         cli->cl_avail_grant += grant;
856         client_obd_list_unlock(&cli->cl_loi_list_lock);
857 }
858
859 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
860 {
861         if (body->oa.o_valid & OBD_MD_FLGRANT) {
862                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
863                 __osc_update_grant(cli, body->oa.o_grant);
864         }
865 }
866
867 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
868                               obd_count keylen, void *key, obd_count vallen,
869                               void *val, struct ptlrpc_request_set *set);
870
871 static int osc_shrink_grant_interpret(const struct lu_env *env,
872                                       struct ptlrpc_request *req,
873                                       void *aa, int rc)
874 {
875         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
876         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
877         struct ost_body *body;
878
879         if (rc != 0) {
880                 __osc_update_grant(cli, oa->o_grant);
881                 GOTO(out, rc);
882         }
883
884         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
885         LASSERT(body);
886         osc_update_grant(cli, body);
887 out:
888         OBDO_FREE(oa);
889         return rc;
890 }
891
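/*
 * Give a quarter of the currently available grant back to the OST by
 * piggybacking it on an outgoing BRW request: the amount is stored in
 * oa->o_grant and flagged with OBD_FL_SHRINK_GRANT.
 */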
892 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
893 {
894         client_obd_list_lock(&cli->cl_loi_list_lock);
895         oa->o_grant = cli->cl_avail_grant / 4;
896         cli->cl_avail_grant -= oa->o_grant;
897         client_obd_list_unlock(&cli->cl_loi_list_lock);
898         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
899                 oa->o_valid |= OBD_MD_FLFLAGS;
900                 oa->o_flags = 0;
901         }
902         oa->o_flags |= OBD_FL_SHRINK_GRANT;
903         osc_update_next_shrink(cli);
904 }
905
906 /* Shrink the current grant, either from some large amount to enough for a
907  * full set of in-flight RPCs, or if we have already shrunk to that limit
908  * then to enough for a single RPC.  This avoids keeping more grant than
909  * needed, and avoids shrinking the grant piecemeal. */
910 static int osc_shrink_grant(struct client_obd *cli)
911 {
912         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
913                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
914
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         if (cli->cl_avail_grant <= target_bytes)
917                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
918         client_obd_list_unlock(&cli->cl_loi_list_lock);
919
920         return osc_shrink_grant_to_target(cli, target_bytes);
921 }
922
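/*
 * Shrink the available grant to @target_bytes (never below one RPC worth)
 * and return the surplus to the OST through a KEY_GRANT_SHRINK set_info
 * RPC carrying an ost_body with OBD_FL_SHRINK_GRANT set.
 */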
923 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
924 {
925         int                     rc = 0;
926         struct ost_body *body;
927
928         client_obd_list_lock(&cli->cl_loi_list_lock);
929         /* Don't shrink if we are already above or below the desired limit.
930          * We don't want to shrink below a single RPC, as that will negatively
931          * impact block allocation and long-term performance. */
932         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
933                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
934
935         if (target_bytes >= cli->cl_avail_grant) {
936                 client_obd_list_unlock(&cli->cl_loi_list_lock);
937                 return 0;
938         }
939         client_obd_list_unlock(&cli->cl_loi_list_lock);
940
941         OBD_ALLOC_PTR(body);
942         if (!body)
943                 return -ENOMEM;
944
945         osc_announce_cached(cli, &body->oa, 0);
946
947         client_obd_list_lock(&cli->cl_loi_list_lock);
948         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
949         cli->cl_avail_grant = target_bytes;
950         client_obd_list_unlock(&cli->cl_loi_list_lock);
951         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
952                 body->oa.o_valid |= OBD_MD_FLFLAGS;
953                 body->oa.o_flags = 0;
954         }
955         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
956         osc_update_next_shrink(cli);
957
958         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
959                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
960                                 sizeof(*body), body, NULL);
961         if (rc != 0)
962                 __osc_update_grant(cli, body->oa.o_grant);
963         OBD_FREE_PTR(body);
964         return rc;
965 }
966
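/*
 * Decide whether it is time to shrink grant: only when the server supports
 * OBD_CONNECT_GRANT_SHRINK, the shrink interval has (nearly) expired, the
 * import is FULL and we hold more grant than a single RPC needs.
 */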
967 static int osc_should_shrink_grant(struct client_obd *client)
968 {
969         cfs_time_t time = cfs_time_current();
970         cfs_time_t next_shrink = client->cl_next_shrink_grant;
971
972         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
973              OBD_CONNECT_GRANT_SHRINK) == 0)
974                 return 0;
975
976         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
977                 /* Get the current RPC size directly, instead of going via:
978                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
979                  * Keep comment here so that it can be found by searching. */
980                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
981
982                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
983                     client->cl_avail_grant > brw_size)
984                         return 1;
985                 else
986                         osc_update_next_shrink(client);
987         }
988         return 0;
989 }
990
991 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
992 {
993         struct client_obd *client;
994
995         list_for_each_entry(client, &item->ti_obd_list,
996                                 cl_grant_shrink_list) {
997                 if (osc_should_shrink_grant(client))
998                         osc_shrink_grant(client);
999         }
1000         return 0;
1001 }
1002
1003 static int osc_add_shrink_grant(struct client_obd *client)
1004 {
1005         int rc;
1006
1007         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1008                                        TIMEOUT_GRANT,
1009                                        osc_grant_shrink_grant_cb, NULL,
1010                                        &client->cl_grant_shrink_list);
1011         if (rc) {
1012                 CERROR("add grant client %s error %d\n",
1013                         client->cl_import->imp_obd->obd_name, rc);
1014                 return rc;
1015         }
1016         CDEBUG(D_CACHE, "add grant client %s \n",
1017                client->cl_import->imp_obd->obd_name);
1018         osc_update_next_shrink(client);
1019         return 0;
1020 }
1021
1022 static int osc_del_shrink_grant(struct client_obd *client)
1023 {
1024         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1025                                          TIMEOUT_GRANT);
1026 }
1027
1028 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1029 {
1030         /*
1031          * ocd_grant is the total grant amount we expect to hold: if we've
1032          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1033          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1034          *
1035          * race is tolerable here: if we're evicted, but imp_state already
1036          * left EVICTED state, then cl_dirty must be 0 already.
1037          */
1038         client_obd_list_lock(&cli->cl_loi_list_lock);
1039         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1040                 cli->cl_avail_grant = ocd->ocd_grant;
1041         else
1042                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1043
1044         if (cli->cl_avail_grant < 0) {
1045                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1047                       ocd->ocd_grant, cli->cl_dirty);
1048                 /* workaround for servers which do not have the patch from
1049                  * LU-2679 */
1050                 cli->cl_avail_grant = ocd->ocd_grant;
1051         }
1052
1053         /* determine the appropriate chunk size used by osc_extent. */
1054         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1055         client_obd_list_unlock(&cli->cl_loi_list_lock);
1056
1057         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1058                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1059                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1060
1061         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1062             list_empty(&cli->cl_grant_shrink_list))
1063                 osc_add_shrink_grant(cli);
1064 }
1065
1066 /* We assume that the reason this OSC got a short read is that it read
1067  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1068  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1069  * this stripe never got written at or beyond this stripe offset yet. */
1070 static void handle_short_read(int nob_read, obd_count page_count,
1071                               struct brw_page **pga)
1072 {
1073         char *ptr;
1074         int i = 0;
1075
1076         /* skip bytes read OK */
1077         while (nob_read > 0) {
1078                 LASSERT(page_count > 0);
1079
1080                 if (pga[i]->count > nob_read) {
1081                         /* EOF inside this page */
1082                         ptr = kmap(pga[i]->pg) +
1083                                 (pga[i]->off & ~CFS_PAGE_MASK);
1084                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1085                         kunmap(pga[i]->pg);
1086                         page_count--;
1087                         i++;
1088                         break;
1089                 }
1090
1091                 nob_read -= pga[i]->count;
1092                 page_count--;
1093                 i++;
1094         }
1095
1096         /* zero remaining pages */
1097         while (page_count-- > 0) {
1098                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099                 memset(ptr, 0, pga[i]->count);
1100                 kunmap(pga[i]->pg);
1101                 i++;
1102         }
1103 }
1104
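/*
 * Sanity-check a BRW_WRITE reply: a negative per-niobuf return code is
 * handed back to the caller, any other non-zero value is a protocol error,
 * and the bulk must have transferred exactly the requested byte count.
 */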
1105 static int check_write_rcs(struct ptlrpc_request *req,
1106                            int requested_nob, int niocount,
1107                            obd_count page_count, struct brw_page **pga)
1108 {
1109         int     i;
1110         __u32   *remote_rcs;
1111
1112         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113                                                   sizeof(*remote_rcs) *
1114                                                   niocount);
1115         if (remote_rcs == NULL) {
1116                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1117                 return(-EPROTO);
1118         }
1119
1120         /* return error if any niobuf was in error */
1121         for (i = 0; i < niocount; i++) {
1122                 if ((int)remote_rcs[i] < 0)
1123                         return(remote_rcs[i]);
1124
1125                 if (remote_rcs[i] != 0) {
1126                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127                                 i, remote_rcs[i], req);
1128                         return(-EPROTO);
1129                 }
1130         }
1131
1132         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134                        req->rq_bulk->bd_nob_transferred, requested_nob);
1135                 return(-EPROTO);
1136         }
1137
1138         return (0);
1139 }
1140
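/*
 * Two brw_pages can share one remote niobuf only if their flags are
 * identical (a mismatch in anything beyond the known-safe grant/cache/sync
 * bits also triggers a warning) and the second page starts exactly where
 * the first one ends in the file.
 */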
1141 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1142 {
1143         if (p1->flag != p2->flag) {
1144                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1145                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1146
1147                 /* warn if we try to combine flags that we don't know to be
1148                  * safe to combine */
1149                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1150                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1151                               "report this at http://bugs.whamcloud.com/\n",
1152                               p1->flag, p2->flag);
1153                 }
1154                 return 0;
1155         }
1156
1157         return (p1->off + p1->count == p2->off);
1158 }
1159
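/*
 * Checksum the first @nob bytes of the page array with the algorithm
 * selected by @cksum_type (via the libcfs crypto hash API).  The OBD_FAIL
 * hooks deliberately corrupt read data or bump the write checksum so that
 * tests can exercise the checksum-mismatch paths.
 */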
1160 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1161                                    struct brw_page **pga, int opc,
1162                                    cksum_type_t cksum_type)
1163 {
1164         __u32                           cksum;
1165         int                             i = 0;
1166         struct cfs_crypto_hash_desc     *hdesc;
1167         unsigned int                    bufsize;
1168         int                             err;
1169         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1170
1171         LASSERT(pg_count > 0);
1172
1173         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1174         if (IS_ERR(hdesc)) {
1175                 CERROR("Unable to initialize checksum hash %s\n",
1176                        cfs_crypto_hash_name(cfs_alg));
1177                 return PTR_ERR(hdesc);
1178         }
1179
1180         while (nob > 0 && pg_count > 0) {
1181                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1182
1183                 /* corrupt the data before we compute the checksum, to
1184                  * simulate an OST->client data error */
1185                 if (i == 0 && opc == OST_READ &&
1186                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1187                         unsigned char *ptr = kmap(pga[i]->pg);
1188                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1189                         memcpy(ptr + off, "bad1", min(4, nob));
1190                         kunmap(pga[i]->pg);
1191                 }
1192                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1193                                   pga[i]->off & ~CFS_PAGE_MASK,
1194                                   count);
1195                 CDEBUG(D_PAGE,
1196                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1197                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1198                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1199                        page_private(pga[i]->pg),
1200                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1201
1202                 nob -= pga[i]->count;
1203                 pg_count--;
1204                 i++;
1205         }
1206
1207         bufsize = 4;
1208         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1209
1210         if (err)
1211                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1212
1213         /* For sending we only compute the wrong checksum instead
1214          * of corrupting the data so it is still correct on a redo */
1215         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216                 cksum++;
1217
1218         return cksum;
1219 }
1220
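/*
 * Build an OST_BRW read or write request for @page_count pages: contiguous
 * pages accepted by can_merge_pages() are collapsed into a single remote
 * niobuf, the bulk descriptor is attached, cache/grant state is announced
 * via osc_announce_cached(), and, when checksums are enabled, a client-side
 * checksum is computed for writes (or requested from the server for reads).
 * On success *@reqp holds the prepared, not yet sent, request.
 */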
1221 static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1222                                 struct lov_stripe_md *lsm, obd_count page_count,
1223                                 struct brw_page **pga,
1224                                 struct ptlrpc_request **reqp,
1225                                 struct obd_capa *ocapa, int reserve,
1226                                 int resend)
1227 {
1228         struct ptlrpc_request   *req;
1229         struct ptlrpc_bulk_desc *desc;
1230         struct ost_body  *body;
1231         struct obd_ioobj        *ioobj;
1232         struct niobuf_remote    *niobuf;
1233         int niocount, i, requested_nob, opc, rc;
1234         struct osc_brw_async_args *aa;
1235         struct req_capsule      *pill;
1236         struct brw_page *pg_prev;
1237
1238         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1239                 return -ENOMEM; /* Recoverable */
1240         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1241                 return -EINVAL; /* Fatal */
1242
1243         if ((cmd & OBD_BRW_WRITE) != 0) {
1244                 opc = OST_WRITE;
1245                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1246                                                 cli->cl_import->imp_rq_pool,
1247                                                 &RQF_OST_BRW_WRITE);
1248         } else {
1249                 opc = OST_READ;
1250                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1251         }
1252         if (req == NULL)
1253                 return -ENOMEM;
1254
1255         for (niocount = i = 1; i < page_count; i++) {
1256                 if (!can_merge_pages(pga[i - 1], pga[i]))
1257                         niocount++;
1258         }
1259
1260         pill = &req->rq_pill;
1261         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1262                              sizeof(*ioobj));
1263         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1264                              niocount * sizeof(*niobuf));
1265         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1266
1267         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1268         if (rc) {
1269                 ptlrpc_request_free(req);
1270                 return rc;
1271         }
1272         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1273         ptlrpc_at_set_req_timeout(req);
1274         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1275          * retry logic */
1276         req->rq_no_retry_einprogress = 1;
1277
1278         desc = ptlrpc_prep_bulk_imp(req, page_count,
1279                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1280                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1281                 OST_BULK_PORTAL);
1282
1283         if (desc == NULL)
1284                 GOTO(out, rc = -ENOMEM);
1285         /* NB request now owns desc and will free it when it gets freed */
1286
1287         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1288         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1289         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1290         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1291
1292         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1293
1294         obdo_to_ioobj(oa, ioobj);
1295         ioobj->ioo_bufcnt = niocount;
1296         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1297          * bulks that might be sent for this request.  The actual number is decided
1298          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1299          * "max - 1" for old client compatibility sending "0", and also so that
1300          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1301         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1302         osc_pack_capa(req, body, ocapa);
1303         LASSERT(page_count > 0);
1304         pg_prev = pga[0];
1305         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1306                 struct brw_page *pg = pga[i];
1307                 int poff = pg->off & ~CFS_PAGE_MASK;
1308
1309                 LASSERT(pg->count > 0);
1310                 /* make sure there is no gap in the middle of page array */
1311                 LASSERTF(page_count == 1 ||
1312                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1313                           ergo(i > 0 && i < page_count - 1,
1314                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1315                           ergo(i == page_count - 1, poff == 0)),
1316                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1317                          i, page_count, pg, pg->off, pg->count);
1318                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1319                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1320                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1321                          i, page_count,
1322                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1323                          pg_prev->pg, page_private(pg_prev->pg),
1324                          pg_prev->pg->index, pg_prev->off);
1325                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1326                         (pg->flag & OBD_BRW_SRVLOCK));
1327
1328                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1329                 requested_nob += pg->count;
1330
1331                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1332                         niobuf--;
1333                         niobuf->len += pg->count;
1334                 } else {
1335                         niobuf->offset = pg->off;
1336                         niobuf->len    = pg->count;
1337                         niobuf->flags  = pg->flag;
1338                 }
1339                 pg_prev = pg;
1340         }
1341
1342         LASSERTF((void *)(niobuf - niocount) ==
1343                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1344                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1345                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1346
1347         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1348         if (resend) {
1349                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1351                         body->oa.o_flags = 0;
1352                 }
1353                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1354         }
1355
1356         if (osc_should_shrink_grant(cli))
1357                 osc_shrink_grant_local(cli, &body->oa);
1358
1359         /* size[REQ_REC_OFF] still sizeof (*body) */
1360         if (opc == OST_WRITE) {
1361                 if (cli->cl_checksum &&
1362                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1363                         /* store cl_cksum_type in a local variable since
1364                          * it can be changed via lprocfs */
1365                         cksum_type_t cksum_type = cli->cl_cksum_type;
1366
1367                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1369                                 body->oa.o_flags = 0;
1370                         }
1371                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1372                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1373                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1374                                                              page_count, pga,
1375                                                              OST_WRITE,
1376                                                              cksum_type);
1377                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1378                                body->oa.o_cksum);
1379                         /* save this in 'oa', too, for later checking */
1380                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381                         oa->o_flags |= cksum_type_pack(cksum_type);
1382                 } else {
1383                         /* clear out the checksum flag, in case this is a
1384                          * resend but cl_checksum is no longer set. b=11238 */
1385                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1386                 }
1387                 oa->o_cksum = body->oa.o_cksum;
1388                 /* 1 RC per niobuf */
1389                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1390                                      sizeof(__u32) * niocount);
1391         } else {
1392                 if (cli->cl_checksum &&
1393                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1395                                 body->oa.o_flags = 0;
1396                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1397                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                 }
1399         }
1400         ptlrpc_request_set_replen(req);
1401
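        /* Stash the BRW async arguments in the space embedded in the request
         * itself; the CLASSERT below guarantees they fit. */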
1402         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1403         aa = ptlrpc_req_async_args(req);
1404         aa->aa_oa = oa;
1405         aa->aa_requested_nob = requested_nob;
1406         aa->aa_nio_count = niocount;
1407         aa->aa_page_count = page_count;
1408         aa->aa_resends = 0;
1409         aa->aa_ppga = pga;
1410         aa->aa_cli = cli;
1411         INIT_LIST_HEAD(&aa->aa_oaps);
1412         if (ocapa && reserve)
1413                 aa->aa_ocapa = capa_get(ocapa);
1414
1415         *reqp = req;
1416         return 0;
1417
1418  out:
1419         ptlrpc_req_finished(req);
1420         return rc;
1421 }
1422
1423 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424                                 __u32 client_cksum, __u32 server_cksum, int nob,
1425                                 obd_count page_count, struct brw_page **pga,
1426                                 cksum_type_t client_cksum_type)
1427 {
1428         __u32 new_cksum;
1429         char *msg;
1430         cksum_type_t cksum_type;
1431
1432         if (server_cksum == client_cksum) {
1433                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434                 return 0;
1435         }
1436
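        /* Recompute the checksum over the same pages on the client to work
         * out where the data changed: before we checksummed it, in transit,
         * or on the server side. */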
1437         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438                                        oa->o_flags : 0);
1439         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440                                       cksum_type);
1441
1442         if (cksum_type != client_cksum_type)
1443                 msg = "the server did not use the checksum type specified in "
1444                       "the original request - likely a protocol problem";
1445         else if (new_cksum == server_cksum)
1446                 msg = "changed on the client after we checksummed it - "
1447                       "likely false positive due to mmap IO (bug 11742)";
1448         else if (new_cksum == client_cksum)
1449                 msg = "changed in transit before arrival at OST";
1450         else
1451                 msg = "changed in transit AND doesn't match the original - "
1452                       "likely false positive due to mmap IO (bug 11742)";
1453
1454         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1456                            msg, libcfs_nid2str(peer->nid),
1457                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460                            POSTID(&oa->o_oi), pga[0]->off,
1461                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463                "client csum now %x\n", client_cksum, client_cksum_type,
1464                server_cksum, cksum_type, new_cksum);
1465         return 1;
1466 }
1467
1468 /* Note that rc enters this function as the number of bytes transferred. */
1469 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470 {
1471         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472         const lnet_process_id_t *peer =
1473                         &req->rq_import->imp_connection->c_peer;
1474         struct client_obd *cli = aa->aa_cli;
1475         struct ost_body *body;
1476         __u32 client_cksum = 0;
1477
1478         if (rc < 0 && rc != -EDQUOT) {
1479                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1480                 return rc;
1481         }
1482
1483         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1485         if (body == NULL) {
1486                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1487                 return -EPROTO;
1488         }
1489
1490         /* Set/clear the over-quota flag for a uid/gid. */
1491         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1496                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497                        body->oa.o_flags);
1498                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1499         }
1500
1501         osc_update_grant(cli, body);
1502
1503         if (rc < 0)
1504                 return rc;
1505
1506         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected +ve rc %d\n", rc);
1512                         return -EPROTO;
1513                 }
1514                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         return -EAGAIN;
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa->aa_requested_nob,
1522                                          aa->aa_page_count, aa->aa_ppga,
1523                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1524                         return -EAGAIN;
1525
1526                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527                                      aa->aa_page_count, aa->aa_ppga);
1528                 GOTO(out, rc);
1529         }
1530
1531         /* The rest of this function executes only for OST_READs */
1532
1533         /* if unwrap_bulk failed, return -EAGAIN to retry */
1534         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535         if (rc < 0)
1536                 GOTO(out, rc = -EAGAIN);
1537
1538         if (rc > aa->aa_requested_nob) {
1539                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1540                        aa->aa_requested_nob);
1541                 return -EPROTO;
1542         }
1543
1544         if (rc != req->rq_bulk->bd_nob_transferred) {
1545                 CERROR("Unexpected rc %d (%d transferred)\n",
1546                        rc, req->rq_bulk->bd_nob_transferred);
1547                 return -EPROTO;
1548         }
1549
1550         if (rc < aa->aa_requested_nob)
1551                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554                 static int cksum_counter;
1555                 __u32      server_cksum = body->oa.o_cksum;
1556                 char      *via;
1557                 char      *router;
1558                 cksum_type_t cksum_type;
1559
1560                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1561                                                body->oa.o_flags : 0);
1562                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563                                                  aa->aa_ppga, OST_READ,
1564                                                  cksum_type);
1565
1566                 if (peer->nid == req->rq_bulk->bd_sender) {
1567                         via = router = "";
1568                 } else {
1569                         via = " via ";
1570                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1571                 }
1572
1573                 if (server_cksum == ~0 && rc > 0) {
1574                         CERROR("Protocol error: server %s set the 'checksum' "
1575                                "bit, but didn't send a checksum.  Not fatal, "
1576                                "but please notify on http://bugs.whamcloud.com/\n",
1577                                libcfs_nid2str(peer->nid));
1578                 } else if (server_cksum != client_cksum) {
1579                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1580                                            "%s%s%s inode "DFID" object "DOSTID
1581                                            " extent ["LPU64"-"LPU64"]\n",
1582                                            req->rq_import->imp_obd->obd_name,
1583                                            libcfs_nid2str(peer->nid),
1584                                            via, router,
1585                                            body->oa.o_valid & OBD_MD_FLFID ?
1586                                                 body->oa.o_parent_seq : (__u64)0,
1587                                            body->oa.o_valid & OBD_MD_FLFID ?
1588                                                 body->oa.o_parent_oid : 0,
1589                                            body->oa.o_valid & OBD_MD_FLFID ?
1590                                                 body->oa.o_parent_ver : 0,
1591                                            POSTID(&body->oa.o_oi),
1592                                            aa->aa_ppga[0]->off,
1593                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1594                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1595                                                                         1);
1596                         CERROR("client %x, server %x, cksum_type %x\n",
1597                                client_cksum, server_cksum, cksum_type);
1598                         cksum_counter = 0;
1599                         aa->aa_oa->o_cksum = client_cksum;
1600                         rc = -EAGAIN;
1601                 } else {
1602                         cksum_counter++;
1603                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1604                         rc = 0;
1605                 }
1606         } else if (unlikely(client_cksum)) {
1607                 static int cksum_missed;
1608
1609                 cksum_missed++;
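                /* Only log when cksum_missed is a power of two, to rate-limit
                 * the console noise. */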
1610                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1611                         CERROR("Checksum %u requested from %s but not sent\n",
1612                                cksum_missed, libcfs_nid2str(peer->nid));
1613         } else {
1614                 rc = 0;
1615         }
1616 out:
1617         if (rc >= 0)
1618                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1619                                      aa->aa_oa, &body->oa);
1620
1621         return rc;
1622 }
1623
1624 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1625                             struct lov_stripe_md *lsm,
1626                             obd_count page_count, struct brw_page **pga,
1627                             struct obd_capa *ocapa)
1628 {
1629         struct ptlrpc_request *req;
1630         int                 rc;
1631         wait_queue_head_t           waitq;
1632         int                 generation, resends = 0;
1633         struct l_wait_info     lwi;
1634
1635         init_waitqueue_head(&waitq);
1636         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1637
1638 restart_bulk:
1639         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1640                                   page_count, pga, &req, ocapa, 0, resends);
1641         if (rc != 0)
1642                 return rc;
1643
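        /* On a resend, pin the request to the import generation we started
         * with and back off by 'resends' seconds before it goes out. */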
1644         if (resends) {
1645                 req->rq_generation_set = 1;
1646                 req->rq_import_generation = generation;
1647                 req->rq_sent = cfs_time_current_sec() + resends;
1648         }
1649
1650         rc = ptlrpc_queue_wait(req);
1651
1652         if (rc == -ETIMEDOUT && req->rq_resend) {
1653                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1654                 ptlrpc_req_finished(req);
1655                 goto restart_bulk;
1656         }
1657
1658         rc = osc_brw_fini_request(req, rc);
1659
1660         ptlrpc_req_finished(req);
1661         /* When the server returns -EINPROGRESS, the client should always
1662          * retry regardless of how many times the bulk was already resent. */
1663         if (osc_recoverable_error(rc)) {
1664                 resends++;
1665                 if (rc != -EINPROGRESS &&
1666                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1667                         CERROR("%s: too many resend retries for object: "
1668                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1669                                POSTID(&oa->o_oi), rc);
1670                         goto out;
1671                 }
1672                 if (generation !=
1673                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1674                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1675                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1676                                POSTID(&oa->o_oi), rc);
1677                         goto out;
1678                 }
1679
1680                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1681                                        NULL);
1682                 l_wait_event(waitq, 0, &lwi);
1683
1684                 goto restart_bulk;
1685         }
1686 out:
1687         if (rc == -EAGAIN || rc == -EINPROGRESS)
1688                 rc = -EIO;
1689         return rc;
1690 }
1691
1692 static int osc_brw_redo_request(struct ptlrpc_request *request,
1693                                 struct osc_brw_async_args *aa, int rc)
1694 {
1695         struct ptlrpc_request *new_req;
1696         struct osc_brw_async_args *new_aa;
1697         struct osc_async_page *oap;
1698
1699         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1700                   "redo for recoverable error %d", rc);
1701
1702         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1703                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1704                                   aa->aa_cli, aa->aa_oa,
1705                                   NULL /* lsm unused by osc currently */,
1706                                   aa->aa_page_count, aa->aa_ppga,
1707                                   &new_req, aa->aa_ocapa, 0, 1);
1708         if (rc)
1709                 return rc;
1710
1711         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1712                 if (oap->oap_request != NULL) {
1713                         LASSERTF(request == oap->oap_request,
1714                                  "request %p != oap_request %p\n",
1715                                  request, oap->oap_request);
1716                         if (oap->oap_interrupted) {
1717                                 ptlrpc_req_finished(new_req);
1718                                 return -EINTR;
1719                         }
1720                 }
1721         }
1722         /* The new request takes over pga and oaps from the old request.
1723          * Note that copying a list_head doesn't work; it has to be moved. */
1724         aa->aa_resends++;
1725         new_req->rq_interpret_reply = request->rq_interpret_reply;
1726         new_req->rq_async_args = request->rq_async_args;
1727         /* Cap the resend delay to the current request timeout; this is
1728          * similar to what ptlrpc does (see after_reply()). */
1729         if (aa->aa_resends > new_req->rq_timeout)
1730                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1731         else
1732                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1733         new_req->rq_generation_set = 1;
1734         new_req->rq_import_generation = request->rq_import_generation;
1735
1736         new_aa = ptlrpc_req_async_args(new_req);
1737
1738         INIT_LIST_HEAD(&new_aa->aa_oaps);
1739         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1740         INIT_LIST_HEAD(&new_aa->aa_exts);
1741         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1742         new_aa->aa_resends = aa->aa_resends;
1743
1744         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1745                 if (oap->oap_request) {
1746                         ptlrpc_req_finished(oap->oap_request);
1747                         oap->oap_request = ptlrpc_request_addref(new_req);
1748                 }
1749         }
1750
1751         new_aa->aa_ocapa = aa->aa_ocapa;
1752         aa->aa_ocapa = NULL;
1753
1754         /* XXX: This code will run into problems if we ever support adding
1755          * a series of BRW RPCs to a caller-defined ptlrpc_request_set and
1756          * waiting for all of them to finish. In that case we should inherit
1757          * the request set from the old request. */
1758         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1759
1760         DEBUG_REQ(D_INFO, new_req, "new request");
1761         return 0;
1762 }
1763
1764 /*
1765  * We want disk allocation on the target to happen in offset order, so we
1766  * follow Sedgewick's advice and stick to the dead simple shellsort -- it does
1767  * fine for our small page arrays and doesn't require allocation.  It is an
1768  * insertion sort that swaps elements that are strides apart, shrinking the
1769  * stride down until it is 1 and the array is sorted.
1770  */
1771 static void sort_brw_pages(struct brw_page **array, int num)
1772 {
1773         int stride, i, j;
1774         struct brw_page *tmp;
1775
1776         if (num == 1)
1777                 return;
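        /* Find the starting stride using the 3*h + 1 sequence (1, 4, 13, 40,
         * ...); the sort loop below then shrinks it back down. */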
1778         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1779                 ;
1780
1781         do {
1782                 stride /= 3;
1783                 for (i = stride; i < num; i++) {
1784                         tmp = array[i];
1785                         j = i;
1786                         while (j >= stride && array[j - stride]->off > tmp->off) {
1787                                 array[j] = array[j - stride];
1788                                 j -= stride;
1789                         }
1790                         array[j] = tmp;
1791                 }
1792         } while (stride > 1);
1793 }
1794
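/* Count how many of the leading pages form one unfragmented run: every page
 * except the last must end on a page boundary, and every page except the
 * first must start on one. */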
1795 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1796 {
1797         int count = 1;
1798         int offset;
1799         int i = 0;
1800
1801         LASSERT(pages > 0);
1802         offset = pg[i]->off & ~CFS_PAGE_MASK;
1803
1804         for (;;) {
1805                 pages--;
1806                 if (pages == 0)  /* that's all */
1807                         return count;
1808
1809                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1810                         return count;   /* doesn't end on page boundary */
1811
1812                 i++;
1813                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1814                 if (offset != 0)        /* doesn't start on page boundary */
1815                         return count;
1816
1817                 count++;
1818         }
1819 }
1820
1821 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1822 {
1823         struct brw_page **ppga;
1824         int i;
1825
1826         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1827         if (ppga == NULL)
1828                 return NULL;
1829
1830         for (i = 0; i < count; i++)
1831                 ppga[i] = pga + i;
1832         return ppga;
1833 }
1834
1835 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1836 {
1837         LASSERT(ppga != NULL);
1838         OBD_FREE(ppga, sizeof(*ppga) * count);
1839 }
1840
1841 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1842                    obd_count page_count, struct brw_page *pga,
1843                    struct obd_trans_info *oti)
1844 {
1845         struct obdo *saved_oa = NULL;
1846         struct brw_page **ppga, **orig;
1847         struct obd_import *imp = class_exp2cliimp(exp);
1848         struct client_obd *cli;
1849         int rc, page_count_orig;
1850
1851         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1852         cli = &imp->imp_obd->u.cli;
1853
1854         if (cmd & OBD_BRW_CHECK) {
1855                 /* The caller just wants to know if there's a chance that this
1856                  * I/O can succeed */
1857
1858                 if (imp->imp_invalid)
1859                         return -EIO;
1860                 return 0;
1861         }
1862
1863         /* test_brw with a failed create can trip this, maybe others. */
1864         LASSERT(cli->cl_max_pages_per_rpc);
1865
1866         rc = 0;
1867
1868         orig = ppga = osc_build_ppga(pga, page_count);
1869         if (ppga == NULL)
1870                 return -ENOMEM;
1871         page_count_orig = page_count;
1872
1873         sort_brw_pages(ppga, page_count);
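        /* Issue the I/O as a series of synchronous BRW RPCs, each covering at
         * most cl_max_pages_per_rpc pages and only as many leading pages as
         * form an unfragmented run. */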
1874         while (page_count) {
1875                 obd_count pages_per_brw;
1876
1877                 if (page_count > cli->cl_max_pages_per_rpc)
1878                         pages_per_brw = cli->cl_max_pages_per_rpc;
1879                 else
1880                         pages_per_brw = page_count;
1881
1882                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1883
1884                 if (saved_oa != NULL) {
1885                         /* restore previously saved oa */
1886                         *oinfo->oi_oa = *saved_oa;
1887                 } else if (page_count > pages_per_brw) {
1888                         /* save a copy of oa (brw will clobber it) */
1889                         OBDO_ALLOC(saved_oa);
1890                         if (saved_oa == NULL)
1891                                 GOTO(out, rc = -ENOMEM);
1892                         *saved_oa = *oinfo->oi_oa;
1893                 }
1894
1895                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1896                                       pages_per_brw, ppga, oinfo->oi_capa);
1897
1898                 if (rc != 0)
1899                         break;
1900
1901                 page_count -= pages_per_brw;
1902                 ppga += pages_per_brw;
1903         }
1904
1905 out:
1906         osc_release_ppga(orig, page_count_orig);
1907
1908         if (saved_oa != NULL)
1909                 OBDO_FREE(saved_oa);
1910
1911         return rc;
1912 }
1913
1914 static int brw_interpret(const struct lu_env *env,
1915                          struct ptlrpc_request *req, void *data, int rc)
1916 {
1917         struct osc_brw_async_args *aa = data;
1918         struct osc_extent *ext;
1919         struct osc_extent *tmp;
1920         struct cl_object  *obj = NULL;
1921         struct client_obd *cli = aa->aa_cli;
1922
1923         rc = osc_brw_fini_request(req, rc);
1924         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1925         /* When the server returns -EINPROGRESS, the client should always
1926          * retry regardless of how many times the bulk was already resent. */
1927         if (osc_recoverable_error(rc)) {
1928                 if (req->rq_import_generation !=
1929                     req->rq_import->imp_generation) {
1930                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1931                                ""DOSTID", rc = %d.\n",
1932                                req->rq_import->imp_obd->obd_name,
1933                                POSTID(&aa->aa_oa->o_oi), rc);
1934                 } else if (rc == -EINPROGRESS ||
1935                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1936                         rc = osc_brw_redo_request(req, aa, rc);
1937                 } else {
1938                         CERROR("%s: too many resend retries for object: "
1939                                ""LPU64":"LPU64", rc = %d.\n",
1940                                req->rq_import->imp_obd->obd_name,
1941                                POSTID(&aa->aa_oa->o_oi), rc);
1942                 }
1943
1944                 if (rc == 0)
1945                         return 0;
1946                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1947                         rc = -EIO;
1948         }
1949
1950         if (aa->aa_ocapa) {
1951                 capa_put(aa->aa_ocapa);
1952                 aa->aa_ocapa = NULL;
1953         }
1954
1955         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1956                 if (obj == NULL && rc == 0) {
1957                         obj = osc2cl(ext->oe_obj);
1958                         cl_object_get(obj);
1959                 }
1960
1961                 list_del_init(&ext->oe_link);
1962                 osc_extent_finish(env, ext, 1, rc);
1963         }
1964         LASSERT(list_empty(&aa->aa_exts));
1965         LASSERT(list_empty(&aa->aa_oaps));
1966
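        /* On success, propagate the block count and timestamps returned by
         * the OST to the cl_object attributes. */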
1967         if (obj != NULL) {
1968                 struct obdo *oa = aa->aa_oa;
1969                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1970                 unsigned long valid = 0;
1971
1972                 LASSERT(rc == 0);
1973                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1974                         attr->cat_blocks = oa->o_blocks;
1975                         valid |= CAT_BLOCKS;
1976                 }
1977                 if (oa->o_valid & OBD_MD_FLMTIME) {
1978                         attr->cat_mtime = oa->o_mtime;
1979                         valid |= CAT_MTIME;
1980                 }
1981                 if (oa->o_valid & OBD_MD_FLATIME) {
1982                         attr->cat_atime = oa->o_atime;
1983                         valid |= CAT_ATIME;
1984                 }
1985                 if (oa->o_valid & OBD_MD_FLCTIME) {
1986                         attr->cat_ctime = oa->o_ctime;
1987                         valid |= CAT_CTIME;
1988                 }
1989                 if (valid != 0) {
1990                         cl_object_attr_lock(obj);
1991                         cl_object_attr_set(env, obj, attr, valid);
1992                         cl_object_attr_unlock(obj);
1993                 }
1994                 cl_object_put(env, obj);
1995         }
1996         OBDO_FREE(aa->aa_oa);
1997
1998         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1999                           req->rq_bulk->bd_nob_transferred);
2000         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2001         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2002
2003         client_obd_list_lock(&cli->cl_loi_list_lock);
2004         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2005          * is called so we know whether to go to sync BRWs or wait for more
2006          * RPCs to complete */
2007         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2008                 cli->cl_w_in_flight--;
2009         else
2010                 cli->cl_r_in_flight--;
2011         osc_wake_cache_waiters(cli);
2012         client_obd_list_unlock(&cli->cl_loi_list_lock);
2013
2014         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2015         return rc;
2016 }
2017
2018 /**
2019  * Build an RPC from the list of extents @ext_list. The caller must ensure
2020  * that the total number of pages in this list does not exceed the maximum
2021  * pages per RPC. Extents in the list must be in OES_RPC state.
2022  */
2023 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2024                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
2025 {
2026         struct ptlrpc_request           *req = NULL;
2027         struct osc_extent               *ext;
2028         struct brw_page                 **pga = NULL;
2029         struct osc_brw_async_args       *aa = NULL;
2030         struct obdo                     *oa = NULL;
2031         struct osc_async_page           *oap;
2032         struct osc_async_page           *tmp;
2033         struct cl_req                   *clerq = NULL;
2034         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2035                                                                       CRT_READ;
2036         struct ldlm_lock                *lock = NULL;
2037         struct cl_req_attr              *crattr = NULL;
2038         obd_off                         starting_offset = OBD_OBJECT_EOF;
2039         obd_off                         ending_offset = 0;
2040         int                             mpflag = 0;
2041         int                             mem_tight = 0;
2042         int                             page_count = 0;
2043         int                             i;
2044         int                             rc;
2045         LIST_HEAD(rpc_list);
2046
2047         LASSERT(!list_empty(ext_list));
2048
2049         /* add pages into rpc_list to build BRW rpc */
2050         list_for_each_entry(ext, ext_list, oe_link) {
2051                 LASSERT(ext->oe_state == OES_RPC);
2052                 mem_tight |= ext->oe_memalloc;
2053                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2054                         ++page_count;
2055                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2056                         if (starting_offset > oap->oap_obj_off)
2057                                 starting_offset = oap->oap_obj_off;
2058                         else
2059                                 LASSERT(oap->oap_page_off == 0);
2060                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2061                                 ending_offset = oap->oap_obj_off +
2062                                                 oap->oap_count;
2063                         else
2064                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2065                                         PAGE_CACHE_SIZE);
2066                 }
2067         }
2068
2069         if (mem_tight)
2070                 mpflag = cfs_memory_pressure_get_and_set();
2071
2072         OBD_ALLOC(crattr, sizeof(*crattr));
2073         if (crattr == NULL)
2074                 GOTO(out, rc = -ENOMEM);
2075
2076         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2077         if (pga == NULL)
2078                 GOTO(out, rc = -ENOMEM);
2079
2080         OBDO_ALLOC(oa);
2081         if (oa == NULL)
2082                 GOTO(out, rc = -ENOMEM);
2083
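        /* Build the brw_page array and the cl_req from the queued async pages;
         * the first page also supplies the DLM lock whose remote handle is
         * packed into the obdo below. */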
2084         i = 0;
2085         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2086                 struct cl_page *page = oap2cl_page(oap);

2087                 if (clerq == NULL) {
2088                         clerq = cl_req_alloc(env, page, crt,
2089                                              1 /* only 1-object rpcs for now */);
2090                         if (IS_ERR(clerq))
2091                                 GOTO(out, rc = PTR_ERR(clerq));
2092                         lock = oap->oap_ldlm_lock;
2093                 }
2094                 if (mem_tight)
2095                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2096                 pga[i] = &oap->oap_brw_page;
2097                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2098                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2099                        pga[i]->pg, page_index(oap->oap_page), oap,
2100                        pga[i]->flag);
2101                 i++;
2102                 cl_req_page_add(env, clerq, page);
2103         }
2104
2105         /* always get the data for the obdo for the rpc */
2106         LASSERT(clerq != NULL);
2107         crattr->cra_oa = oa;
2108         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2109         if (lock) {
2110                 oa->o_handle = lock->l_remote_handle;
2111                 oa->o_valid |= OBD_MD_FLHANDLE;
2112         }
2113
2114         rc = cl_req_prep(env, clerq);
2115         if (rc != 0) {
2116                 CERROR("cl_req_prep failed: %d\n", rc);
2117                 GOTO(out, rc);
2118         }
2119
2120         sort_brw_pages(pga, page_count);
2121         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2122                         pga, &req, crattr->cra_capa, 1, 0);
2123         if (rc != 0) {
2124                 CERROR("prep_req failed: %d\n", rc);
2125                 GOTO(out, rc);
2126         }
2127
2128         req->rq_interpret_reply = brw_interpret;
2129
2130         if (mem_tight != 0)
2131                 req->rq_memalloc = 1;
2132
2133         /* Need to update the timestamps after the request is built in case
2134          * we race with setattr (locally or in queue at the OST).  If the OST
2135          * gets a later setattr before an earlier BRW (as determined by the
2136          * request xid), the OST will not use the BRW timestamps.  Sadly,
2137          * there is no obvious way to do this in a single call.  bug 10150 */
2138         cl_req_attr_set(env, clerq, crattr,
2139                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2140
2141         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2142
2143         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2144         aa = ptlrpc_req_async_args(req);
2145         INIT_LIST_HEAD(&aa->aa_oaps);
2146         list_splice_init(&rpc_list, &aa->aa_oaps);
2147         INIT_LIST_HEAD(&aa->aa_exts);
2148         list_splice_init(ext_list, &aa->aa_exts);
2149         aa->aa_clerq = clerq;
2150
2151         /* Queued sync pages can be torn down while the pages
2152          * are between the pending list and the RPC. */
2153         tmp = NULL;
2154         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2155                 /* only one oap gets a request reference */
2156                 if (tmp == NULL)
2157                         tmp = oap;
2158                 if (oap->oap_interrupted && !req->rq_intr) {
2159                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2160                                         oap, req);
2161                         ptlrpc_mark_interrupted(req);
2162                 }
2163         }
2164         if (tmp != NULL)
2165                 tmp->oap_request = ptlrpc_request_addref(req);
2166
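        /* Account the RPC as in flight and update the per-client read/write
         * page, RPC and offset histograms. */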
2167         client_obd_list_lock(&cli->cl_loi_list_lock);
2168         starting_offset >>= PAGE_CACHE_SHIFT;
2169         if (cmd == OBD_BRW_READ) {
2170                 cli->cl_r_in_flight++;
2171                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2172                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2173                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2174                                       starting_offset + 1);
2175         } else {
2176                 cli->cl_w_in_flight++;
2177                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2178                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2179                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2180                                       starting_offset + 1);
2181         }
2182         client_obd_list_unlock(&cli->cl_loi_list_lock);
2183
2184         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2185                   page_count, aa, cli->cl_r_in_flight,
2186                   cli->cl_w_in_flight);
2187
2188         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2189          * see which CPU/NUMA node the majority of pages were allocated
2190          * on, and try to assign the async RPC to the CPU core
2191          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2192          *
2193          * But on the other hand, we expect that multiple ptlrpcd
2194          * threads and the initial write sponsor can run in parallel,
2195          * especially when data checksumming is enabled, which is a
2196          * CPU-bound operation that a single ptlrpcd thread cannot
2197          * process in time.  So more ptlrpcd threads sharing the BRW
2198          * load (with PDL_POLICY_ROUND) seems better.
2199          */
2200         ptlrpcd_add_req(req, pol, -1);
2201         rc = 0;
2202
2203 out:
2204         if (mem_tight != 0)
2205                 cfs_memory_pressure_restore(mpflag);
2206
2207         if (crattr != NULL) {
2208                 capa_put(crattr->cra_capa);
2209                 OBD_FREE(crattr, sizeof(*crattr));
2210         }
2211
2212         if (rc != 0) {
2213                 LASSERT(req == NULL);
2214
2215                 if (oa)
2216                         OBDO_FREE(oa);
2217                 if (pga)
2218                         OBD_FREE(pga, sizeof(*pga) * page_count);
2219                 /* This should happen rarely and is pretty bad; it makes the
2220                  * pending list not follow the dirty order. */
2221                 while (!list_empty(ext_list)) {
2222                         ext = list_entry(ext_list->next, struct osc_extent,
2223                                              oe_link);
2224                         list_del_init(&ext->oe_link);
2225                         osc_extent_finish(env, ext, 0, rc);
2226                 }
2227                 if (clerq && !IS_ERR(clerq))
2228                         cl_req_completion(env, clerq, rc);
2229         }
2230         return rc;
2231 }
2232
2233 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2234                                         struct ldlm_enqueue_info *einfo)
2235 {
2236         void *data = einfo->ei_cbdata;
2237         int set = 0;
2238
2239         LASSERT(lock != NULL);
2240         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2241         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2242         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2243         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2244
2245         lock_res_and_lock(lock);
2246         spin_lock(&osc_ast_guard);
2247
2248         if (lock->l_ast_data == NULL)
2249                 lock->l_ast_data = data;
2250         if (lock->l_ast_data == data)
2251                 set = 1;
2252
2253         spin_unlock(&osc_ast_guard);
2254         unlock_res_and_lock(lock);
2255
2256         return set;
2257 }
2258
2259 static int osc_set_data_with_check(struct lustre_handle *lockh,
2260                                    struct ldlm_enqueue_info *einfo)
2261 {
2262         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2263         int set = 0;
2264
2265         if (lock != NULL) {
2266                 set = osc_set_lock_data_with_check(lock, einfo);
2267                 LDLM_LOCK_PUT(lock);
2268         } else
2269                 CERROR("lockh %p, data %p - client evicted?\n",
2270                        lockh, einfo->ei_cbdata);
2271         return set;
2272 }
2273
2274 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2275                              ldlm_iterator_t replace, void *data)
2276 {
2277         struct ldlm_res_id res_id;
2278         struct obd_device *obd = class_exp2obd(exp);
2279
2280         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2281         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2282         return 0;
2283 }
2284
2285 /* Find any LDLM lock of the inode in OSC.
2286  * Return 0 if no lock is found,
2287  *        1 if one is found,
2288  *        < 0 on error. */
2289 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2290                            ldlm_iterator_t replace, void *data)
2291 {
2292         struct ldlm_res_id res_id;
2293         struct obd_device *obd = class_exp2obd(exp);
2294         int rc = 0;
2295
2296         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2297         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2298         if (rc == LDLM_ITER_STOP)
2299                 return 1;
2300         if (rc == LDLM_ITER_CONTINUE)
2301                 return 0;
2302         return rc;
2303 }
2304
2305 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2306                             obd_enqueue_update_f upcall, void *cookie,
2307                             __u64 *flags, int agl, int rc)
2308 {
2309         int intent = *flags & LDLM_FL_HAS_INTENT;
2310
2311         if (intent) {
2312                 /* The request was created before ldlm_cli_enqueue call. */
2313                 if (rc == ELDLM_LOCK_ABORTED) {
2314                         struct ldlm_reply *rep;

2315                         rep = req_capsule_server_get(&req->rq_pill,
2316                                                      &RMF_DLM_REP);
2317
2318                         LASSERT(rep != NULL);
2319                         rep->lock_policy_res1 =
2320                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2321                         if (rep->lock_policy_res1)
2322                                 rc = rep->lock_policy_res1;
2323                 }
2324         }
2325
2326         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2327             (rc == 0)) {
2328                 *flags |= LDLM_FL_LVB_READY;
2329                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2330                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2331         }
2332
2333         /* Call the update callback. */
2334         rc = (*upcall)(cookie, rc);
2335         return rc;
2336 }
2337
2338 static int osc_enqueue_interpret(const struct lu_env *env,
2339                                  struct ptlrpc_request *req,
2340                                  struct osc_enqueue_args *aa, int rc)
2341 {
2342         struct ldlm_lock *lock;
2343         struct lustre_handle handle;
2344         __u32 mode;
2345         struct ost_lvb *lvb;
2346         __u32 lvb_len;
2347         __u64 *flags = aa->oa_flags;
2348
2349         /* Make local copies of the lock handle and mode, because aa->oa_*
2350          * might be freed at any time after the lock upcall has been called. */
2351         lustre_handle_copy(&handle, aa->oa_lockh);
2352         mode = aa->oa_ei->ei_mode;
2353
2354         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2355          * be valid. */
2356         lock = ldlm_handle2lock(&handle);
2357
2358         /* Take an additional reference so that a blocking AST that
2359          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2360          * to arrive after an upcall has been executed by
2361          * osc_enqueue_fini(). */
2362         ldlm_lock_addref(&handle, mode);
2363
2364         /* Let the CP AST grant the lock first. */
2365         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2366
2367         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2368                 lvb = NULL;
2369                 lvb_len = 0;
2370         } else {
2371                 lvb = aa->oa_lvb;
2372                 lvb_len = sizeof(*aa->oa_lvb);
2373         }
2374
2375         /* Complete obtaining the lock procedure. */
2376         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2377                                    mode, flags, lvb, lvb_len, &handle, rc);
2378         /* Complete osc stuff. */
2379         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2380                               flags, aa->oa_agl, rc);
2381
2382         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2383
2384         /* Release the lock for async request. */
2385         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2386                 /*
2387                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2388                  * not already released by
2389                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2390                  */
2391                 ldlm_lock_decref(&handle, mode);
2392
2393         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2394                  aa->oa_lockh, req, aa);
2395         ldlm_lock_decref(&handle, mode);
2396         LDLM_LOCK_PUT(lock);
2397         return rc;
2398 }
2399
2400 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2401                         struct lov_oinfo *loi, int flags,
2402                         struct ost_lvb *lvb, __u32 mode, int rc)
2403 {
2404         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2405
2406         if (rc == ELDLM_OK) {
2407                 __u64 tmp;
2408
2409                 LASSERT(lock != NULL);
2410                 loi->loi_lvb = *lvb;
2411                 tmp = loi->loi_lvb.lvb_size;
2412                 /* Extend KMS up to the end of this lock and no further.
2413                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2414                 if (tmp > lock->l_policy_data.l_extent.end)
2415                         tmp = lock->l_policy_data.l_extent.end + 1;
2416                 if (tmp >= loi->loi_kms) {
2417                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2418                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2419                         loi_kms_set(loi, tmp);
2420                 } else {
2421                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2422                                    LPU64"; leaving kms="LPU64", end="LPU64,
2423                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2424                                    lock->l_policy_data.l_extent.end);
2425                 }
2426                 ldlm_lock_allow_match(lock);
2427         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2428                 LASSERT(lock != NULL);
2429                 loi->loi_lvb = *lvb;
2430                 ldlm_lock_allow_match(lock);
2431                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2432                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2433                 rc = ELDLM_OK;
2434         }
2435
2436         if (lock != NULL) {
2437                 if (rc != ELDLM_OK)
2438                         ldlm_lock_fail_match(lock);
2439
2440                 LDLM_LOCK_PUT(lock);
2441         }
2442 }
2443 EXPORT_SYMBOL(osc_update_enqueue);
2444
2445 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2446
2447 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2448  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2449  * with other synchronous requests, but keeping some locks while trying to
2450  * obtain others may take a considerable amount of time in case of OST
2451  * failure; and when other sync requests do not get the lock released from a
2452  * client, that client is excluded from the cluster -- such scenarios make
2453  * life difficult, so release locks just after they are obtained. */
2454 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2455                      __u64 *flags, ldlm_policy_data_t *policy,
2456                      struct ost_lvb *lvb, int kms_valid,
2457                      obd_enqueue_update_f upcall, void *cookie,
2458                      struct ldlm_enqueue_info *einfo,
2459                      struct lustre_handle *lockh,
2460                      struct ptlrpc_request_set *rqset, int async, int agl)
2461 {
2462         struct obd_device *obd = exp->exp_obd;
2463         struct ptlrpc_request *req = NULL;
2464         int intent = *flags & LDLM_FL_HAS_INTENT;
2465         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2466         ldlm_mode_t mode;
2467         int rc;
2468
2469         /* Filesystem lock extents are extended to page boundaries so that
2470          * dealing with the page cache is a little smoother.  */
2471         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2472         policy->l_extent.end |= ~CFS_PAGE_MASK;
2473
2474         /*
2475          * KMS is not valid when either the object is completely fresh (so
2476          * that no locks are cached), or the object was evicted. In the latter
2477          * case a cached lock cannot be used, because it would prime the inode
2478          * state with a potentially stale LVB.
2479          */
2480         if (!kms_valid)
2481                 goto no_match;
2482
2483         /* Next, search for already existing extent locks that will cover us */
2484         /* If we're trying to read, we also search for an existing PW lock.  The
2485          * VFS and page cache already protect us locally, so lots of readers/
2486          * writers can share a single PW lock.
2487          *
2488          * There are problems with conversion deadlocks, so instead of
2489          * converting a read lock to a write lock, we'll just enqueue a new
2490          * one.
2491          *
2492          * At some point we should cancel the read lock instead of making them
2493          * send us a blocking callback, but there are problems with canceling
2494          * locks out from other users right now, too. */
2495         mode = einfo->ei_mode;
2496         if (einfo->ei_mode == LCK_PR)
2497                 mode |= LCK_PW;
2498         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2499                                einfo->ei_type, policy, mode, lockh, 0);
2500         if (mode) {
2501                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2502
2503                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2504                         /* For AGL, if the enqueue RPC is sent but the lock
2505                          * is not granted, then skip processing this stripe.
2506                          * Return -ECANCELED to tell the caller. */
2507                         ldlm_lock_decref(lockh, mode);
2508                         LDLM_LOCK_PUT(matched);
2509                         return -ECANCELED;
2510                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2511                         *flags |= LDLM_FL_LVB_READY;
2512                         /* addref the lock only for non-async requests when
2513                          * a PW lock is matched whereas we asked for PR. */
2514                         if (!rqset && einfo->ei_mode != mode)
2515                                 ldlm_lock_addref(lockh, LCK_PR);
2516                         if (intent) {
2517                                 /* I would like to be able to ASSERT here that
2518                                  * rss <= kms, but I can't, for reasons which
2519                                  * are explained in lov_enqueue() */
2520                         }
2521
2522                         /* We already have a lock, and it's referenced.
2523                          *
2524                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2525                          * AGL upcall may change it to CLS_HELD directly. */
2526                         (*upcall)(cookie, ELDLM_OK);
2527
2528                         if (einfo->ei_mode != mode)
2529                                 ldlm_lock_decref(lockh, LCK_PW);
2530                         else if (rqset)
2531                                 /* For async requests, decref the lock. */
2532                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2533                         LDLM_LOCK_PUT(matched);
2534                         return ELDLM_OK;
2535                 } else {
2536                         ldlm_lock_decref(lockh, mode);
2537                         LDLM_LOCK_PUT(matched);
2538                 }
2539         }
2540
2541  no_match:
2542         if (intent) {
2543                 LIST_HEAD(cancels);
2544                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2545                                            &RQF_LDLM_ENQUEUE_LVB);
2546                 if (req == NULL)
2547                         return -ENOMEM;
2548
2549                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2550                 if (rc) {
2551                         ptlrpc_request_free(req);
2552                         return rc;
2553                 }
2554
2555                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2556                                      sizeof(*lvb));
2557                 ptlrpc_request_set_replen(req);
2558         }
2559
2560         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2561         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2562
2563         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2564                               sizeof(*lvb), LVB_T_OST, lockh, async);
2565         if (rqset) {
2566                 if (!rc) {
2567                         struct osc_enqueue_args *aa;

2568                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2569                         aa = ptlrpc_req_async_args(req);
2570                         aa->oa_ei = einfo;
2571                         aa->oa_exp = exp;
2572                         aa->oa_flags  = flags;
2573                         aa->oa_upcall = upcall;
2574                         aa->oa_cookie = cookie;
2575                         aa->oa_lvb    = lvb;
2576                         aa->oa_lockh  = lockh;
2577                         aa->oa_agl    = !!agl;
2578
2579                         req->rq_interpret_reply =
2580                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2581                         if (rqset == PTLRPCD_SET)
2582                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2583                         else
2584                                 ptlrpc_set_add_req(rqset, req);
2585                 } else if (intent) {
2586                         ptlrpc_req_finished(req);
2587                 }
2588                 return rc;
2589         }
2590
2591         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2592         if (intent)
2593                 ptlrpc_req_finished(req);
2594
2595         return rc;
2596 }
2597
2598 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2599                        struct ldlm_enqueue_info *einfo,
2600                        struct ptlrpc_request_set *rqset)
2601 {
2602         struct ldlm_res_id res_id;
2603         int rc;
2604
2605         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2606         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2607                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2608                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2609                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2610                               rqset, rqset != NULL, 0);
2611         return rc;
2612 }
2613
2614 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2615                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2616                    int *flags, void *data, struct lustre_handle *lockh,
2617                    int unref)
2618 {
2619         struct obd_device *obd = exp->exp_obd;
2620         int lflags = *flags;
2621         ldlm_mode_t rc;
2622
2623         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2624                 return -EIO;
2625
2626         /* Filesystem lock extents are extended to page boundaries so that
2627          * dealing with the page cache is a little smoother */
2628         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2629         policy->l_extent.end |= ~CFS_PAGE_MASK;
2630
2631         /* Next, search for already existing extent locks that will cover us */
2632         /* If we're trying to read, we also search for an existing PW lock.  The
2633          * VFS and page cache already protect us locally, so lots of readers/
2634          * writers can share a single PW lock. */
2635         rc = mode;
2636         if (mode == LCK_PR)
2637                 rc |= LCK_PW;
2638         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2639                              res_id, type, policy, rc, lockh, unref);
2640         if (rc) {
2641                 if (data != NULL) {
2642                         if (!osc_set_data_with_check(lockh, data)) {
2643                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2644                                         ldlm_lock_decref(lockh, rc);
2645                                 return 0;
2646                         }
2647                 }
2648                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2649                         ldlm_lock_addref(lockh, LCK_PR);
2650                         ldlm_lock_decref(lockh, LCK_PW);
2651                 }
2652                 return rc;
2653         }
2654         return rc;
2655 }
2656
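/* Drop a lock reference; group locks are cancelled at the same time. */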
2657 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2658 {
2659         if (unlikely(mode == LCK_GROUP))
2660                 ldlm_lock_decref_and_cancel(lockh, mode);
2661         else
2662                 ldlm_lock_decref(lockh, mode);
2663
2664         return 0;
2665 }
2666
2667 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2668                       __u32 mode, struct lustre_handle *lockh)
2669 {
2670         return osc_cancel_base(lockh, mode);
2671 }
2672
2673 static int osc_cancel_unused(struct obd_export *exp,
2674                              struct lov_stripe_md *lsm,
2675                              ldlm_cancel_flags_t flags,
2676                              void *opaque)
2677 {
2678         struct obd_device *obd = class_exp2obd(exp);
2679         struct ldlm_res_id res_id, *resp = NULL;
2680
2681         if (lsm != NULL) {
2682                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2683                 resp = &res_id;
2684         }
2685
2686         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2687 }
2688
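/* Completion callback for asynchronous OST_STATFS: copy the reply into
 * oi_osfs and invoke the caller's upcall. */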
2689 static int osc_statfs_interpret(const struct lu_env *env,
2690                                 struct ptlrpc_request *req,
2691                                 struct osc_async_args *aa, int rc)
2692 {
2693         struct obd_statfs *msfs;
2694
2695         if (rc == -EBADR)
2696                 /* The request has in fact never been sent
2697                  * due to issues at a higher level (LOV).
2698                  * Exit immediately since the caller is
2699                  * aware of the problem and takes care
2700                  * of the cleanup */
2701                 return rc;
2702
2703         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2704             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2705                 GOTO(out, rc = 0);
2706
2707         if (rc != 0)
2708                 GOTO(out, rc);
2709
2710         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2711         if (msfs == NULL)
2712                 GOTO(out, rc = -EPROTO);
2714
2715         *aa->aa_oi->oi_osfs = *msfs;
2716 out:
2717         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2718         return rc;
2719 }
2720
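/* Issue an OST_STATFS request asynchronously via the given request set. */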
2721 static int osc_statfs_async(struct obd_export *exp,
2722                             struct obd_info *oinfo, __u64 max_age,
2723                             struct ptlrpc_request_set *rqset)
2724 {
2725         struct obd_device     *obd = class_exp2obd(exp);
2726         struct ptlrpc_request *req;
2727         struct osc_async_args *aa;
2728         int                 rc;
2729
2730         /* We could possibly pass max_age in the request (as an absolute
2731          * timestamp or a "seconds.usec ago") so the target can avoid doing
2732          * extra calls into the filesystem if that isn't necessary (e.g.
2733          * during mount that would help a bit).  Having relative timestamps
2734          * is not so great if request processing is slow, while absolute
2735          * timestamps are not ideal because they need time synchronization. */
2736         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2737         if (req == NULL)
2738                 return -ENOMEM;
2739
2740         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2741         if (rc) {
2742                 ptlrpc_request_free(req);
2743                 return rc;
2744         }
2745         ptlrpc_request_set_replen(req);
2746         req->rq_request_portal = OST_CREATE_PORTAL;
2747         ptlrpc_at_set_req_timeout(req);
2748
2749         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2750                 /* procfs statfs requests must not block, to avoid deadlock */
2751                 req->rq_no_resend = 1;
2752                 req->rq_no_delay = 1;
2753         }
2754
2755         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2756         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2757         aa = ptlrpc_req_async_args(req);
2758         aa->aa_oi = oinfo;
2759
2760         ptlrpc_set_add_req(rqset, req);
2761         return 0;
2762 }
2763
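/* Synchronous OST_STATFS: send the request and wait for the reply. */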
2764 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2765                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2766 {
2767         struct obd_device     *obd = class_exp2obd(exp);
2768         struct obd_statfs     *msfs;
2769         struct ptlrpc_request *req;
2770         struct obd_import     *imp = NULL;
2771         int rc;
2772
2773         /* Since the request might also come from lprocfs, we need to
2774          * synchronize with client_disconnect_export() (Bug 15684). */
2775         down_read(&obd->u.cli.cl_sem);
2776         if (obd->u.cli.cl_import)
2777                 imp = class_import_get(obd->u.cli.cl_import);
2778         up_read(&obd->u.cli.cl_sem);
2779         if (!imp)
2780                 return -ENODEV;
2781
2782         /* We could possibly pass max_age in the request (as an absolute
2783          * timestamp or a "seconds.usec ago") so the target can avoid doing
2784          * extra calls into the filesystem if that isn't necessary (e.g.
2785          * during mount that would help a bit).  Having relative timestamps
2786          * is not so great if request processing is slow, while absolute
2787          * timestamps are not ideal because they need time synchronization. */
2788         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2789
2790         class_import_put(imp);
2791
2792         if (req == NULL)
2793                 return -ENOMEM;
2794
2795         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2796         if (rc) {
2797                 ptlrpc_request_free(req);
2798                 return rc;
2799         }
2800         ptlrpc_request_set_replen(req);
2801         req->rq_request_portal = OST_CREATE_PORTAL;
2802         ptlrpc_at_set_req_timeout(req);
2803
2804         if (flags & OBD_STATFS_NODELAY) {
2805                 /* procfs statfs requests must not block, to avoid deadlock */
2806                 req->rq_no_resend = 1;
2807                 req->rq_no_delay = 1;
2808         }
2809
2810         rc = ptlrpc_queue_wait(req);
2811         if (rc)
2812                 GOTO(out, rc);
2813
2814         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2815         if (msfs == NULL)
2816                 GOTO(out, rc = -EPROTO);
2818
2819         *osfs = *msfs;
2820
2821  out:
2822         ptlrpc_req_finished(req);
2823         return rc;
2824 }
2825
2826 /* Retrieve object striping information.
2827  *
2828  * @lump is a pointer to an in-core struct whose lmm_stripe_count gives the
2829  * maximum number of OST indices which will fit in the user buffer.
2830  * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (one slot used).
2831  */
2832 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2833 {
2834         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2835         struct lov_user_md_v3 lum, *lumk;
2836         struct lov_user_ost_data_v1 *lmm_objects;
2837         int rc = 0, lum_size;
2838
2839         if (!lsm)
2840                 return -ENODATA;
2841
2842         /* we only need the header part from user space to get lmm_magic and
2843          * lmm_stripe_count (the header part is common to v1 and v3) */
2844         lum_size = sizeof(struct lov_user_md_v1);
2845         if (copy_from_user(&lum, lump, lum_size))
2846                 return -EFAULT;
2847
2848         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2849             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2850                 return -EINVAL;
2851
2852         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2853         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2854         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2855         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2856
2857         /* we can use lov_mds_md_size() to compute lum_size
2858          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2859         if (lum.lmm_stripe_count > 0) {
2860                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2861                 OBD_ALLOC(lumk, lum_size);
2862                 if (!lumk)
2863                         return -ENOMEM;
2864
2865                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2866                         lmm_objects =
2867                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2868                 else
2869                         lmm_objects = &(lumk->lmm_objects[0]);
2870                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2871         } else {
2872                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2873                 lumk = &lum;
2874         }
2875
2876         lumk->lmm_oi = lsm->lsm_oi;
2877         lumk->lmm_stripe_count = 1;
2878
2879         if (copy_to_user(lump, lumk, lum_size))
2880                 rc = -EFAULT;
2881
2882         if (lumk != &lum)
2883                 OBD_FREE(lumk, lum_size);
2884
2885         return rc;
2886 }
2887
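/* Handle ioctls directed at this OSC device. */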
2889 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2890                          void *karg, void *uarg)
2891 {
2892         struct obd_device *obd = exp->exp_obd;
2893         struct obd_ioctl_data *data = karg;
2894         int err = 0;
2895
2896         if (!try_module_get(THIS_MODULE)) {
2897                 CERROR("Can't get module. Is it alive?\n");
2898                 return -EINVAL;
2899         }
2900         switch (cmd) {
2901         case OBD_IOC_LOV_GET_CONFIG: {
2902                 char *buf;
2903                 struct lov_desc *desc;
2904                 struct obd_uuid uuid;
2905
2906                 buf = NULL;
2907                 len = 0;
2908                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2909                         GOTO(out, err = -EINVAL);
2910
2911                 data = (struct obd_ioctl_data *)buf;
2912
2913                 if (sizeof(*desc) > data->ioc_inllen1) {
2914                         obd_ioctl_freedata(buf, len);
2915                         GOTO(out, err = -EINVAL);
2916                 }
2917
2918                 if (data->ioc_inllen2 < sizeof(uuid)) {
2919                         obd_ioctl_freedata(buf, len);
2920                         GOTO(out, err = -EINVAL);
2921                 }
2922
2923                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2924                 desc->ld_tgt_count = 1;
2925                 desc->ld_active_tgt_count = 1;
2926                 desc->ld_default_stripe_count = 1;
2927                 desc->ld_default_stripe_size = 0;
2928                 desc->ld_default_stripe_offset = 0;
2929                 desc->ld_pattern = 0;
2930                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2931
2932                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2933
2934                 err = copy_to_user((void *)uarg, buf, len);
2935                 if (err)
2936                         err = -EFAULT;
2937                 obd_ioctl_freedata(buf, len);
2938                 GOTO(out, err);
2939         }
2940         case LL_IOC_LOV_SETSTRIPE:
2941                 err = obd_alloc_memmd(exp, karg);
2942                 if (err > 0)
2943                         err = 0;
2944                 GOTO(out, err);
2945         case LL_IOC_LOV_GETSTRIPE:
2946                 err = osc_getstripe(karg, uarg);
2947                 GOTO(out, err);
2948         case OBD_IOC_CLIENT_RECOVER:
2949                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2950                                             data->ioc_inlbuf1, 0);
2951                 if (err > 0)
2952                         err = 0;
2953                 GOTO(out, err);
2954         case IOC_OSC_SET_ACTIVE:
2955                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2956                                                data->ioc_offset);
2957                 GOTO(out, err);
2958         case OBD_IOC_POLL_QUOTACHECK:
2959                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2960                 GOTO(out, err);
2961         case OBD_IOC_PING_TARGET:
2962                 err = ptlrpc_obd_ping(obd);
2963                 GOTO(out, err);
2964         default:
2965                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2966                        cmd, current_comm());
2967                 GOTO(out, err = -ENOTTY);
2968         }
2969 out:
2970         module_put(THIS_MODULE);
2971         return err;
2972 }
2973
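/* obd_get_info handler: KEY_LOCK_TO_STRIPE is answered locally, while
 * KEY_LAST_ID and KEY_FIEMAP are fetched from the OST. */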
2974 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2975                         obd_count keylen, void *key, __u32 *vallen, void *val,
2976                         struct lov_stripe_md *lsm)
2977 {
2978         if (!vallen || !val)
2979                 return -EFAULT;
2980
2981         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2982                 __u32 *stripe = val;
2983                 *vallen = sizeof(*stripe);
2984                 *stripe = 0;
2985                 return 0;
2986         } else if (KEY_IS(KEY_LAST_ID)) {
2987                 struct ptlrpc_request *req;
2988                 obd_id          *reply;
2989                 char              *tmp;
2990                 int                 rc;
2991
2992                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2993                                            &RQF_OST_GET_INFO_LAST_ID);
2994                 if (req == NULL)
2995                         return -ENOMEM;
2996
2997                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2998                                      RCL_CLIENT, keylen);
2999                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3000                 if (rc) {
3001                         ptlrpc_request_free(req);
3002                         return rc;
3003                 }
3004
3005                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3006                 memcpy(tmp, key, keylen);
3007
3008                 req->rq_no_delay = req->rq_no_resend = 1;
3009                 ptlrpc_request_set_replen(req);
3010                 rc = ptlrpc_queue_wait(req);
3011                 if (rc)
3012                         GOTO(out, rc);
3013
3014                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3015                 if (reply == NULL)
3016                         GOTO(out, rc = -EPROTO);
3017
3018                 *((obd_id *)val) = *reply;
3019         out:
3020                 ptlrpc_req_finished(req);
3021                 return rc;
3022         } else if (KEY_IS(KEY_FIEMAP)) {
3023                 struct ll_fiemap_info_key *fm_key =
3024                                 (struct ll_fiemap_info_key *)key;
3025                 struct ldlm_res_id       res_id;
3026                 ldlm_policy_data_t       policy;
3027                 struct lustre_handle     lockh;
3028                 ldlm_mode_t              mode = 0;
3029                 struct ptlrpc_request   *req;
3030                 struct ll_user_fiemap   *reply;
3031                 char                    *tmp;
3032                 int                      rc;
3033
3034                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3035                         goto skip_locking;
3036
3037                 policy.l_extent.start = fm_key->fiemap.fm_start &
3038                                                 CFS_PAGE_MASK;
3039
3040                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3041                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3042                         policy.l_extent.end = OBD_OBJECT_EOF;
3043                 else
3044                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3045                                 fm_key->fiemap.fm_length +
3046                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3047
3048                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3049                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3050                                        LDLM_FL_BLOCK_GRANTED |
3051                                        LDLM_FL_LVB_READY,
3052                                        &res_id, LDLM_EXTENT, &policy,
3053                                        LCK_PR | LCK_PW, &lockh, 0);
3054                 if (mode) { /* lock is cached on client */
3055                         if (mode != LCK_PR) {
3056                                 ldlm_lock_addref(&lockh, LCK_PR);
3057                                 ldlm_lock_decref(&lockh, LCK_PW);
3058                         }
3059                 } else { /* no cached lock, needs acquire lock on server side */
3060                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3061                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3062                 }
3063
3064 skip_locking:
3065                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3066                                            &RQF_OST_GET_INFO_FIEMAP);
3067                 if (req == NULL)
3068                         GOTO(drop_lock, rc = -ENOMEM);
3069
3070                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3071                                      RCL_CLIENT, keylen);
3072                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3073                                      RCL_CLIENT, *vallen);
3074                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3075                                      RCL_SERVER, *vallen);
3076
3077                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3078                 if (rc) {
3079                         ptlrpc_request_free(req);
3080                         GOTO(drop_lock, rc);
3081                 }
3082
3083                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3084                 memcpy(tmp, key, keylen);
3085                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3086                 memcpy(tmp, val, *vallen);
3087
3088                 ptlrpc_request_set_replen(req);
3089                 rc = ptlrpc_queue_wait(req);
3090                 if (rc)
3091                         GOTO(fini_req, rc);
3092
3093                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3094                 if (reply == NULL)
3095                         GOTO(fini_req, rc = -EPROTO);
3096
3097                 memcpy(val, reply, *vallen);
3098 fini_req:
3099                 ptlrpc_req_finished(req);
3100 drop_lock:
3101                 if (mode)
3102                         ldlm_lock_decref(&lockh, LCK_PR);
3103                 return rc;
3104         }
3105
3106         return -EINVAL;
3107 }
3108
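/* obd_set_info_async handler: a few keys are handled locally, everything
 * else is forwarded to the OST via OST_SET_INFO. */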
3109 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3110                               obd_count keylen, void *key, obd_count vallen,
3111                               void *val, struct ptlrpc_request_set *set)
3112 {
3113         struct ptlrpc_request *req;
3114         struct obd_device     *obd = exp->exp_obd;
3115         struct obd_import     *imp = class_exp2cliimp(exp);
3116         char              *tmp;
3117         int                 rc;
3118
3119         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3120
3121         if (KEY_IS(KEY_CHECKSUM)) {
3122                 if (vallen != sizeof(int))
3123                         return -EINVAL;
3124                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3125                 return 0;
3126         }
3127
3128         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3129                 sptlrpc_conf_client_adapt(obd);
3130                 return 0;
3131         }
3132
3133         if (KEY_IS(KEY_FLUSH_CTX)) {
3134                 sptlrpc_import_flush_my_ctx(imp);
3135                 return 0;
3136         }
3137
3138         if (KEY_IS(KEY_CACHE_SET)) {
3139                 struct client_obd *cli = &obd->u.cli;
3140
3141                 LASSERT(cli->cl_cache == NULL); /* only once */
3142                 cli->cl_cache = (struct cl_client_cache *)val;
3143                 atomic_inc(&cli->cl_cache->ccc_users);
3144                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3145
3146                 /* add this osc into entity list */
3147                 LASSERT(list_empty(&cli->cl_lru_osc));
3148                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3149                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3150                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3151
3152                 return 0;
3153         }
3154
3155         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3156                 struct client_obd *cli = &obd->u.cli;
3157                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3158                 int target = *(int *)val;
3159
3160                 nr = osc_lru_shrink(cli, min(nr, target));
3161                 *(int *)val -= nr;
3162                 return 0;
3163         }
3164
3165         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3166                 return -EINVAL;
3167
3168         /* We pass all other commands directly to OST. Since nobody calls osc
3169          * methods directly and everybody is supposed to go through LOV, we
3170          * assume lov checked invalid values for us.
3171          * The only recognised values so far are evict_by_nid and mds_conn.
3172          * Even if something bad goes through, we'd get a -EINVAL from OST
3173          * anyway. */
3174
3175         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3176                                                 &RQF_OST_SET_GRANT_INFO :
3177                                                 &RQF_OBD_SET_INFO);
3178         if (req == NULL)
3179                 return -ENOMEM;
3180
3181         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3182                              RCL_CLIENT, keylen);
3183         if (!KEY_IS(KEY_GRANT_SHRINK))
3184                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3185                                      RCL_CLIENT, vallen);
3186         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3187         if (rc) {
3188                 ptlrpc_request_free(req);
3189                 return rc;
3190         }
3191
3192         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3193         memcpy(tmp, key, keylen);
3194         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3195                                                         &RMF_OST_BODY :
3196                                                         &RMF_SETINFO_VAL);
3197         memcpy(tmp, val, vallen);
3198
3199         if (KEY_IS(KEY_GRANT_SHRINK)) {
3200                 struct osc_grant_args *aa;
3201                 struct obdo *oa;
3202
3203                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3204                 aa = ptlrpc_req_async_args(req);
3205                 OBDO_ALLOC(oa);
3206                 if (!oa) {
3207                         ptlrpc_req_finished(req);
3208                         return -ENOMEM;
3209                 }
3210                 *oa = ((struct ost_body *)val)->oa;
3211                 aa->aa_oa = oa;
3212                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3213         }
3214
3215         ptlrpc_request_set_replen(req);
3216         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3217                 LASSERT(set != NULL);
3218                 ptlrpc_set_add_req(set, req);
3219                 ptlrpc_check_set(NULL, set);
3220         } else
3221                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3222
3223         return 0;
3224 }
3225
3227 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3228                          struct obd_device *disk_obd, int *index)
3229 {
3230         /* this code is not supposed to be used with LOD/OSP
3231          * and is to be removed soon */
3232         LBUG();
3233         return 0;
3234 }
3235
3236 static int osc_llog_finish(struct obd_device *obd, int count)
3237 {
3238         struct llog_ctxt *ctxt;
3239
3240         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3241         if (ctxt) {
3242                 llog_cat_close(NULL, ctxt->loc_handle);
3243                 llog_cleanup(NULL, ctxt);
3244         }
3245
3246         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3247         if (ctxt)
3248                 llog_cleanup(NULL, ctxt);
3249         return 0;
3250 }
3251
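/* Recalculate the grant to request when reconnecting to the OST. */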
3252 static int osc_reconnect(const struct lu_env *env,
3253                          struct obd_export *exp, struct obd_device *obd,
3254                          struct obd_uuid *cluuid,
3255                          struct obd_connect_data *data,
3256                          void *localdata)
3257 {
3258         struct client_obd *cli = &obd->u.cli;
3259
3260         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3261                 long lost_grant;
3262
3263                 client_obd_list_lock(&cli->cl_loi_list_lock);
3264                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3265                                 2 * cli_brw_size(obd);
3266                 lost_grant = cli->cl_lost_grant;
3267                 cli->cl_lost_grant = 0;
3268                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3269
3270                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3271                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3272                        data->ocd_version, data->ocd_grant, lost_grant);
3273         }
3274
3275         return 0;
3276 }
3277
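/* Disconnect from the OST: sync outstanding llog cancels, then tear down
 * the export. */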
3278 static int osc_disconnect(struct obd_export *exp)
3279 {
3280         struct obd_device *obd = class_exp2obd(exp);
3281         struct llog_ctxt  *ctxt;
3282         int rc;
3283
3284         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3285         if (ctxt) {
3286                 if (obd->u.cli.cl_conn_count == 1) {
3287                         /* Flush any remaining cancel messages out to the
3288                          * target */
3289                         llog_sync(ctxt, exp, 0);
3290                 }
3291                 llog_ctxt_put(ctxt);
3292         } else {
3293                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3294                        obd);
3295         }
3296
3297         rc = client_disconnect_export(exp);
3298         /**
3299          * Initially we put del_shrink_grant before disconnect_export, but it
3300          * causes the following problem if setup (connect) and cleanup
3301          * (disconnect) are tangled together.
3302          *      connect p1                   disconnect p2
3303          *   ptlrpc_connect_import
3304          *     ...............         class_manual_cleanup
3305          *                                   osc_disconnect
3306          *                                   del_shrink_grant
3307          *   ptlrpc_connect_interrupt
3308          *     init_grant_shrink
3309          *   add this client to shrink list
3310          *                                    cleanup_osc
3311          * Bang! the pinger triggers the shrink.
3312          * So the osc should be disconnected from the shrink list, after we
3313          * are sure the import has been destroyed. BUG18662
3314          */
3315         if (obd->u.cli.cl_import == NULL)
3316                 osc_del_shrink_grant(&obd->u.cli);
3317         return rc;
3318 }
3319
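/* React to import state changes (disconnect, invalidate, activate, ...). */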
3320 static int osc_import_event(struct obd_device *obd,
3321                             struct obd_import *imp,
3322                             enum obd_import_event event)
3323 {
3324         struct client_obd *cli;
3325         int rc = 0;
3326
3327         LASSERT(imp->imp_obd == obd);
3328
3329         switch (event) {
3330         case IMP_EVENT_DISCON: {
3331                 cli = &obd->u.cli;
3332                 client_obd_list_lock(&cli->cl_loi_list_lock);
3333                 cli->cl_avail_grant = 0;
3334                 cli->cl_lost_grant = 0;
3335                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3336                 break;
3337         }
3338         case IMP_EVENT_INACTIVE: {
3339                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3340                 break;
3341         }
3342         case IMP_EVENT_INVALIDATE: {
3343                 struct ldlm_namespace *ns = obd->obd_namespace;
3344                 struct lu_env    *env;
3345                 int                 refcheck;
3346
3347                 env = cl_env_get(&refcheck);
3348                 if (!IS_ERR(env)) {
3349                         /* Reset grants */
3350                         cli = &obd->u.cli;
3351                         /* all pages go to failing rpcs due to the invalid
3352                          * import */
3353                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3354
3355                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3356                         cl_env_put(env, &refcheck);
3357                 } else
3358                         rc = PTR_ERR(env);
3359                 break;
3360         }
3361         case IMP_EVENT_ACTIVE: {
3362                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3363                 break;
3364         }
3365         case IMP_EVENT_OCD: {
3366                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3367
3368                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3369                         osc_init_grant(&obd->u.cli, ocd);
3370
3371                 /* See bug 7198 */
3372                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3373                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3374
3375                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3376                 break;
3377         }
3378         case IMP_EVENT_DEACTIVATE: {
3379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3380                 break;
3381         }
3382         case IMP_EVENT_ACTIVATE: {
3383                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3384                 break;
3385         }
3386         default:
3387                 CERROR("Unknown import event %d\n", event);
3388                 LBUG();
3389         }
3390         return rc;
3391 }
3392
3393 /**
3394  * Determine whether the lock can be canceled before replaying the lock
3395  * during recovery, see bug16774 for detailed information.
3396  *
3397  * \retval zero the lock can't be canceled
3398  * \retval other ok to cancel
3399  */
3400 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3401 {
3402         check_res_locked(lock->l_resource);
3403
3404         /*
3405          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3406          *
3407          * XXX as a future improvement, we can also cancel unused write lock
3408          * if it doesn't have dirty data and active mmaps.
3409          */
3410         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3411             (lock->l_granted_mode == LCK_PR ||
3412              lock->l_granted_mode == LCK_CR) &&
3413             (osc_dlm_lock_pageref(lock) == 0))
3414                 return 1;
3415
3416         return 0;
3417 }
3418
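/* ptlrpcd work callback: flush queued writeback for this client obd. */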
3419 static int brw_queue_work(const struct lu_env *env, void *data)
3420 {
3421         struct client_obd *cli = data;
3422
3423         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3424
3425         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3426         return 0;
3427 }
3428
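/* Set up the OSC device: client import, writeback work item, quota and
 * procfs entries. */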
3429 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3430 {
3431         struct lprocfs_static_vars lvars = { 0 };
3432         struct client_obd         *cli = &obd->u.cli;
3433         void                   *handler;
3434         int                     rc;
3435
3436         rc = ptlrpcd_addref();
3437         if (rc)
3438                 return rc;
3439
3440         rc = client_obd_setup(obd, lcfg);
3441         if (rc)
3442                 GOTO(out_ptlrpcd, rc);
3443
3444         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3445         if (IS_ERR(handler))
3446                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3447         cli->cl_writeback_work = handler;
3448
3449         rc = osc_quota_setup(obd);
3450         if (rc)
3451                 GOTO(out_ptlrpcd_work, rc);
3452
3453         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3454         lprocfs_osc_init_vars(&lvars);
3455         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3456                 lproc_osc_attach_seqstat(obd);
3457                 sptlrpc_lprocfs_cliobd_attach(obd);
3458                 ptlrpc_lprocfs_register_obd(obd);
3459         }
3460
3461         /* We need to allocate a few more requests, because
3462          * brw_interpret tries to create new requests before freeing
3463          * previous ones.  Ideally we want to have 2x max_rpcs_in_flight
3464          * reserved, but that might waste too much RAM in practice,
3465          * so reserving 2 extra is a guess that should still work. */
3466         cli->cl_import->imp_rq_pool =
3467                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3468                                     OST_MAXREQSIZE,
3469                                     ptlrpc_add_rqs_to_pool);
3470
3471         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3472         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3473         return rc;
3474
3475 out_ptlrpcd_work:
3476         ptlrpcd_destroy_work(handler);
3477 out_client_setup:
3478         client_obd_cleanup(obd);
3479 out_ptlrpcd:
3480         ptlrpcd_decref();
3481         return rc;
3482 }
3483
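/* Pre-cleanup: deactivate the import early, then tear down exports and
 * llog/procfs state. */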
3484 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3485 {
3486         int rc = 0;
3487
3488         switch (stage) {
3489         case OBD_CLEANUP_EARLY: {
3490                 struct obd_import *imp;
3491                 imp = obd->u.cli.cl_import;
3492                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3493                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3494                 ptlrpc_deactivate_import(imp);
3495                 spin_lock(&imp->imp_lock);
3496                 imp->imp_pingable = 0;
3497                 spin_unlock(&imp->imp_lock);
3498                 break;
3499         }
3500         case OBD_CLEANUP_EXPORTS: {
3501                 struct client_obd *cli = &obd->u.cli;
3502                 /* LU-464
3503                  * for echo client, export may be on zombie list, wait for
3504                  * zombie thread to cull it, because cli.cl_import will be
3505                  * cleared in client_disconnect_export():
3506                  *   class_export_destroy() -> obd_cleanup() ->
3507                  *   echo_device_free() -> echo_client_cleanup() ->
3508                  *   obd_disconnect() -> osc_disconnect() ->
3509                  *   client_disconnect_export()
3510                  */
3511                 obd_zombie_barrier();
3512                 if (cli->cl_writeback_work) {
3513                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3514                         cli->cl_writeback_work = NULL;
3515                 }
3516                 obd_cleanup_client_import(obd);
3517                 ptlrpc_lprocfs_unregister_obd(obd);
3518                 lprocfs_obd_cleanup(obd);
3519                 rc = obd_llog_finish(obd, 0);
3520                 if (rc != 0)
3521                         CERROR("failed to cleanup llogging subsystems\n");
3522                 break;
3523                 }
3524         }
3525         return rc;
3526 }
3527
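/* Final cleanup: detach from the client LRU cache, free quota state and
 * the client obd. */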
3528 int osc_cleanup(struct obd_device *obd)
3529 {
3530         struct client_obd *cli = &obd->u.cli;
3531         int rc;
3532
3533         /* lru cleanup */
3534         if (cli->cl_cache != NULL) {
3535                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3536                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3537                 list_del_init(&cli->cl_lru_osc);
3538                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3539                 cli->cl_lru_left = NULL;
3540                 atomic_dec(&cli->cl_cache->ccc_users);
3541                 cli->cl_cache = NULL;
3542         }
3543
3544         /* free memory of osc quota cache */
3545         osc_quota_cleanup(obd);
3546
3547         rc = client_obd_cleanup(obd);
3548
3549         ptlrpcd_decref();
3550         return rc;
3551 }
3552
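/* Apply a configuration (proc parameter) record to this OSC. */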
3553 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3554 {
3555         struct lprocfs_static_vars lvars = { 0 };
3556         int rc = 0;
3557
3558         lprocfs_osc_init_vars(&lvars);
3559
3560         switch (lcfg->lcfg_command) {
3561         default:
3562                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3563                                               lcfg, obd);
3564                 if (rc > 0)
3565                         rc = 0;
3566                 break;
3567         }
3568
3569         return rc;
3570 }
3571
3572 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3573 {
3574         return osc_process_config_base(obd, buf);
3575 }
3576
3577 struct obd_ops osc_obd_ops = {
3578         .o_owner                = THIS_MODULE,
3579         .o_setup                = osc_setup,
3580         .o_precleanup           = osc_precleanup,
3581         .o_cleanup              = osc_cleanup,
3582         .o_add_conn             = client_import_add_conn,
3583         .o_del_conn             = client_import_del_conn,
3584         .o_connect              = client_connect_import,
3585         .o_reconnect            = osc_reconnect,
3586         .o_disconnect           = osc_disconnect,
3587         .o_statfs               = osc_statfs,
3588         .o_statfs_async         = osc_statfs_async,
3589         .o_packmd               = osc_packmd,
3590         .o_unpackmd             = osc_unpackmd,
3591         .o_create               = osc_create,
3592         .o_destroy              = osc_destroy,
3593         .o_getattr              = osc_getattr,
3594         .o_getattr_async        = osc_getattr_async,
3595         .o_setattr              = osc_setattr,
3596         .o_setattr_async        = osc_setattr_async,
3597         .o_brw                  = osc_brw,
3598         .o_punch                = osc_punch,
3599         .o_sync                 = osc_sync,
3600         .o_enqueue              = osc_enqueue,
3601         .o_change_cbdata        = osc_change_cbdata,
3602         .o_find_cbdata          = osc_find_cbdata,
3603         .o_cancel               = osc_cancel,
3604         .o_cancel_unused        = osc_cancel_unused,
3605         .o_iocontrol            = osc_iocontrol,
3606         .o_get_info             = osc_get_info,
3607         .o_set_info_async       = osc_set_info_async,
3608         .o_import_event         = osc_import_event,
3609         .o_llog_init            = osc_llog_init,
3610         .o_llog_finish          = osc_llog_finish,
3611         .o_process_config       = osc_process_config,
3612         .o_quotactl             = osc_quotactl,
3613         .o_quotacheck           = osc_quotacheck,
3614 };
3615
3616 extern struct lu_kmem_descr osc_caches[];
3617 extern spinlock_t osc_ast_guard;
3618 extern struct lock_class_key osc_ast_guard_class;
3619
3620 int __init osc_init(void)
3621 {
3622         struct lprocfs_static_vars lvars = { 0 };
3623         int rc;
3624
3625         /* print an address of _any_ initialized kernel symbol from this
3626          * module, to allow debugging with gdb that doesn't support data
3627          * symbols from modules.*/
3628         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3629
3630         rc = lu_kmem_init(osc_caches);
3631         if (rc)
3632                 return rc;
3633
3634         lprocfs_osc_init_vars(&lvars);
3635
3636         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3637                                  LUSTRE_OSC_NAME, &osc_device_type);
3638         if (rc) {
3639                 lu_kmem_fini(osc_caches);
3640                 return rc;
3641         }
3642
3643         spin_lock_init(&osc_ast_guard);
3644         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3645
3646         return rc;
3647 }
3648
3649 static void /*__exit*/ osc_exit(void)
3650 {
3651         class_unregister_type(LUSTRE_OSC_NAME);
3652         lu_kmem_fini(osc_caches);
3653 }
3654
3655 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3656 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3657 MODULE_LICENSE("GPL");
3658 MODULE_VERSION(LUSTRE_VERSION_STRING);
3659
3660 module_init(osc_init);
3661 module_exit(osc_exit);