Whamcloud - gitweb
LU-3086 build: fix 'uninitialized variables' errors
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
188         osc_pack_capa(req, body, oinfo->oi_capa);
189 }
190
191 static inline void osc_set_capa_size(struct ptlrpc_request *req,
192                                      const struct req_msg_field *field,
193                                      struct obd_capa *oc)
194 {
195         if (oc == NULL)
196                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
197         else
198                 /* it is already calculated as sizeof struct obd_capa */
199                 ;
200 }
201
202 static int osc_getattr_interpret(const struct lu_env *env,
203                                  struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
213         if (body) {
214                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
215                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
216
217                 /* This should really be sent by the OST */
218                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
219                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
220         } else {
221                 CDEBUG(D_INFO, "can't unpack ost_body\n");
222                 rc = -EPROTO;
223                 aa->aa_oi->oi_oa->o_valid = 0;
224         }
225 out:
226         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
227         RETURN(rc);
228 }
229
230 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
231                              struct ptlrpc_request_set *set)
232 {
233         struct ptlrpc_request *req;
234         struct osc_async_args *aa;
235         int                    rc;
236         ENTRY;
237
238         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
239         if (req == NULL)
240                 RETURN(-ENOMEM);
241
242         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
243         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
244         if (rc) {
245                 ptlrpc_request_free(req);
246                 RETURN(rc);
247         }
248
249         osc_pack_req_body(req, oinfo);
250
251         ptlrpc_request_set_replen(req);
252         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
253
254         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
255         aa = ptlrpc_req_async_args(req);
256         aa->aa_oi = oinfo;
257
258         ptlrpc_set_add_req(set, req);
259         RETURN(0);
260 }
261
262 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
263                        struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
297         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(req);
302         return rc;
303 }
304
305 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
306                        struct obd_info *oinfo, struct obd_trans_info *oti)
307 {
308         struct ptlrpc_request *req;
309         struct ost_body       *body;
310         int                    rc;
311         ENTRY;
312
313         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
314
315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
320         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
321         if (rc) {
322                 ptlrpc_request_free(req);
323                 RETURN(rc);
324         }
325
326         osc_pack_req_body(req, oinfo);
327
328         ptlrpc_request_set_replen(req);
329
330         rc = ptlrpc_queue_wait(req);
331         if (rc)
332                 GOTO(out, rc);
333
334         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
335         if (body == NULL)
336                 GOTO(out, rc = -EPROTO);
337
338         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
339
340         EXIT;
341 out:
342         ptlrpc_req_finished(req);
343         RETURN(rc);
344 }
345
346 static int osc_setattr_interpret(const struct lu_env *env,
347                                  struct ptlrpc_request *req,
348                                  struct osc_setattr_args *sa, int rc)
349 {
350         struct ost_body *body;
351         ENTRY;
352
353         if (rc != 0)
354                 GOTO(out, rc);
355
356         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
357         if (body == NULL)
358                 GOTO(out, rc = -EPROTO);
359
360         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
361 out:
362         rc = sa->sa_upcall(sa->sa_cookie, rc);
363         RETURN(rc);
364 }
365
366 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
367                            struct obd_trans_info *oti,
368                            obd_enqueue_update_f upcall, void *cookie,
369                            struct ptlrpc_request_set *rqset)
370 {
371         struct ptlrpc_request   *req;
372         struct osc_setattr_args *sa;
373         int                      rc;
374         ENTRY;
375
376         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
377         if (req == NULL)
378                 RETURN(-ENOMEM);
379
380         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
381         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
382         if (rc) {
383                 ptlrpc_request_free(req);
384                 RETURN(rc);
385         }
386
387         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
388                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
389
390         osc_pack_req_body(req, oinfo);
391
392         ptlrpc_request_set_replen(req);
393
394         /* do mds to ost setattr asynchronously */
395         if (!rqset) {
396                 /* Do not wait for response. */
397                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
398         } else {
399                 req->rq_interpret_reply =
400                         (ptlrpc_interpterer_t)osc_setattr_interpret;
401
402                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
403                 sa = ptlrpc_req_async_args(req);
404                 sa->sa_oa = oinfo->oi_oa;
405                 sa->sa_upcall = upcall;
406                 sa->sa_cookie = cookie;
407
408                 if (rqset == PTLRPCD_SET)
409                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
410                 else
411                         ptlrpc_set_add_req(rqset, req);
412         }
413
414         RETURN(0);
415 }
416
417 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
418                              struct obd_trans_info *oti,
419                              struct ptlrpc_request_set *rqset)
420 {
421         return osc_setattr_async_base(exp, oinfo, oti,
422                                       oinfo->oi_cb_up, oinfo, rqset);
423 }
424
425 int osc_real_create(struct obd_export *exp, struct obdo *oa,
426                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
427 {
428         struct ptlrpc_request *req;
429         struct ost_body       *body;
430         struct lov_stripe_md  *lsm;
431         int                    rc;
432         ENTRY;
433
434         LASSERT(oa);
435         LASSERT(ea);
436
437         lsm = *ea;
438         if (!lsm) {
439                 rc = obd_alloc_memmd(exp, &lsm);
440                 if (rc < 0)
441                         RETURN(rc);
442         }
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
445         if (req == NULL)
446                 GOTO(out, rc = -ENOMEM);
447
448         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 GOTO(out, rc);
452         }
453
454         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
455         LASSERT(body);
456         lustre_set_wire_obdo(&body->oa, oa);
457
458         ptlrpc_request_set_replen(req);
459
460         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
461             oa->o_flags == OBD_FL_DELORPHAN) {
462                 DEBUG_REQ(D_HA, req,
463                           "delorphan from OST integration");
464                 /* Don't resend the delorphan req */
465                 req->rq_no_resend = req->rq_no_delay = 1;
466         }
467
468         rc = ptlrpc_queue_wait(req);
469         if (rc)
470                 GOTO(out_req, rc);
471
472         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
473         if (body == NULL)
474                 GOTO(out_req, rc = -EPROTO);
475
476         lustre_get_wire_obdo(oa, &body->oa);
477
478         oa->o_blksize = cli_brw_size(exp->exp_obd);
479         oa->o_valid |= OBD_MD_FLBLKSZ;
480
481         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
482          * have valid lsm_oinfo data structs, so don't go touching that.
483          * This needs to be fixed in a big way.
484          */
485         lsm->lsm_oi = oa->o_oi;
486         *ea = lsm;
487
488         if (oti != NULL) {
489                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
490
491                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
492                         if (!oti->oti_logcookies)
493                                 oti_alloc_cookies(oti, 1);
494                         *oti->oti_logcookies = oa->o_lcookie;
495                 }
496         }
497
498         CDEBUG(D_HA, "transno: "LPD64"\n",
499                lustre_msg_get_transno(req->rq_repmsg));
500 out_req:
501         ptlrpc_req_finished(req);
502 out:
503         if (rc && !*ea)
504                 obd_free_memmd(exp, &lsm);
505         RETURN(rc);
506 }
507
508 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
509                    obd_enqueue_update_f upcall, void *cookie,
510                    struct ptlrpc_request_set *rqset)
511 {
512         struct ptlrpc_request   *req;
513         struct osc_setattr_args *sa;
514         struct ost_body         *body;
515         int                      rc;
516         ENTRY;
517
518         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
519         if (req == NULL)
520                 RETURN(-ENOMEM);
521
522         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
523         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
524         if (rc) {
525                 ptlrpc_request_free(req);
526                 RETURN(rc);
527         }
528         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
529         ptlrpc_at_set_req_timeout(req);
530
531         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
532         LASSERT(body);
533         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
534         osc_pack_capa(req, body, oinfo->oi_capa);
535
536         ptlrpc_request_set_replen(req);
537
538         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
539         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
540         sa = ptlrpc_req_async_args(req);
541         sa->sa_oa     = oinfo->oi_oa;
542         sa->sa_upcall = upcall;
543         sa->sa_cookie = cookie;
544         if (rqset == PTLRPCD_SET)
545                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
546         else
547                 ptlrpc_set_add_req(rqset, req);
548
549         RETURN(0);
550 }
551
552 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
553                      struct obd_info *oinfo, struct obd_trans_info *oti,
554                      struct ptlrpc_request_set *rqset)
555 {
556         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
557         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
558         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
559         return osc_punch_base(exp, oinfo,
560                               oinfo->oi_cb_up, oinfo, rqset);
561 }
562
563 static int osc_sync_interpret(const struct lu_env *env,
564                               struct ptlrpc_request *req,
565                               void *arg, int rc)
566 {
567         struct osc_fsync_args *fa = arg;
568         struct ost_body *body;
569         ENTRY;
570
571         if (rc)
572                 GOTO(out, rc);
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (body == NULL) {
576                 CERROR ("can't unpack ost_body\n");
577                 GOTO(out, rc = -EPROTO);
578         }
579
580         *fa->fa_oi->oi_oa = body->oa;
581 out:
582         rc = fa->fa_upcall(fa->fa_cookie, rc);
583         RETURN(rc);
584 }
585
586 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
587                   obd_enqueue_update_f upcall, void *cookie,
588                   struct ptlrpc_request_set *rqset)
589 {
590         struct ptlrpc_request *req;
591         struct ost_body       *body;
592         struct osc_fsync_args *fa;
593         int                    rc;
594         ENTRY;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (req == NULL)
598                 RETURN(-ENOMEM);
599
600         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
601         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
602         if (rc) {
603                 ptlrpc_request_free(req);
604                 RETURN(rc);
605         }
606
607         /* overload the size and blocks fields in the oa with start/end */
608         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
609         LASSERT(body);
610         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
611         osc_pack_capa(req, body, oinfo->oi_capa);
612
613         ptlrpc_request_set_replen(req);
614         req->rq_interpret_reply = osc_sync_interpret;
615
616         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
617         fa = ptlrpc_req_async_args(req);
618         fa->fa_oi = oinfo;
619         fa->fa_upcall = upcall;
620         fa->fa_cookie = cookie;
621
622         if (rqset == PTLRPCD_SET)
623                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
624         else
625                 ptlrpc_set_add_req(rqset, req);
626
627         RETURN (0);
628 }
629
630 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
631                     struct obd_info *oinfo, obd_size start, obd_size end,
632                     struct ptlrpc_request_set *set)
633 {
634         ENTRY;
635
636         if (!oinfo->oi_oa) {
637                 CDEBUG(D_INFO, "oa NULL\n");
638                 RETURN(-EINVAL);
639         }
640
641         oinfo->oi_oa->o_size = start;
642         oinfo->oi_oa->o_blocks = end;
643         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
644
645         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
646 }
647
648 /* Find and cancel locally locks matched by @mode in the resource found by
649  * @objid. Found locks are added into @cancel list. Returns the amount of
650  * locks added to @cancels list. */
651 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
652                                    cfs_list_t *cancels,
653                                    ldlm_mode_t mode, int lock_flags)
654 {
655         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
656         struct ldlm_res_id res_id;
657         struct ldlm_resource *res;
658         int count;
659         ENTRY;
660
661         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
662          * export) but disabled through procfs (flag in NS).
663          *
664          * This distinguishes from a case when ELC is not supported originally,
665          * when we still want to cancel locks in advance and just cancel them
666          * locally, without sending any RPC. */
667         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
668                 RETURN(0);
669
670         ostid_build_res_name(&oa->o_oi, &res_id);
671         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
672         if (res == NULL)
673                 RETURN(0);
674
675         LDLM_RESOURCE_ADDREF(res);
676         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
677                                            lock_flags, 0, NULL);
678         LDLM_RESOURCE_DELREF(res);
679         ldlm_resource_putref(res);
680         RETURN(count);
681 }
682
683 static int osc_destroy_interpret(const struct lu_env *env,
684                                  struct ptlrpc_request *req, void *data,
685                                  int rc)
686 {
687         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
688
689         cfs_atomic_dec(&cli->cl_destroy_in_flight);
690         cfs_waitq_signal(&cli->cl_destroy_waitq);
691         return 0;
692 }
693
694 static int osc_can_send_destroy(struct client_obd *cli)
695 {
696         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
697             cli->cl_max_rpcs_in_flight) {
698                 /* The destroy request can be sent */
699                 return 1;
700         }
701         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
702             cli->cl_max_rpcs_in_flight) {
703                 /*
704                  * The counter has been modified between the two atomic
705                  * operations.
706                  */
707                 cfs_waitq_signal(&cli->cl_destroy_waitq);
708         }
709         return 0;
710 }
711
712 int osc_create(const struct lu_env *env, struct obd_export *exp,
713                struct obdo *oa, struct lov_stripe_md **ea,
714                struct obd_trans_info *oti)
715 {
716         int rc = 0;
717         ENTRY;
718
719         LASSERT(oa);
720         LASSERT(ea);
721         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
722
723         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
724             oa->o_flags == OBD_FL_RECREATE_OBJS) {
725                 RETURN(osc_real_create(exp, oa, ea, oti));
726         }
727
728         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
729                 RETURN(osc_real_create(exp, oa, ea, oti));
730
731         /* we should not get here anymore */
732         LBUG();
733
734         RETURN(rc);
735 }
736
737 /* Destroy requests can be async always on the client, and we don't even really
738  * care about the return code since the client cannot do anything at all about
739  * a destroy failure.
740  * When the MDS is unlinking a filename, it saves the file objects into a
741  * recovery llog, and these object records are cancelled when the OST reports
742  * they were destroyed and sync'd to disk (i.e. transaction committed).
743  * If the client dies, or the OST is down when the object should be destroyed,
744  * the records are not cancelled, and when the OST reconnects to the MDS next,
745  * it will retrieve the llog unlink logs and then sends the log cancellation
746  * cookies to the MDS after committing destroy transactions. */
747 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
748                        struct obdo *oa, struct lov_stripe_md *ea,
749                        struct obd_trans_info *oti, struct obd_export *md_export,
750                        void *capa)
751 {
752         struct client_obd     *cli = &exp->exp_obd->u.cli;
753         struct ptlrpc_request *req;
754         struct ost_body       *body;
755         CFS_LIST_HEAD(cancels);
756         int rc, count;
757         ENTRY;
758
759         if (!oa) {
760                 CDEBUG(D_INFO, "oa NULL\n");
761                 RETURN(-EINVAL);
762         }
763
764         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
765                                         LDLM_FL_DISCARD_DATA);
766
767         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
768         if (req == NULL) {
769                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
770                 RETURN(-ENOMEM);
771         }
772
773         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
774         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
775                                0, &cancels, count);
776         if (rc) {
777                 ptlrpc_request_free(req);
778                 RETURN(rc);
779         }
780
781         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
782         ptlrpc_at_set_req_timeout(req);
783
784         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
785                 oa->o_lcookie = *oti->oti_logcookies;
786         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
787         LASSERT(body);
788         lustre_set_wire_obdo(&body->oa, oa);
789
790         osc_pack_capa(req, body, (struct obd_capa *)capa);
791         ptlrpc_request_set_replen(req);
792
793         /* If osc_destory is for destroying the unlink orphan,
794          * sent from MDT to OST, which should not be blocked here,
795          * because the process might be triggered by ptlrpcd, and
796          * it is not good to block ptlrpcd thread (b=16006)*/
797         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
798                 req->rq_interpret_reply = osc_destroy_interpret;
799                 if (!osc_can_send_destroy(cli)) {
800                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
801                                                           NULL);
802
803                         /*
804                          * Wait until the number of on-going destroy RPCs drops
805                          * under max_rpc_in_flight
806                          */
807                         l_wait_event_exclusive(cli->cl_destroy_waitq,
808                                                osc_can_send_destroy(cli), &lwi);
809                 }
810         }
811
812         /* Do not wait for response */
813         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
814         RETURN(0);
815 }
816
817 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
818                                 long writing_bytes)
819 {
820         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
821
822         LASSERT(!(oa->o_valid & bits));
823
824         oa->o_valid |= bits;
825         client_obd_list_lock(&cli->cl_loi_list_lock);
826         oa->o_dirty = cli->cl_dirty;
827         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
828                      cli->cl_dirty_max)) {
829                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
830                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
831                 oa->o_undirty = 0;
832         } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) +
833                             cfs_atomic_read(&obd_dirty_pages) -
834                             cfs_atomic_read(&obd_dirty_transit_pages) >
835                             (long)(obd_max_dirty_pages + 1))) {
836                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
837                  * not covered by a lock thus they may safely race and trip
838                  * this CERROR() unless we add in a small fudge factor (+1). */
839                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
840                        cli->cl_import->imp_obd->obd_name,
841                        cfs_atomic_read(&obd_unstable_pages),
842                        cfs_atomic_read(&obd_dirty_pages),
843                        cfs_atomic_read(&obd_dirty_transit_pages),
844                        obd_max_dirty_pages);
845                 oa->o_undirty = 0;
846         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
847                 CERROR("dirty %lu - dirty_max %lu too big???\n",
848                        cli->cl_dirty, cli->cl_dirty_max);
849                 oa->o_undirty = 0;
850         } else {
851                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
852                                       CFS_PAGE_SHIFT)*
853                                      (cli->cl_max_rpcs_in_flight + 1);
854                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
855         }
856         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
857         oa->o_dropped = cli->cl_lost_grant;
858         cli->cl_lost_grant = 0;
859         client_obd_list_unlock(&cli->cl_loi_list_lock);
860         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
861                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
862
863 }
864
865 void osc_update_next_shrink(struct client_obd *cli)
866 {
867         cli->cl_next_shrink_grant =
868                 cfs_time_shift(cli->cl_grant_shrink_interval);
869         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
870                cli->cl_next_shrink_grant);
871 }
872
873 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
874 {
875         client_obd_list_lock(&cli->cl_loi_list_lock);
876         cli->cl_avail_grant += grant;
877         client_obd_list_unlock(&cli->cl_loi_list_lock);
878 }
879
880 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
881 {
882         if (body->oa.o_valid & OBD_MD_FLGRANT) {
883                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
884                 __osc_update_grant(cli, body->oa.o_grant);
885         }
886 }
887
888 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
889                               obd_count keylen, void *key, obd_count vallen,
890                               void *val, struct ptlrpc_request_set *set);
891
892 static int osc_shrink_grant_interpret(const struct lu_env *env,
893                                       struct ptlrpc_request *req,
894                                       void *aa, int rc)
895 {
896         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
897         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
898         struct ost_body *body;
899
900         if (rc != 0) {
901                 __osc_update_grant(cli, oa->o_grant);
902                 GOTO(out, rc);
903         }
904
905         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
906         LASSERT(body);
907         osc_update_grant(cli, body);
908 out:
909         OBDO_FREE(oa);
910         return rc;
911 }
912
913 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
914 {
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         oa->o_grant = cli->cl_avail_grant / 4;
917         cli->cl_avail_grant -= oa->o_grant;
918         client_obd_list_unlock(&cli->cl_loi_list_lock);
919         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
920                 oa->o_valid |= OBD_MD_FLFLAGS;
921                 oa->o_flags = 0;
922         }
923         oa->o_flags |= OBD_FL_SHRINK_GRANT;
924         osc_update_next_shrink(cli);
925 }
926
927 /* Shrink the current grant, either from some large amount to enough for a
928  * full set of in-flight RPCs, or if we have already shrunk to that limit
929  * then to enough for a single RPC.  This avoids keeping more grant than
930  * needed, and avoids shrinking the grant piecemeal. */
931 static int osc_shrink_grant(struct client_obd *cli)
932 {
933         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
934                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
935
936         client_obd_list_lock(&cli->cl_loi_list_lock);
937         if (cli->cl_avail_grant <= target_bytes)
938                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
939         client_obd_list_unlock(&cli->cl_loi_list_lock);
940
941         return osc_shrink_grant_to_target(cli, target_bytes);
942 }
943
944 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
945 {
946         int                     rc = 0;
947         struct ost_body        *body;
948         ENTRY;
949
950         client_obd_list_lock(&cli->cl_loi_list_lock);
951         /* Don't shrink if we are already above or below the desired limit
952          * We don't want to shrink below a single RPC, as that will negatively
953          * impact block allocation and long-term performance. */
954         if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
955                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
956
957         if (target_bytes >= cli->cl_avail_grant) {
958                 client_obd_list_unlock(&cli->cl_loi_list_lock);
959                 RETURN(0);
960         }
961         client_obd_list_unlock(&cli->cl_loi_list_lock);
962
963         OBD_ALLOC_PTR(body);
964         if (!body)
965                 RETURN(-ENOMEM);
966
967         osc_announce_cached(cli, &body->oa, 0);
968
969         client_obd_list_lock(&cli->cl_loi_list_lock);
970         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
971         cli->cl_avail_grant = target_bytes;
972         client_obd_list_unlock(&cli->cl_loi_list_lock);
973         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
974                 body->oa.o_valid |= OBD_MD_FLFLAGS;
975                 body->oa.o_flags = 0;
976         }
977         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
978         osc_update_next_shrink(cli);
979
980         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
981                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
982                                 sizeof(*body), body, NULL);
983         if (rc != 0)
984                 __osc_update_grant(cli, body->oa.o_grant);
985         OBD_FREE_PTR(body);
986         RETURN(rc);
987 }
988
989 static int osc_should_shrink_grant(struct client_obd *client)
990 {
991         cfs_time_t time = cfs_time_current();
992         cfs_time_t next_shrink = client->cl_next_shrink_grant;
993
994         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
995              OBD_CONNECT_GRANT_SHRINK) == 0)
996                 return 0;
997
998         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
999                 /* Get the current RPC size directly, instead of going via:
1000                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1001                  * Keep comment here so that it can be found by searching. */
1002                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1003
1004                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1005                     client->cl_avail_grant > brw_size)
1006                         return 1;
1007                 else
1008                         osc_update_next_shrink(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1014 {
1015         struct client_obd *client;
1016
1017         cfs_list_for_each_entry(client, &item->ti_obd_list,
1018                                 cl_grant_shrink_list) {
1019                 if (osc_should_shrink_grant(client))
1020                         osc_shrink_grant(client);
1021         }
1022         return 0;
1023 }
1024
1025 static int osc_add_shrink_grant(struct client_obd *client)
1026 {
1027         int rc;
1028
1029         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1030                                        TIMEOUT_GRANT,
1031                                        osc_grant_shrink_grant_cb, NULL,
1032                                        &client->cl_grant_shrink_list);
1033         if (rc) {
1034                 CERROR("add grant client %s error %d\n",
1035                         client->cl_import->imp_obd->obd_name, rc);
1036                 return rc;
1037         }
1038         CDEBUG(D_CACHE, "add grant client %s \n",
1039                client->cl_import->imp_obd->obd_name);
1040         osc_update_next_shrink(client);
1041         return 0;
1042 }
1043
1044 static int osc_del_shrink_grant(struct client_obd *client)
1045 {
1046         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1047                                          TIMEOUT_GRANT);
1048 }
1049
1050 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1051 {
1052         /*
1053          * ocd_grant is the total grant amount we're expect to hold: if we've
1054          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1055          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1056          *
1057          * race is tolerable here: if we're evicted, but imp_state already
1058          * left EVICTED state, then cl_dirty must be 0 already.
1059          */
1060         client_obd_list_lock(&cli->cl_loi_list_lock);
1061         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1062                 cli->cl_avail_grant = ocd->ocd_grant;
1063         else
1064                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1065
1066         if (cli->cl_avail_grant < 0) {
1067                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1068                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1069                       ocd->ocd_grant, cli->cl_dirty);
1070                 /* workaround for servers which do not have the patch from
1071                  * LU-2679 */
1072                 cli->cl_avail_grant = ocd->ocd_grant;
1073         }
1074
1075         /* determine the appropriate chunk size used by osc_extent. */
1076         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1077         client_obd_list_unlock(&cli->cl_loi_list_lock);
1078
1079         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1080                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1081                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1082
1083         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1084             cfs_list_empty(&cli->cl_grant_shrink_list))
1085                 osc_add_shrink_grant(cli);
1086 }
1087
1088 /* We assume that the reason this OSC got a short read is because it read
1089  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1090  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1091  * this stripe never got written at or beyond this stripe offset yet. */
1092 static void handle_short_read(int nob_read, obd_count page_count,
1093                               struct brw_page **pga)
1094 {
1095         char *ptr;
1096         int i = 0;
1097
1098         /* skip bytes read OK */
1099         while (nob_read > 0) {
1100                 LASSERT (page_count > 0);
1101
1102                 if (pga[i]->count > nob_read) {
1103                         /* EOF inside this page */
1104                         ptr = cfs_kmap(pga[i]->pg) +
1105                                 (pga[i]->off & ~CFS_PAGE_MASK);
1106                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1107                         cfs_kunmap(pga[i]->pg);
1108                         page_count--;
1109                         i++;
1110                         break;
1111                 }
1112
1113                 nob_read -= pga[i]->count;
1114                 page_count--;
1115                 i++;
1116         }
1117
1118         /* zero remaining pages */
1119         while (page_count-- > 0) {
1120                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1121                 memset(ptr, 0, pga[i]->count);
1122                 cfs_kunmap(pga[i]->pg);
1123                 i++;
1124         }
1125 }
1126
1127 static int check_write_rcs(struct ptlrpc_request *req,
1128                            int requested_nob, int niocount,
1129                            obd_count page_count, struct brw_page **pga)
1130 {
1131         int     i;
1132         __u32   *remote_rcs;
1133
1134         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1135                                                   sizeof(*remote_rcs) *
1136                                                   niocount);
1137         if (remote_rcs == NULL) {
1138                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1139                 return(-EPROTO);
1140         }
1141
1142         /* return error if any niobuf was in error */
1143         for (i = 0; i < niocount; i++) {
1144                 if ((int)remote_rcs[i] < 0)
1145                         return(remote_rcs[i]);
1146
1147                 if (remote_rcs[i] != 0) {
1148                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1149                                 i, remote_rcs[i], req);
1150                         return(-EPROTO);
1151                 }
1152         }
1153
1154         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1155                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1156                        req->rq_bulk->bd_nob_transferred, requested_nob);
1157                 return(-EPROTO);
1158         }
1159
1160         return (0);
1161 }
1162
1163 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1164 {
1165         if (p1->flag != p2->flag) {
1166                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1167                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1168
1169                 /* warn if we try to combine flags that we don't know to be
1170                  * safe to combine */
1171                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1172                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1173                               "report this at http://bugs.whamcloud.com/\n",
1174                               p1->flag, p2->flag);
1175                 }
1176                 return 0;
1177         }
1178
1179         return (p1->off + p1->count == p2->off);
1180 }
1181
1182 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1183                                    struct brw_page **pga, int opc,
1184                                    cksum_type_t cksum_type)
1185 {
1186         __u32                           cksum;
1187         int                             i = 0;
1188         struct cfs_crypto_hash_desc     *hdesc;
1189         unsigned int                    bufsize;
1190         int                             err;
1191         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1192
1193         LASSERT(pg_count > 0);
1194
1195         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1196         if (IS_ERR(hdesc)) {
1197                 CERROR("Unable to initialize checksum hash %s\n",
1198                        cfs_crypto_hash_name(cfs_alg));
1199                 return PTR_ERR(hdesc);
1200         }
1201
1202         while (nob > 0 && pg_count > 0) {
1203                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1204
1205                 /* corrupt the data before we compute the checksum, to
1206                  * simulate an OST->client data error */
1207                 if (i == 0 && opc == OST_READ &&
1208                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1209                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1210                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1211                         memcpy(ptr + off, "bad1", min(4, nob));
1212                         cfs_kunmap(pga[i]->pg);
1213                 }
1214                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1215                                   pga[i]->off & ~CFS_PAGE_MASK,
1216                                   count);
1217                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1218                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1219
1220                 nob -= pga[i]->count;
1221                 pg_count--;
1222                 i++;
1223         }
1224
1225         bufsize = 4;
1226         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1227
1228         if (err)
1229                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1230
1231         /* For sending we only compute the wrong checksum instead
1232          * of corrupting the data so it is still correct on a redo */
1233         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1234                 cksum++;
1235
1236         return cksum;
1237 }
1238
/*
 * Build (but do not send) a bulk read or write request for a page array.
 *
 * @cmd         OBD_BRW_WRITE selects OST_WRITE, anything else OST_READ
 * @cli         client the request is issued on
 * @oa          object attributes; copied into the request body and, for
 *              writes, updated with the checksum state for later checking
 * @lsm         not referenced in this function
 * @page_count  number of entries in @pga (asserted to be > 0)
 * @pga         pages to transfer, in strictly increasing offset order
 * @reqp        receives the prepared request on success
 * @ocapa       capability to pack into the request
 * @reserve     when set together with @ocapa, a reference on @ocapa is
 *              stored in the async args
 * @resend      when set, the body is flagged OBD_FL_RECOV_RESEND
 *
 * Returns 0 on success, -ENOMEM if the request or bulk descriptor cannot
 * be allocated, the error from ptlrpc_request_pack(), or the injected
 * -ENOMEM/-EINVAL from the OBD_FAIL_OSC_BRW_PREP_REQ* fail points.
 */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes are allocated from the import's request pool, reads are
         * allocated normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the niobufs needed: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tell the server the _maximum_ number
         * of bulks that might be sent for this request.  The actual number is
         * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
         * sends "max - 1" for old client compatibility sending "0", and also
         * so the actual maximum is a power-of-two number, not one less.
         * LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        /* add every page to the bulk descriptor and describe it in the
         * niobuf array, merging contiguous pages into a single niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                /* pages must be sorted by strictly increasing offset */
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* OBD_BRW_SRVLOCK must be uniform across the whole request */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* step back so this page extends the previous
                         * niobuf; the loop increment re-advances niobuf */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* exactly niocount niobufs must have been filled in */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                /* o_flags holds no meaningful value until OBD_MD_FLFLAGS is
                 * set */
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        /* piggy-back a grant shrink on this RPC when one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads only announce the checksum type; no checksum is
                 * computed on this side before sending */
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        /* stash what the reply handler needs in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1446
1447 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1448                                 __u32 client_cksum, __u32 server_cksum, int nob,
1449                                 obd_count page_count, struct brw_page **pga,
1450                                 cksum_type_t client_cksum_type)
1451 {
1452         __u32 new_cksum;
1453         char *msg;
1454         cksum_type_t cksum_type;
1455
1456         if (server_cksum == client_cksum) {
1457                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1458                 return 0;
1459         }
1460
1461         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1462                                        oa->o_flags : 0);
1463         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1464                                       cksum_type);
1465
1466         if (cksum_type != client_cksum_type)
1467                 msg = "the server did not use the checksum type specified in "
1468                       "the original request - likely a protocol problem";
1469         else if (new_cksum == server_cksum)
1470                 msg = "changed on the client after we checksummed it - "
1471                       "likely false positive due to mmap IO (bug 11742)";
1472         else if (new_cksum == client_cksum)
1473                 msg = "changed in transit before arrival at OST";
1474         else
1475                 msg = "changed in transit AND doesn't match the original - "
1476                       "likely false positive due to mmap IO (bug 11742)";
1477
1478         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1479                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1480                            msg, libcfs_nid2str(peer->nid),
1481                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1482                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1483                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1484                            POSTID(&oa->o_oi), pga[0]->off,
1485                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1486         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1487                "client csum now %x\n", client_cksum, client_cksum_type,
1488                server_cksum, cksum_type, new_cksum);
1489         return 1;
1490 }
1491
1492 /* Note rc enters this function as number of bytes transferred */
1493 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1494 {
1495         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1496         const lnet_process_id_t *peer =
1497                         &req->rq_import->imp_connection->c_peer;
1498         struct client_obd *cli = aa->aa_cli;
1499         struct ost_body *body;
1500         __u32 client_cksum = 0;
1501         ENTRY;
1502
1503         if (rc < 0 && rc != -EDQUOT) {
1504                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1505                 RETURN(rc);
1506         }
1507
1508         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1509         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1510         if (body == NULL) {
1511                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1512                 RETURN(-EPROTO);
1513         }
1514
1515         /* set/clear over quota flag for a uid/gid */
1516         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1517             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1518                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1519
1520                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1521                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1522                        body->oa.o_flags);
1523                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1524         }
1525
1526         osc_update_grant(cli, body);
1527
1528         if (rc < 0)
1529                 RETURN(rc);
1530
1531         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1532                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1533
1534         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1535                 if (rc > 0) {
1536                         CERROR("Unexpected +ve rc %d\n", rc);
1537                         RETURN(-EPROTO);
1538                 }
1539                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1540
1541                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1542                         RETURN(-EAGAIN);
1543
1544                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1545                     check_write_checksum(&body->oa, peer, client_cksum,
1546                                          body->oa.o_cksum, aa->aa_requested_nob,
1547                                          aa->aa_page_count, aa->aa_ppga,
1548                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1549                         RETURN(-EAGAIN);
1550
1551                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1552                                      aa->aa_page_count, aa->aa_ppga);
1553                 GOTO(out, rc);
1554         }
1555
1556         /* The rest of this function executes only for OST_READs */
1557
1558         /* if unwrap_bulk failed, return -EAGAIN to retry */
1559         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1560         if (rc < 0)
1561                 GOTO(out, rc = -EAGAIN);
1562
1563         if (rc > aa->aa_requested_nob) {
1564                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1565                        aa->aa_requested_nob);
1566                 RETURN(-EPROTO);
1567         }
1568
1569         if (rc != req->rq_bulk->bd_nob_transferred) {
1570                 CERROR ("Unexpected rc %d (%d transferred)\n",
1571                         rc, req->rq_bulk->bd_nob_transferred);
1572                 return (-EPROTO);
1573         }
1574
1575         if (rc < aa->aa_requested_nob)
1576                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1577
1578         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1579                 static int cksum_counter;
1580                 __u32      server_cksum = body->oa.o_cksum;
1581                 char      *via;
1582                 char      *router;
1583                 cksum_type_t cksum_type;
1584
1585                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1586                                                body->oa.o_flags : 0);
1587                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1588                                                  aa->aa_ppga, OST_READ,
1589                                                  cksum_type);
1590
1591                 if (peer->nid == req->rq_bulk->bd_sender) {
1592                         via = router = "";
1593                 } else {
1594                         via = " via ";
1595                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1596                 }
1597
1598                 if (server_cksum == ~0 && rc > 0) {
1599                         CERROR("Protocol error: server %s set the 'checksum' "
1600                                "bit, but didn't send a checksum.  Not fatal, "
1601                                "but please notify on http://bugs.whamcloud.com/\n",
1602                                libcfs_nid2str(peer->nid));
1603                 } else if (server_cksum != client_cksum) {
1604                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1605                                            "%s%s%s inode "DFID" object "DOSTID
1606                                            " extent ["LPU64"-"LPU64"]\n",
1607                                            req->rq_import->imp_obd->obd_name,
1608                                            libcfs_nid2str(peer->nid),
1609                                            via, router,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                                 body->oa.o_parent_seq : (__u64)0,
1612                                            body->oa.o_valid & OBD_MD_FLFID ?
1613                                                 body->oa.o_parent_oid : 0,
1614                                            body->oa.o_valid & OBD_MD_FLFID ?
1615                                                 body->oa.o_parent_ver : 0,
1616                                            POSTID(&body->oa.o_oi),
1617                                            aa->aa_ppga[0]->off,
1618                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1619                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1620                                                                         1);
1621                         CERROR("client %x, server %x, cksum_type %x\n",
1622                                client_cksum, server_cksum, cksum_type);
1623                         cksum_counter = 0;
1624                         aa->aa_oa->o_cksum = client_cksum;
1625                         rc = -EAGAIN;
1626                 } else {
1627                         cksum_counter++;
1628                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1629                         rc = 0;
1630                 }
1631         } else if (unlikely(client_cksum)) {
1632                 static int cksum_missed;
1633
1634                 cksum_missed++;
1635                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1636                         CERROR("Checksum %u requested from %s but not sent\n",
1637                                cksum_missed, libcfs_nid2str(peer->nid));
1638         } else {
1639                 rc = 0;
1640         }
1641 out:
1642         if (rc >= 0)
1643                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1644
1645         RETURN(rc);
1646 }
1647
1648 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1649                             struct lov_stripe_md *lsm,
1650                             obd_count page_count, struct brw_page **pga,
1651                             struct obd_capa *ocapa)
1652 {
1653         struct ptlrpc_request *req;
1654         int                    rc;
1655         cfs_waitq_t            waitq;
1656         int                    generation, resends = 0;
1657         struct l_wait_info     lwi;
1658
1659         ENTRY;
1660
1661         cfs_waitq_init(&waitq);
1662         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1663
1664 restart_bulk:
1665         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1666                                   page_count, pga, &req, ocapa, 0, resends);
1667         if (rc != 0)
1668                 return (rc);
1669
1670         if (resends) {
1671                 req->rq_generation_set = 1;
1672                 req->rq_import_generation = generation;
1673                 req->rq_sent = cfs_time_current_sec() + resends;
1674         }
1675
1676         rc = ptlrpc_queue_wait(req);
1677
1678         if (rc == -ETIMEDOUT && req->rq_resend) {
1679                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1680                 ptlrpc_req_finished(req);
1681                 goto restart_bulk;
1682         }
1683
1684         rc = osc_brw_fini_request(req, rc);
1685
1686         ptlrpc_req_finished(req);
1687         /* When server return -EINPROGRESS, client should always retry
1688          * regardless of the number of times the bulk was resent already.*/
1689         if (osc_recoverable_error(rc)) {
1690                 resends++;
1691                 if (rc != -EINPROGRESS &&
1692                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1693                         CERROR("%s: too many resend retries for object: "
1694                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1695                                POSTID(&oa->o_oi), rc);
1696                         goto out;
1697                 }
1698                 if (generation !=
1699                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1700                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1701                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1702                                POSTID(&oa->o_oi), rc);
1703                         goto out;
1704                 }
1705
1706                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1707                                        NULL);
1708                 l_wait_event(waitq, 0, &lwi);
1709
1710                 goto restart_bulk;
1711         }
1712 out:
1713         if (rc == -EAGAIN || rc == -EINPROGRESS)
1714                 rc = -EIO;
1715         RETURN (rc);
1716 }
1717
1718 static int osc_brw_redo_request(struct ptlrpc_request *request,
1719                                 struct osc_brw_async_args *aa, int rc)
1720 {
1721         struct ptlrpc_request *new_req;
1722         struct osc_brw_async_args *new_aa;
1723         struct osc_async_page *oap;
1724         ENTRY;
1725
1726         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1727                   "redo for recoverable error %d", rc);
1728
1729         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1730                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1731                                   aa->aa_cli, aa->aa_oa,
1732                                   NULL /* lsm unused by osc currently */,
1733                                   aa->aa_page_count, aa->aa_ppga,
1734                                   &new_req, aa->aa_ocapa, 0, 1);
1735         if (rc)
1736                 RETURN(rc);
1737
1738         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1739                 if (oap->oap_request != NULL) {
1740                         LASSERTF(request == oap->oap_request,
1741                                  "request %p != oap_request %p\n",
1742                                  request, oap->oap_request);
1743                         if (oap->oap_interrupted) {
1744                                 ptlrpc_req_finished(new_req);
1745                                 RETURN(-EINTR);
1746                         }
1747                 }
1748         }
1749         /* New request takes over pga and oaps from old request.
1750          * Note that copying a list_head doesn't work, need to move it... */
1751         aa->aa_resends++;
1752         new_req->rq_interpret_reply = request->rq_interpret_reply;
1753         new_req->rq_async_args = request->rq_async_args;
1754         new_req->rq_commit_cb = request->rq_commit_cb;
1755         /* cap resend delay to the current request timeout, this is similar to
1756          * what ptlrpc does (see after_reply()) */
1757         if (aa->aa_resends > new_req->rq_timeout)
1758                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1759         else
1760                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1761         new_req->rq_generation_set = 1;
1762         new_req->rq_import_generation = request->rq_import_generation;
1763
1764         new_aa = ptlrpc_req_async_args(new_req);
1765
1766         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1767         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1768         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1769         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1770         new_aa->aa_resends = aa->aa_resends;
1771
1772         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1773                 if (oap->oap_request) {
1774                         ptlrpc_req_finished(oap->oap_request);
1775                         oap->oap_request = ptlrpc_request_addref(new_req);
1776                 }
1777         }
1778
1779         new_aa->aa_ocapa = aa->aa_ocapa;
1780         aa->aa_ocapa = NULL;
1781
1782         /* XXX: This code will run into problem if we're going to support
1783          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1784          * and wait for all of them to be finished. We should inherit request
1785          * set from old request. */
1786         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1787
1788         DEBUG_REQ(D_INFO, new_req, "new request");
1789         RETURN(0);
1790 }
1791
1792 /*
1793  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1794  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1795  * fine for our small page arrays and doesn't require allocation.  its an
1796  * insertion sort that swaps elements that are strides apart, shrinking the
1797  * stride down until its '1' and the array is sorted.
1798  */
1799 static void sort_brw_pages(struct brw_page **array, int num)
1800 {
1801         int stride, i, j;
1802         struct brw_page *tmp;
1803
1804         if (num == 1)
1805                 return;
1806         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1807                 ;
1808
1809         do {
1810                 stride /= 3;
1811                 for (i = stride ; i < num ; i++) {
1812                         tmp = array[i];
1813                         j = i;
1814                         while (j >= stride && array[j - stride]->off > tmp->off) {
1815                                 array[j] = array[j - stride];
1816                                 j -= stride;
1817                         }
1818                         array[j] = tmp;
1819                 }
1820         } while (stride > 1);
1821 }
1822
1823 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1824 {
1825         int count = 1;
1826         int offset;
1827         int i = 0;
1828
1829         LASSERT (pages > 0);
1830         offset = pg[i]->off & ~CFS_PAGE_MASK;
1831
1832         for (;;) {
1833                 pages--;
1834                 if (pages == 0)         /* that's all */
1835                         return count;
1836
1837                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1838                         return count;   /* doesn't end on page boundary */
1839
1840                 i++;
1841                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1842                 if (offset != 0)        /* doesn't start on page boundary */
1843                         return count;
1844
1845                 count++;
1846         }
1847 }
1848
1849 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1850 {
1851         struct brw_page **ppga;
1852         int i;
1853
1854         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1855         if (ppga == NULL)
1856                 return NULL;
1857
1858         for (i = 0; i < count; i++)
1859                 ppga[i] = pga + i;
1860         return ppga;
1861 }
1862
1863 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1864 {
1865         LASSERT(ppga != NULL);
1866         OBD_FREE(ppga, sizeof(*ppga) * count);
1867 }
1868
1869 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1870                    obd_count page_count, struct brw_page *pga,
1871                    struct obd_trans_info *oti)
1872 {
1873         struct obdo *saved_oa = NULL;
1874         struct brw_page **ppga, **orig;
1875         struct obd_import *imp = class_exp2cliimp(exp);
1876         struct client_obd *cli;
1877         int rc, page_count_orig;
1878         ENTRY;
1879
1880         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1881         cli = &imp->imp_obd->u.cli;
1882
1883         if (cmd & OBD_BRW_CHECK) {
1884                 /* The caller just wants to know if there's a chance that this
1885                  * I/O can succeed */
1886
1887                 if (imp->imp_invalid)
1888                         RETURN(-EIO);
1889                 RETURN(0);
1890         }
1891
1892         /* test_brw with a failed create can trip this, maybe others. */
1893         LASSERT(cli->cl_max_pages_per_rpc);
1894
1895         rc = 0;
1896
1897         orig = ppga = osc_build_ppga(pga, page_count);
1898         if (ppga == NULL)
1899                 RETURN(-ENOMEM);
1900         page_count_orig = page_count;
1901
1902         sort_brw_pages(ppga, page_count);
1903         while (page_count) {
1904                 obd_count pages_per_brw;
1905
1906                 if (page_count > cli->cl_max_pages_per_rpc)
1907                         pages_per_brw = cli->cl_max_pages_per_rpc;
1908                 else
1909                         pages_per_brw = page_count;
1910
1911                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1912
1913                 if (saved_oa != NULL) {
1914                         /* restore previously saved oa */
1915                         *oinfo->oi_oa = *saved_oa;
1916                 } else if (page_count > pages_per_brw) {
1917                         /* save a copy of oa (brw will clobber it) */
1918                         OBDO_ALLOC(saved_oa);
1919                         if (saved_oa == NULL)
1920                                 GOTO(out, rc = -ENOMEM);
1921                         *saved_oa = *oinfo->oi_oa;
1922                 }
1923
1924                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1925                                       pages_per_brw, ppga, oinfo->oi_capa);
1926
1927                 if (rc != 0)
1928                         break;
1929
1930                 page_count -= pages_per_brw;
1931                 ppga += pages_per_brw;
1932         }
1933
1934 out:
1935         osc_release_ppga(orig, page_count_orig);
1936
1937         if (saved_oa != NULL)
1938                 OBDO_FREE(saved_oa);
1939
1940         RETURN(rc);
1941 }
1942
1943 static int brw_interpret(const struct lu_env *env,
1944                          struct ptlrpc_request *req, void *data, int rc)
1945 {
1946         struct osc_brw_async_args *aa = data;
1947         struct osc_extent *ext;
1948         struct osc_extent *tmp;
1949         struct cl_object  *obj = NULL;
1950         struct client_obd *cli = aa->aa_cli;
1951         ENTRY;
1952
1953         rc = osc_brw_fini_request(req, rc);
1954         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1955         /* When server return -EINPROGRESS, client should always retry
1956          * regardless of the number of times the bulk was resent already. */
1957         if (osc_recoverable_error(rc)) {
1958                 if (req->rq_import_generation !=
1959                     req->rq_import->imp_generation) {
1960                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1961                                ""DOSTID", rc = %d.\n",
1962                                req->rq_import->imp_obd->obd_name,
1963                                POSTID(&aa->aa_oa->o_oi), rc);
1964                 } else if (rc == -EINPROGRESS ||
1965                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1966                         rc = osc_brw_redo_request(req, aa, rc);
1967                 } else {
1968                         CERROR("%s: too many resent retries for object: "
1969                                ""LPU64":"LPU64", rc = %d.\n",
1970                                req->rq_import->imp_obd->obd_name,
1971                                POSTID(&aa->aa_oa->o_oi), rc);
1972                 }
1973
1974                 if (rc == 0)
1975                         RETURN(0);
1976                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1977                         rc = -EIO;
1978         }
1979
1980         if (aa->aa_ocapa) {
1981                 capa_put(aa->aa_ocapa);
1982                 aa->aa_ocapa = NULL;
1983         }
1984
1985         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1986                 if (obj == NULL && rc == 0) {
1987                         obj = osc2cl(ext->oe_obj);
1988                         cl_object_get(obj);
1989                 }
1990
1991                 cfs_list_del_init(&ext->oe_link);
1992                 osc_extent_finish(env, ext, 1, rc);
1993         }
1994         LASSERT(cfs_list_empty(&aa->aa_exts));
1995         LASSERT(cfs_list_empty(&aa->aa_oaps));
1996
1997         if (obj != NULL) {
1998                 struct obdo *oa = aa->aa_oa;
1999                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2000                 unsigned long valid = 0;
2001
2002                 LASSERT(rc == 0);
2003                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2004                         attr->cat_blocks = oa->o_blocks;
2005                         valid |= CAT_BLOCKS;
2006                 }
2007                 if (oa->o_valid & OBD_MD_FLMTIME) {
2008                         attr->cat_mtime = oa->o_mtime;
2009                         valid |= CAT_MTIME;
2010                 }
2011                 if (oa->o_valid & OBD_MD_FLATIME) {
2012                         attr->cat_atime = oa->o_atime;
2013                         valid |= CAT_ATIME;
2014                 }
2015                 if (oa->o_valid & OBD_MD_FLCTIME) {
2016                         attr->cat_ctime = oa->o_ctime;
2017                         valid |= CAT_CTIME;
2018                 }
2019                 if (valid != 0) {
2020                         cl_object_attr_lock(obj);
2021                         cl_object_attr_set(env, obj, attr, valid);
2022                         cl_object_attr_unlock(obj);
2023                 }
2024                 cl_object_put(env, obj);
2025         }
2026         OBDO_FREE(aa->aa_oa);
2027
2028         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2029                           req->rq_bulk->bd_nob_transferred);
2030         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2031         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2032
2033         client_obd_list_lock(&cli->cl_loi_list_lock);
2034         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2035          * is called so we know whether to go to sync BRWs or wait for more
2036          * RPCs to complete */
2037         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2038                 cli->cl_w_in_flight--;
2039         else
2040                 cli->cl_r_in_flight--;
2041         osc_wake_cache_waiters(cli);
2042         client_obd_list_unlock(&cli->cl_loi_list_lock);
2043
2044         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2045         RETURN(rc);
2046 }
2047
2048 static void brw_commit(struct ptlrpc_request *req)
2049 {
2050         spin_lock(&req->rq_lock);
2051         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2052          * this called via the rq_commit_cb, I need to ensure
2053          * osc_dec_unstable_pages is still called. Otherwise unstable
2054          * pages may be leaked. */
2055         if (req->rq_unstable)
2056                 osc_dec_unstable_pages(req);
2057         else
2058                 req->rq_committed = 1;
2059         spin_unlock(&req->rq_lock);
2060 }
2061
2062 /**
2063  * Build an RPC by the list of extent @ext_list. The caller must ensure
2064  * that the total pages in this list are NOT over max pages per RPC.
2065  * Extents in the list must be in OES_RPC state.
2066  */
2067 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2068                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2069 {
2070         struct ptlrpc_request *req = NULL;
2071         struct osc_extent *ext;
2072         CFS_LIST_HEAD(rpc_list);
2073         struct brw_page **pga = NULL;
2074         struct osc_brw_async_args *aa = NULL;
2075         struct obdo *oa = NULL;
2076         struct osc_async_page *oap;
2077         struct osc_async_page *tmp;
2078         struct cl_req *clerq = NULL;
2079         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2080         struct ldlm_lock *lock = NULL;
2081         struct cl_req_attr crattr;
2082         obd_off starting_offset = OBD_OBJECT_EOF;
2083         obd_off ending_offset = 0;
2084         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2085
2086         ENTRY;
2087         LASSERT(!cfs_list_empty(ext_list));
2088
2089         /* add pages into rpc_list to build BRW rpc */
2090         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2091                 LASSERT(ext->oe_state == OES_RPC);
2092                 mem_tight |= ext->oe_memalloc;
2093                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2094                         ++page_count;
2095                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2096                         if (starting_offset > oap->oap_obj_off)
2097                                 starting_offset = oap->oap_obj_off;
2098                         else
2099                                 LASSERT(oap->oap_page_off == 0);
2100                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2101                                 ending_offset = oap->oap_obj_off +
2102                                                 oap->oap_count;
2103                         else
2104                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2105                                         CFS_PAGE_SIZE);
2106                 }
2107         }
2108
2109         if (mem_tight)
2110                 mpflag = cfs_memory_pressure_get_and_set();
2111
2112         memset(&crattr, 0, sizeof crattr);
2113         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2114         if (pga == NULL)
2115                 GOTO(out, rc = -ENOMEM);
2116
2117         OBDO_ALLOC(oa);
2118         if (oa == NULL)
2119                 GOTO(out, rc = -ENOMEM);
2120
2121         i = 0;
2122         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2123                 struct cl_page *page = oap2cl_page(oap);
2124                 if (clerq == NULL) {
2125                         clerq = cl_req_alloc(env, page, crt,
2126                                              1 /* only 1-object rpcs for
2127                                                 * now */);
2128                         if (IS_ERR(clerq))
2129                                 GOTO(out, rc = PTR_ERR(clerq));
2130                         lock = oap->oap_ldlm_lock;
2131                 }
2132                 if (mem_tight)
2133                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2134                 pga[i] = &oap->oap_brw_page;
2135                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2136                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2137                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2138                 i++;
2139                 cl_req_page_add(env, clerq, page);
2140         }
2141
2142         /* always get the data for the obdo for the rpc */
2143         LASSERT(clerq != NULL);
2144         crattr.cra_oa = oa;
2145         crattr.cra_capa = NULL;
2146         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2147         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2148         if (lock) {
2149                 oa->o_handle = lock->l_remote_handle;
2150                 oa->o_valid |= OBD_MD_FLHANDLE;
2151         }
2152
2153         rc = cl_req_prep(env, clerq);
2154         if (rc != 0) {
2155                 CERROR("cl_req_prep failed: %d\n", rc);
2156                 GOTO(out, rc);
2157         }
2158
2159         sort_brw_pages(pga, page_count);
2160         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2161                         pga, &req, crattr.cra_capa, 1, 0);
2162         if (rc != 0) {
2163                 CERROR("prep_req failed: %d\n", rc);
2164                 GOTO(out, rc);
2165         }
2166
2167         req->rq_commit_cb = brw_commit;
2168         req->rq_interpret_reply = brw_interpret;
2169
2170         if (mem_tight != 0)
2171                 req->rq_memalloc = 1;
2172
2173         /* Need to update the timestamps after the request is built in case
2174          * we race with setattr (locally or in queue at OST).  If OST gets
2175          * later setattr before earlier BRW (as determined by the request xid),
2176          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2177          * way to do this in a single call.  bug 10150 */
2178         cl_req_attr_set(env, clerq, &crattr,
2179                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2180
2181         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2182
2183         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2184         aa = ptlrpc_req_async_args(req);
2185         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2186         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2187         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2188         cfs_list_splice_init(ext_list, &aa->aa_exts);
2189         aa->aa_clerq = clerq;
2190
2191         /* queued sync pages can be torn down while the pages
2192          * were between the pending list and the rpc */
2193         tmp = NULL;
2194         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2195                 /* only one oap gets a request reference */
2196                 if (tmp == NULL)
2197                         tmp = oap;
2198                 if (oap->oap_interrupted && !req->rq_intr) {
2199                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2200                                         oap, req);
2201                         ptlrpc_mark_interrupted(req);
2202                 }
2203         }
2204         if (tmp != NULL)
2205                 tmp->oap_request = ptlrpc_request_addref(req);
2206
2207         client_obd_list_lock(&cli->cl_loi_list_lock);
2208         starting_offset >>= CFS_PAGE_SHIFT;
2209         if (cmd == OBD_BRW_READ) {
2210                 cli->cl_r_in_flight++;
2211                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2212                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2213                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2214                                       starting_offset + 1);
2215         } else {
2216                 cli->cl_w_in_flight++;
2217                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2218                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2219                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2220                                       starting_offset + 1);
2221         }
2222         client_obd_list_unlock(&cli->cl_loi_list_lock);
2223
2224         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2225                   page_count, aa, cli->cl_r_in_flight,
2226                   cli->cl_w_in_flight);
2227
2228         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2229          * see which CPU/NUMA node the majority of pages were allocated
2230          * on, and try to assign the async RPC to the CPU core
2231          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2232          *
2233          * But on the other hand, we expect that multiple ptlrpcd
2234          * threads and the initial write sponsor can run in parallel,
2235          * especially when data checksum is enabled, which is CPU-bound
2236          * operation and single ptlrpcd thread cannot process in time.
2237          * So more ptlrpcd threads sharing BRW load
2238          * (with PDL_POLICY_ROUND) seems better.
2239          */
2240         ptlrpcd_add_req(req, pol, -1);
2241         rc = 0;
2242         EXIT;
2243
2244 out:
2245         if (mem_tight != 0)
2246                 cfs_memory_pressure_restore(mpflag);
2247
2248         capa_put(crattr.cra_capa);
2249         if (rc != 0) {
2250                 LASSERT(req == NULL);
2251
2252                 if (oa)
2253                         OBDO_FREE(oa);
2254                 if (pga)
2255                         OBD_FREE(pga, sizeof(*pga) * page_count);
2256                 /* this should happen rarely and is pretty bad, it makes the
2257                  * pending list not follow the dirty order */
2258                 while (!cfs_list_empty(ext_list)) {
2259                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2260                                              oe_link);
2261                         cfs_list_del_init(&ext->oe_link);
2262                         osc_extent_finish(env, ext, 0, rc);
2263                 }
2264                 if (clerq && !IS_ERR(clerq))
2265                         cl_req_completion(env, clerq, rc);
2266         }
2267         RETURN(rc);
2268 }
2269
2270 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2271                                         struct ldlm_enqueue_info *einfo)
2272 {
2273         void *data = einfo->ei_cbdata;
2274         int set = 0;
2275
2276         LASSERT(lock != NULL);
2277         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2278         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2279         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2280         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2281
2282         lock_res_and_lock(lock);
2283         spin_lock(&osc_ast_guard);
2284
2285         if (lock->l_ast_data == NULL)
2286                 lock->l_ast_data = data;
2287         if (lock->l_ast_data == data)
2288                 set = 1;
2289
2290         spin_unlock(&osc_ast_guard);
2291         unlock_res_and_lock(lock);
2292
2293         return set;
2294 }
2295
2296 static int osc_set_data_with_check(struct lustre_handle *lockh,
2297                                    struct ldlm_enqueue_info *einfo)
2298 {
2299         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2300         int set = 0;
2301
2302         if (lock != NULL) {
2303                 set = osc_set_lock_data_with_check(lock, einfo);
2304                 LDLM_LOCK_PUT(lock);
2305         } else
2306                 CERROR("lockh %p, data %p - client evicted?\n",
2307                        lockh, einfo->ei_cbdata);
2308         return set;
2309 }
2310
2311 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2312                              ldlm_iterator_t replace, void *data)
2313 {
2314         struct ldlm_res_id res_id;
2315         struct obd_device *obd = class_exp2obd(exp);
2316
2317         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2318         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2319         return 0;
2320 }
2321
2322 /* find any ldlm lock of the inode in osc
2323  * return 0    not find
2324  *        1    find one
2325  *      < 0    error */
2326 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2327                            ldlm_iterator_t replace, void *data)
2328 {
2329         struct ldlm_res_id res_id;
2330         struct obd_device *obd = class_exp2obd(exp);
2331         int rc = 0;
2332
2333         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2334         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2335         if (rc == LDLM_ITER_STOP)
2336                 return(1);
2337         if (rc == LDLM_ITER_CONTINUE)
2338                 return(0);
2339         return(rc);
2340 }
2341
2342 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2343                             obd_enqueue_update_f upcall, void *cookie,
2344                             __u64 *flags, int agl, int rc)
2345 {
2346         int intent = *flags & LDLM_FL_HAS_INTENT;
2347         ENTRY;
2348
2349         if (intent) {
2350                 /* The request was created before ldlm_cli_enqueue call. */
2351                 if (rc == ELDLM_LOCK_ABORTED) {
2352                         struct ldlm_reply *rep;
2353                         rep = req_capsule_server_get(&req->rq_pill,
2354                                                      &RMF_DLM_REP);
2355
2356                         LASSERT(rep != NULL);
2357                         if (rep->lock_policy_res1)
2358                                 rc = rep->lock_policy_res1;
2359                 }
2360         }
2361
2362         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2363             (rc == 0)) {
2364                 *flags |= LDLM_FL_LVB_READY;
2365                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2366                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2367         }
2368
2369         /* Call the update callback. */
2370         rc = (*upcall)(cookie, rc);
2371         RETURN(rc);
2372 }
2373
2374 static int osc_enqueue_interpret(const struct lu_env *env,
2375                                  struct ptlrpc_request *req,
2376                                  struct osc_enqueue_args *aa, int rc)
2377 {
2378         struct ldlm_lock *lock;
2379         struct lustre_handle handle;
2380         __u32 mode;
2381         struct ost_lvb *lvb;
2382         __u32 lvb_len;
2383         __u64 *flags = aa->oa_flags;
2384
2385         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2386          * might be freed anytime after lock upcall has been called. */
2387         lustre_handle_copy(&handle, aa->oa_lockh);
2388         mode = aa->oa_ei->ei_mode;
2389
2390         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2391          * be valid. */
2392         lock = ldlm_handle2lock(&handle);
2393
2394         /* Take an additional reference so that a blocking AST that
2395          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2396          * to arrive after an upcall has been executed by
2397          * osc_enqueue_fini(). */
2398         ldlm_lock_addref(&handle, mode);
2399
2400         /* Let CP AST to grant the lock first. */
2401         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2402
2403         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2404                 lvb = NULL;
2405                 lvb_len = 0;
2406         } else {
2407                 lvb = aa->oa_lvb;
2408                 lvb_len = sizeof(*aa->oa_lvb);
2409         }
2410
2411         /* Complete obtaining the lock procedure. */
2412         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2413                                    mode, flags, lvb, lvb_len, &handle, rc);
2414         /* Complete osc stuff. */
2415         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2416                               flags, aa->oa_agl, rc);
2417
2418         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2419
2420         /* Release the lock for async request. */
2421         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2422                 /*
2423                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2424                  * not already released by
2425                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2426                  */
2427                 ldlm_lock_decref(&handle, mode);
2428
2429         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2430                  aa->oa_lockh, req, aa);
2431         ldlm_lock_decref(&handle, mode);
2432         LDLM_LOCK_PUT(lock);
2433         return rc;
2434 }
2435
2436 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2437                         struct lov_oinfo *loi, int flags,
2438                         struct ost_lvb *lvb, __u32 mode, int rc)
2439 {
2440         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2441
2442         if (rc == ELDLM_OK) {
2443                 __u64 tmp;
2444
2445                 LASSERT(lock != NULL);
2446                 loi->loi_lvb = *lvb;
2447                 tmp = loi->loi_lvb.lvb_size;
2448                 /* Extend KMS up to the end of this lock and no further
2449                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2450                 if (tmp > lock->l_policy_data.l_extent.end)
2451                         tmp = lock->l_policy_data.l_extent.end + 1;
2452                 if (tmp >= loi->loi_kms) {
2453                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2454                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2455                         loi_kms_set(loi, tmp);
2456                 } else {
2457                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2458                                    LPU64"; leaving kms="LPU64", end="LPU64,
2459                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2460                                    lock->l_policy_data.l_extent.end);
2461                 }
2462                 ldlm_lock_allow_match(lock);
2463         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2464                 LASSERT(lock != NULL);
2465                 loi->loi_lvb = *lvb;
2466                 ldlm_lock_allow_match(lock);
2467                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2468                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2469                 rc = ELDLM_OK;
2470         }
2471
2472         if (lock != NULL) {
2473                 if (rc != ELDLM_OK)
2474                         ldlm_lock_fail_match(lock);
2475
2476                 LDLM_LOCK_PUT(lock);
2477         }
2478 }
2479 EXPORT_SYMBOL(osc_update_enqueue);
2480
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
2483 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2484  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2485  * other synchronous requests, however keeping some locks and trying to obtain
2486  * others may take a considerable amount of time in a case of ost failure; and
2487  * when other sync requests do not get released lock from a client, the client
2488  * is excluded from the cluster -- such scenarious make the life difficult, so
2489  * release locks just after they are obtained. */
2490 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2491                      __u64 *flags, ldlm_policy_data_t *policy,
2492                      struct ost_lvb *lvb, int kms_valid,
2493                      obd_enqueue_update_f upcall, void *cookie,
2494                      struct ldlm_enqueue_info *einfo,
2495                      struct lustre_handle *lockh,
2496                      struct ptlrpc_request_set *rqset, int async, int agl)
2497 {
2498         struct obd_device *obd = exp->exp_obd;
2499         struct ptlrpc_request *req = NULL;
2500         int intent = *flags & LDLM_FL_HAS_INTENT;
2501         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2502         ldlm_mode_t mode;
2503         int rc;
2504         ENTRY;
2505
2506         /* Filesystem lock extents are extended to page boundaries so that
2507          * dealing with the page cache is a little smoother.  */
2508         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2509         policy->l_extent.end |= ~CFS_PAGE_MASK;
2510
2511         /*
2512          * kms is not valid when either object is completely fresh (so that no
2513          * locks are cached), or object was evicted. In the latter case cached
2514          * lock cannot be used, because it would prime inode state with
2515          * potentially stale LVB.
2516          */
2517         if (!kms_valid)
2518                 goto no_match;
2519
2520         /* Next, search for already existing extent locks that will cover us */
2521         /* If we're trying to read, we also search for an existing PW lock.  The
2522          * VFS and page cache already protect us locally, so lots of readers/
2523          * writers can share a single PW lock.
2524          *
2525          * There are problems with conversion deadlocks, so instead of
2526          * converting a read lock to a write lock, we'll just enqueue a new
2527          * one.
2528          *
2529          * At some point we should cancel the read lock instead of making them
2530          * send us a blocking callback, but there are problems with canceling
2531          * locks out from other users right now, too. */
2532         mode = einfo->ei_mode;
2533         if (einfo->ei_mode == LCK_PR)
2534                 mode |= LCK_PW;
2535         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2536                                einfo->ei_type, policy, mode, lockh, 0);
2537         if (mode) {
2538                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2539
2540                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2541                         /* For AGL, if enqueue RPC is sent but the lock is not
2542                          * granted, then skip to process this strpe.
2543                          * Return -ECANCELED to tell the caller. */
2544                         ldlm_lock_decref(lockh, mode);
2545                         LDLM_LOCK_PUT(matched);
2546                         RETURN(-ECANCELED);
2547                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2548                         *flags |= LDLM_FL_LVB_READY;
2549                         /* addref the lock only if not async requests and PW
2550                          * lock is matched whereas we asked for PR. */
2551                         if (!rqset && einfo->ei_mode != mode)
2552                                 ldlm_lock_addref(lockh, LCK_PR);
2553                         if (intent) {
2554                                 /* I would like to be able to ASSERT here that
2555                                  * rss <= kms, but I can't, for reasons which
2556                                  * are explained in lov_enqueue() */
2557                         }
2558
2559                         /* We already have a lock, and it's referenced.
2560                          *
2561                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2562                          * AGL upcall may change it to CLS_HELD directly. */
2563                         (*upcall)(cookie, ELDLM_OK);
2564
2565                         if (einfo->ei_mode != mode)
2566                                 ldlm_lock_decref(lockh, LCK_PW);
2567                         else if (rqset)
2568                                 /* For async requests, decref the lock. */
2569                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2570                         LDLM_LOCK_PUT(matched);
2571                         RETURN(ELDLM_OK);
2572                 } else {
2573                         ldlm_lock_decref(lockh, mode);
2574                         LDLM_LOCK_PUT(matched);
2575                 }
2576         }
2577
2578  no_match:
2579         if (intent) {
2580                 CFS_LIST_HEAD(cancels);
2581                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2582                                            &RQF_LDLM_ENQUEUE_LVB);
2583                 if (req == NULL)
2584                         RETURN(-ENOMEM);
2585
2586                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2587                 if (rc) {
2588                         ptlrpc_request_free(req);
2589                         RETURN(rc);
2590                 }
2591
2592                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2593                                      sizeof *lvb);
2594                 ptlrpc_request_set_replen(req);
2595         }
2596
2597         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2598         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2599
2600         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2601                               sizeof(*lvb), LVB_T_OST, lockh, async);
2602         if (rqset) {
2603                 if (!rc) {
2604                         struct osc_enqueue_args *aa;
2605                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2606                         aa = ptlrpc_req_async_args(req);
2607                         aa->oa_ei = einfo;
2608                         aa->oa_exp = exp;
2609                         aa->oa_flags  = flags;
2610                         aa->oa_upcall = upcall;
2611                         aa->oa_cookie = cookie;
2612                         aa->oa_lvb    = lvb;
2613                         aa->oa_lockh  = lockh;
2614                         aa->oa_agl    = !!agl;
2615
2616                         req->rq_interpret_reply =
2617                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2618                         if (rqset == PTLRPCD_SET)
2619                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2620                         else
2621                                 ptlrpc_set_add_req(rqset, req);
2622                 } else if (intent) {
2623                         ptlrpc_req_finished(req);
2624                 }
2625                 RETURN(rc);
2626         }
2627
2628         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2629         if (intent)
2630                 ptlrpc_req_finished(req);
2631
2632         RETURN(rc);
2633 }
2634
2635 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2636                        struct ldlm_enqueue_info *einfo,
2637                        struct ptlrpc_request_set *rqset)
2638 {
2639         struct ldlm_res_id res_id;
2640         int rc;
2641         ENTRY;
2642
2643         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2644         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2645                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2646                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2647                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2648                               rqset, rqset != NULL, 0);
2649         RETURN(rc);
2650 }
2651
2652 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2653                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2654                    int *flags, void *data, struct lustre_handle *lockh,
2655                    int unref)
2656 {
2657         struct obd_device *obd = exp->exp_obd;
2658         int lflags = *flags;
2659         ldlm_mode_t rc;
2660         ENTRY;
2661
2662         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2663                 RETURN(-EIO);
2664
2665         /* Filesystem lock extents are extended to page boundaries so that
2666          * dealing with the page cache is a little smoother */
2667         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2668         policy->l_extent.end |= ~CFS_PAGE_MASK;
2669
2670         /* Next, search for already existing extent locks that will cover us */
2671         /* If we're trying to read, we also search for an existing PW lock.  The
2672          * VFS and page cache already protect us locally, so lots of readers/
2673          * writers can share a single PW lock. */
2674         rc = mode;
2675         if (mode == LCK_PR)
2676                 rc |= LCK_PW;
2677         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2678                              res_id, type, policy, rc, lockh, unref);
2679         if (rc) {
2680                 if (data != NULL) {
2681                         if (!osc_set_data_with_check(lockh, data)) {
2682                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2683                                         ldlm_lock_decref(lockh, rc);
2684                                 RETURN(0);
2685                         }
2686                 }
2687                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2688                         ldlm_lock_addref(lockh, LCK_PR);
2689                         ldlm_lock_decref(lockh, LCK_PW);
2690                 }
2691                 RETURN(rc);
2692         }
2693         RETURN(rc);
2694 }
2695
2696 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2697 {
2698         ENTRY;
2699
2700         if (unlikely(mode == LCK_GROUP))
2701                 ldlm_lock_decref_and_cancel(lockh, mode);
2702         else
2703                 ldlm_lock_decref(lockh, mode);
2704
2705         RETURN(0);
2706 }
2707
2708 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2709                       __u32 mode, struct lustre_handle *lockh)
2710 {
2711         ENTRY;
2712         RETURN(osc_cancel_base(lockh, mode));
2713 }
2714
2715 static int osc_cancel_unused(struct obd_export *exp,
2716                              struct lov_stripe_md *lsm,
2717                              ldlm_cancel_flags_t flags,
2718                              void *opaque)
2719 {
2720         struct obd_device *obd = class_exp2obd(exp);
2721         struct ldlm_res_id res_id, *resp = NULL;
2722
2723         if (lsm != NULL) {
2724                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2725                 resp = &res_id;
2726         }
2727
2728         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2729 }
2730
2731 static int osc_statfs_interpret(const struct lu_env *env,
2732                                 struct ptlrpc_request *req,
2733                                 struct osc_async_args *aa, int rc)
2734 {
2735         struct obd_statfs *msfs;
2736         ENTRY;
2737
2738         if (rc == -EBADR)
2739                 /* The request has in fact never been sent
2740                  * due to issues at a higher level (LOV).
2741                  * Exit immediately since the caller is
2742                  * aware of the problem and takes care
2743                  * of the clean up */
2744                  RETURN(rc);
2745
2746         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2747             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2748                 GOTO(out, rc = 0);
2749
2750         if (rc != 0)
2751                 GOTO(out, rc);
2752
2753         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2754         if (msfs == NULL) {
2755                 GOTO(out, rc = -EPROTO);
2756         }
2757
2758         *aa->aa_oi->oi_osfs = *msfs;
2759 out:
2760         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2761         RETURN(rc);
2762 }
2763
2764 static int osc_statfs_async(struct obd_export *exp,
2765                             struct obd_info *oinfo, __u64 max_age,
2766                             struct ptlrpc_request_set *rqset)
2767 {
2768         struct obd_device     *obd = class_exp2obd(exp);
2769         struct ptlrpc_request *req;
2770         struct osc_async_args *aa;
2771         int                    rc;
2772         ENTRY;
2773
2774         /* We could possibly pass max_age in the request (as an absolute
2775          * timestamp or a "seconds.usec ago") so the target can avoid doing
2776          * extra calls into the filesystem if that isn't necessary (e.g.
2777          * during mount that would help a bit).  Having relative timestamps
2778          * is not so great if request processing is slow, while absolute
2779          * timestamps are not ideal because they need time synchronization. */
2780         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2781         if (req == NULL)
2782                 RETURN(-ENOMEM);
2783
2784         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2785         if (rc) {
2786                 ptlrpc_request_free(req);
2787                 RETURN(rc);
2788         }
2789         ptlrpc_request_set_replen(req);
2790         req->rq_request_portal = OST_CREATE_PORTAL;
2791         ptlrpc_at_set_req_timeout(req);
2792
2793         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2794                 /* procfs requests not want stat in wait for avoid deadlock */
2795                 req->rq_no_resend = 1;
2796                 req->rq_no_delay = 1;
2797         }
2798
2799         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2800         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2801         aa = ptlrpc_req_async_args(req);
2802         aa->aa_oi = oinfo;
2803
2804         ptlrpc_set_add_req(rqset, req);
2805         RETURN(0);
2806 }
2807
2808 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2809                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2810 {
2811         struct obd_device     *obd = class_exp2obd(exp);
2812         struct obd_statfs     *msfs;
2813         struct ptlrpc_request *req;
2814         struct obd_import     *imp = NULL;
2815         int rc;
2816         ENTRY;
2817
2818         /*Since the request might also come from lprocfs, so we need
2819          *sync this with client_disconnect_export Bug15684*/
2820         down_read(&obd->u.cli.cl_sem);
2821         if (obd->u.cli.cl_import)
2822                 imp = class_import_get(obd->u.cli.cl_import);
2823         up_read(&obd->u.cli.cl_sem);
2824         if (!imp)
2825                 RETURN(-ENODEV);
2826
2827         /* We could possibly pass max_age in the request (as an absolute
2828          * timestamp or a "seconds.usec ago") so the target can avoid doing
2829          * extra calls into the filesystem if that isn't necessary (e.g.
2830          * during mount that would help a bit).  Having relative timestamps
2831          * is not so great if request processing is slow, while absolute
2832          * timestamps are not ideal because they need time synchronization. */
2833         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2834
2835         class_import_put(imp);
2836
2837         if (req == NULL)
2838                 RETURN(-ENOMEM);
2839
2840         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2841         if (rc) {
2842                 ptlrpc_request_free(req);
2843                 RETURN(rc);
2844         }
2845         ptlrpc_request_set_replen(req);
2846         req->rq_request_portal = OST_CREATE_PORTAL;
2847         ptlrpc_at_set_req_timeout(req);
2848
2849         if (flags & OBD_STATFS_NODELAY) {
2850                 /* procfs requests not want stat in wait for avoid deadlock */
2851                 req->rq_no_resend = 1;
2852                 req->rq_no_delay = 1;
2853         }
2854
2855         rc = ptlrpc_queue_wait(req);
2856         if (rc)
2857                 GOTO(out, rc);
2858
2859         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2860         if (msfs == NULL) {
2861                 GOTO(out, rc = -EPROTO);
2862         }
2863
2864         *osfs = *msfs;
2865
2866         EXIT;
2867  out:
2868         ptlrpc_req_finished(req);
2869         return rc;
2870 }
2871
2872 /* Retrieve object striping information.
2873  *
2874  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2875  * the maximum number of OST indices which will fit in the user buffer.
2876  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2877  */
2878 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2879 {
2880         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2881         struct lov_user_md_v3 lum, *lumk;
2882         struct lov_user_ost_data_v1 *lmm_objects;
2883         int rc = 0, lum_size;
2884         ENTRY;
2885
2886         if (!lsm)
2887                 RETURN(-ENODATA);
2888
2889         /* we only need the header part from user space to get lmm_magic and
2890          * lmm_stripe_count, (the header part is common to v1 and v3) */
2891         lum_size = sizeof(struct lov_user_md_v1);
2892         if (cfs_copy_from_user(&lum, lump, lum_size))
2893                 RETURN(-EFAULT);
2894
2895         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2896             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2897                 RETURN(-EINVAL);
2898
2899         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2900         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2901         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2902         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2903
2904         /* we can use lov_mds_md_size() to compute lum_size
2905          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2906         if (lum.lmm_stripe_count > 0) {
2907                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2908                 OBD_ALLOC(lumk, lum_size);
2909                 if (!lumk)
2910                         RETURN(-ENOMEM);
2911
2912                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2913                         lmm_objects =
2914                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2915                 else
2916                         lmm_objects = &(lumk->lmm_objects[0]);
2917                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2918         } else {
2919                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2920                 lumk = &lum;
2921         }
2922
2923         lumk->lmm_oi = lsm->lsm_oi;
2924         lumk->lmm_stripe_count = 1;
2925
2926         if (cfs_copy_to_user(lump, lumk, lum_size))
2927                 rc = -EFAULT;
2928
2929         if (lumk != &lum)
2930                 OBD_FREE(lumk, lum_size);
2931
2932         RETURN(rc);
2933 }
2934
2935
2936 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2937                          void *karg, void *uarg)
2938 {
2939         struct obd_device *obd = exp->exp_obd;
2940         struct obd_ioctl_data *data = karg;
2941         int err = 0;
2942         ENTRY;
2943
2944         if (!cfs_try_module_get(THIS_MODULE)) {
2945                 CERROR("Can't get module. Is it alive?");
2946                 return -EINVAL;
2947         }
2948         switch (cmd) {
2949         case OBD_IOC_LOV_GET_CONFIG: {
2950                 char *buf;
2951                 struct lov_desc *desc;
2952                 struct obd_uuid uuid;
2953
2954                 buf = NULL;
2955                 len = 0;
2956                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2957                         GOTO(out, err = -EINVAL);
2958
2959                 data = (struct obd_ioctl_data *)buf;
2960
2961                 if (sizeof(*desc) > data->ioc_inllen1) {
2962                         obd_ioctl_freedata(buf, len);
2963                         GOTO(out, err = -EINVAL);
2964                 }
2965
2966                 if (data->ioc_inllen2 < sizeof(uuid)) {
2967                         obd_ioctl_freedata(buf, len);
2968                         GOTO(out, err = -EINVAL);
2969                 }
2970
2971                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2972                 desc->ld_tgt_count = 1;
2973                 desc->ld_active_tgt_count = 1;
2974                 desc->ld_default_stripe_count = 1;
2975                 desc->ld_default_stripe_size = 0;
2976                 desc->ld_default_stripe_offset = 0;
2977                 desc->ld_pattern = 0;
2978                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2979
2980                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2981
2982                 err = cfs_copy_to_user((void *)uarg, buf, len);
2983                 if (err)
2984                         err = -EFAULT;
2985                 obd_ioctl_freedata(buf, len);
2986                 GOTO(out, err);
2987         }
2988         case LL_IOC_LOV_SETSTRIPE:
2989                 err = obd_alloc_memmd(exp, karg);
2990                 if (err > 0)
2991                         err = 0;
2992                 GOTO(out, err);
2993         case LL_IOC_LOV_GETSTRIPE:
2994                 err = osc_getstripe(karg, uarg);
2995                 GOTO(out, err);
2996         case OBD_IOC_CLIENT_RECOVER:
2997                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2998                                             data->ioc_inlbuf1, 0);
2999                 if (err > 0)
3000                         err = 0;
3001                 GOTO(out, err);
3002         case IOC_OSC_SET_ACTIVE:
3003                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3004                                                data->ioc_offset);
3005                 GOTO(out, err);
3006         case OBD_IOC_POLL_QUOTACHECK:
3007                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3008                 GOTO(out, err);
3009         case OBD_IOC_PING_TARGET:
3010                 err = ptlrpc_obd_ping(obd);
3011                 GOTO(out, err);
3012         default:
3013                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3014                        cmd, cfs_curproc_comm());
3015                 GOTO(out, err = -ENOTTY);
3016         }
3017 out:
3018         cfs_module_put(THIS_MODULE);
3019         return err;
3020 }
3021
3022 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3023                         obd_count keylen, void *key, __u32 *vallen, void *val,
3024                         struct lov_stripe_md *lsm)
3025 {
3026         ENTRY;
3027         if (!vallen || !val)
3028                 RETURN(-EFAULT);
3029
3030         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3031                 __u32 *stripe = val;
3032                 *vallen = sizeof(*stripe);
3033                 *stripe = 0;
3034                 RETURN(0);
3035         } else if (KEY_IS(KEY_LAST_ID)) {
3036                 struct ptlrpc_request *req;
3037                 obd_id                *reply;
3038                 char                  *tmp;
3039                 int                    rc;
3040
3041                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3042                                            &RQF_OST_GET_INFO_LAST_ID);
3043                 if (req == NULL)
3044                         RETURN(-ENOMEM);
3045
3046                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3047                                      RCL_CLIENT, keylen);
3048                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3049                 if (rc) {
3050                         ptlrpc_request_free(req);
3051                         RETURN(rc);
3052                 }
3053
3054                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3055                 memcpy(tmp, key, keylen);
3056
3057                 req->rq_no_delay = req->rq_no_resend = 1;
3058                 ptlrpc_request_set_replen(req);
3059                 rc = ptlrpc_queue_wait(req);
3060                 if (rc)
3061                         GOTO(out, rc);
3062
3063                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3064                 if (reply == NULL)
3065                         GOTO(out, rc = -EPROTO);
3066
3067                 *((obd_id *)val) = *reply;
3068         out:
3069                 ptlrpc_req_finished(req);
3070                 RETURN(rc);
3071         } else if (KEY_IS(KEY_FIEMAP)) {
3072                 struct ptlrpc_request *req;
3073                 struct ll_user_fiemap *reply;
3074                 char *tmp;
3075                 int rc;
3076
3077                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3078                                            &RQF_OST_GET_INFO_FIEMAP);
3079                 if (req == NULL)
3080                         RETURN(-ENOMEM);
3081
3082                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3083                                      RCL_CLIENT, keylen);
3084                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3085                                      RCL_CLIENT, *vallen);
3086                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3087                                      RCL_SERVER, *vallen);
3088
3089                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3090                 if (rc) {
3091                         ptlrpc_request_free(req);
3092                         RETURN(rc);
3093                 }
3094
3095                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3096                 memcpy(tmp, key, keylen);
3097                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3098                 memcpy(tmp, val, *vallen);
3099
3100                 ptlrpc_request_set_replen(req);
3101                 rc = ptlrpc_queue_wait(req);
3102                 if (rc)
3103                         GOTO(out1, rc);
3104
3105                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3106                 if (reply == NULL)
3107                         GOTO(out1, rc = -EPROTO);
3108
3109                 memcpy(val, reply, *vallen);
3110         out1:
3111                 ptlrpc_req_finished(req);
3112
3113                 RETURN(rc);
3114         }
3115
3116         RETURN(-EINVAL);
3117 }
3118
3119 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3120                               obd_count keylen, void *key, obd_count vallen,
3121                               void *val, struct ptlrpc_request_set *set)
3122 {
3123         struct ptlrpc_request *req;
3124         struct obd_device     *obd = exp->exp_obd;
3125         struct obd_import     *imp = class_exp2cliimp(exp);
3126         char                  *tmp;
3127         int                    rc;
3128         ENTRY;
3129
3130         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3131
3132         if (KEY_IS(KEY_CHECKSUM)) {
3133                 if (vallen != sizeof(int))
3134                         RETURN(-EINVAL);
3135                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3136                 RETURN(0);
3137         }
3138
3139         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3140                 sptlrpc_conf_client_adapt(obd);
3141                 RETURN(0);
3142         }
3143
3144         if (KEY_IS(KEY_FLUSH_CTX)) {
3145                 sptlrpc_import_flush_my_ctx(imp);
3146                 RETURN(0);
3147         }
3148
3149         if (KEY_IS(KEY_CACHE_SET)) {
3150                 struct client_obd *cli = &obd->u.cli;
3151
3152                 LASSERT(cli->cl_cache == NULL); /* only once */
3153                 cli->cl_cache = (struct cl_client_cache *)val;
3154                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3155                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3156
3157                 /* add this osc into entity list */
3158                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3159                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3160                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3161                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3162
3163                 RETURN(0);
3164         }
3165
3166         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3167                 struct client_obd *cli = &obd->u.cli;
3168                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3169                 int target = *(int *)val;
3170
3171                 nr = osc_lru_shrink(cli, min(nr, target));
3172                 *(int *)val -= nr;
3173                 RETURN(0);
3174         }
3175
3176         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3177                 RETURN(-EINVAL);
3178
3179         /* We pass all other commands directly to OST. Since nobody calls osc
3180            methods directly and everybody is supposed to go through LOV, we