Whamcloud - gitweb
LU-2139 osc: Use SOFT_SYNC to urge server commit
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
188                              oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
217                                      aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265                        struct obd_info *oinfo)
266 {
267         struct ptlrpc_request *req;
268         struct ost_body       *body;
269         int                    rc;
270         ENTRY;
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273         if (req == NULL)
274                 RETURN(-ENOMEM);
275
276         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 RETURN(rc);
281         }
282
283         osc_pack_req_body(req, oinfo);
284
285         ptlrpc_request_set_replen(req);
286
287         rc = ptlrpc_queue_wait(req);
288         if (rc)
289                 GOTO(out, rc);
290
291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
297                              &body->oa);
298
299         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
300         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301
302         EXIT;
303  out:
304         ptlrpc_req_finished(req);
305         return rc;
306 }
307
308 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
309                        struct obd_info *oinfo, struct obd_trans_info *oti)
310 {
311         struct ptlrpc_request *req;
312         struct ost_body       *body;
313         int                    rc;
314         ENTRY;
315
316         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324         if (rc) {
325                 ptlrpc_request_free(req);
326                 RETURN(rc);
327         }
328
329         osc_pack_req_body(req, oinfo);
330
331         ptlrpc_request_set_replen(req);
332
333         rc = ptlrpc_queue_wait(req);
334         if (rc)
335                 GOTO(out, rc);
336
337         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338         if (body == NULL)
339                 GOTO(out, rc = -EPROTO);
340
341         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
342                              &body->oa);
343
344         EXIT;
345 out:
346         ptlrpc_req_finished(req);
347         RETURN(rc);
348 }
349
350 static int osc_setattr_interpret(const struct lu_env *env,
351                                  struct ptlrpc_request *req,
352                                  struct osc_setattr_args *sa, int rc)
353 {
354         struct ost_body *body;
355         ENTRY;
356
357         if (rc != 0)
358                 GOTO(out, rc);
359
360         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361         if (body == NULL)
362                 GOTO(out, rc = -EPROTO);
363
364         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
365                              &body->oa);
366 out:
367         rc = sa->sa_upcall(sa->sa_cookie, rc);
368         RETURN(rc);
369 }
370
371 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
372                            struct obd_trans_info *oti,
373                            obd_enqueue_update_f upcall, void *cookie,
374                            struct ptlrpc_request_set *rqset)
375 {
376         struct ptlrpc_request   *req;
377         struct osc_setattr_args *sa;
378         int                      rc;
379         ENTRY;
380
381         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382         if (req == NULL)
383                 RETURN(-ENOMEM);
384
385         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 RETURN(rc);
390         }
391
392         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
393                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
394
395         osc_pack_req_body(req, oinfo);
396
397         ptlrpc_request_set_replen(req);
398
399         /* do mds to ost setattr asynchronously */
400         if (!rqset) {
401                 /* Do not wait for response. */
402                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
403         } else {
404                 req->rq_interpret_reply =
405                         (ptlrpc_interpterer_t)osc_setattr_interpret;
406
407                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
408                 sa = ptlrpc_req_async_args(req);
409                 sa->sa_oa = oinfo->oi_oa;
410                 sa->sa_upcall = upcall;
411                 sa->sa_cookie = cookie;
412
413                 if (rqset == PTLRPCD_SET)
414                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
415                 else
416                         ptlrpc_set_add_req(rqset, req);
417         }
418
419         RETURN(0);
420 }
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
430 int osc_real_create(struct obd_export *exp, struct obdo *oa,
431                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
432 {
433         struct ptlrpc_request *req;
434         struct ost_body       *body;
435         struct lov_stripe_md  *lsm;
436         int                    rc;
437         ENTRY;
438
439         LASSERT(oa);
440         LASSERT(ea);
441
442         lsm = *ea;
443         if (!lsm) {
444                 rc = obd_alloc_memmd(exp, &lsm);
445                 if (rc < 0)
446                         RETURN(rc);
447         }
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
450         if (req == NULL)
451                 GOTO(out, rc = -ENOMEM);
452
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 GOTO(out, rc);
457         }
458
459         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
460         LASSERT(body);
461
462         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
463
464         ptlrpc_request_set_replen(req);
465
466         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
467             oa->o_flags == OBD_FL_DELORPHAN) {
468                 DEBUG_REQ(D_HA, req,
469                           "delorphan from OST integration");
470                 /* Don't resend the delorphan req */
471                 req->rq_no_resend = req->rq_no_delay = 1;
472         }
473
474         rc = ptlrpc_queue_wait(req);
475         if (rc)
476                 GOTO(out_req, rc);
477
478         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
479         if (body == NULL)
480                 GOTO(out_req, rc = -EPROTO);
481
482         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
483         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
484
485         oa->o_blksize = cli_brw_size(exp->exp_obd);
486         oa->o_valid |= OBD_MD_FLBLKSZ;
487
488         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
489          * have valid lsm_oinfo data structs, so don't go touching that.
490          * This needs to be fixed in a big way.
491          */
492         lsm->lsm_oi = oa->o_oi;
493         *ea = lsm;
494
495         if (oti != NULL) {
496                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
497
498                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499                         if (!oti->oti_logcookies)
500                                 oti_alloc_cookies(oti, 1);
501                         *oti->oti_logcookies = oa->o_lcookie;
502                 }
503         }
504
505         CDEBUG(D_HA, "transno: "LPD64"\n",
506                lustre_msg_get_transno(req->rq_repmsg));
507 out_req:
508         ptlrpc_req_finished(req);
509 out:
510         if (rc && !*ea)
511                 obd_free_memmd(exp, &lsm);
512         RETURN(rc);
513 }
514
515 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
516                    obd_enqueue_update_f upcall, void *cookie,
517                    struct ptlrpc_request_set *rqset)
518 {
519         struct ptlrpc_request   *req;
520         struct osc_setattr_args *sa;
521         struct ost_body         *body;
522         int                      rc;
523         ENTRY;
524
525         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526         if (req == NULL)
527                 RETURN(-ENOMEM);
528
529         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
530         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(rc);
534         }
535         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
536         ptlrpc_at_set_req_timeout(req);
537
538         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
539         LASSERT(body);
540         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
541                              oinfo->oi_oa);
542         osc_pack_capa(req, body, oinfo->oi_capa);
543
544         ptlrpc_request_set_replen(req);
545
546         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
547         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548         sa = ptlrpc_req_async_args(req);
549         sa->sa_oa     = oinfo->oi_oa;
550         sa->sa_upcall = upcall;
551         sa->sa_cookie = cookie;
552         if (rqset == PTLRPCD_SET)
553                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
554         else
555                 ptlrpc_set_add_req(rqset, req);
556
557         RETURN(0);
558 }
559
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561                      struct obd_info *oinfo, struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
571 static int osc_sync_interpret(const struct lu_env *env,
572                               struct ptlrpc_request *req,
573                               void *arg, int rc)
574 {
575         struct osc_fsync_args *fa = arg;
576         struct ost_body *body;
577         ENTRY;
578
579         if (rc)
580                 GOTO(out, rc);
581
582         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
583         if (body == NULL) {
584                 CERROR ("can't unpack ost_body\n");
585                 GOTO(out, rc = -EPROTO);
586         }
587
588         *fa->fa_oi->oi_oa = body->oa;
589 out:
590         rc = fa->fa_upcall(fa->fa_cookie, rc);
591         RETURN(rc);
592 }
593
594 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
595                   obd_enqueue_update_f upcall, void *cookie,
596                   struct ptlrpc_request_set *rqset)
597 {
598         struct ptlrpc_request *req;
599         struct ost_body       *body;
600         struct osc_fsync_args *fa;
601         int                    rc;
602         ENTRY;
603
604         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
605         if (req == NULL)
606                 RETURN(-ENOMEM);
607
608         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
609         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
610         if (rc) {
611                 ptlrpc_request_free(req);
612                 RETURN(rc);
613         }
614
615         /* overload the size and blocks fields in the oa with start/end */
616         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
617         LASSERT(body);
618         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
619                              oinfo->oi_oa);
620         osc_pack_capa(req, body, oinfo->oi_capa);
621
622         ptlrpc_request_set_replen(req);
623         req->rq_interpret_reply = osc_sync_interpret;
624
625         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
626         fa = ptlrpc_req_async_args(req);
627         fa->fa_oi = oinfo;
628         fa->fa_upcall = upcall;
629         fa->fa_cookie = cookie;
630
631         if (rqset == PTLRPCD_SET)
632                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
633         else
634                 ptlrpc_set_add_req(rqset, req);
635
636         RETURN (0);
637 }
638
639 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
640                     struct obd_info *oinfo, obd_size start, obd_size end,
641                     struct ptlrpc_request_set *set)
642 {
643         ENTRY;
644
645         if (!oinfo->oi_oa) {
646                 CDEBUG(D_INFO, "oa NULL\n");
647                 RETURN(-EINVAL);
648         }
649
650         oinfo->oi_oa->o_size = start;
651         oinfo->oi_oa->o_blocks = end;
652         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
653
654         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
655 }
656
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661                                    cfs_list_t *cancels,
662                                    ldlm_mode_t mode, __u64 lock_flags)
663 {
664         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665         struct ldlm_res_id res_id;
666         struct ldlm_resource *res;
667         int count;
668         ENTRY;
669
670         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
671          * export) but disabled through procfs (flag in NS).
672          *
673          * This distinguishes from a case when ELC is not supported originally,
674          * when we still want to cancel locks in advance and just cancel them
675          * locally, without sending any RPC. */
676         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
677                 RETURN(0);
678
679         ostid_build_res_name(&oa->o_oi, &res_id);
680         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
681         if (res == NULL)
682                 RETURN(0);
683
684         LDLM_RESOURCE_ADDREF(res);
685         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
686                                            lock_flags, 0, NULL);
687         LDLM_RESOURCE_DELREF(res);
688         ldlm_resource_putref(res);
689         RETURN(count);
690 }
691
692 static int osc_destroy_interpret(const struct lu_env *env,
693                                  struct ptlrpc_request *req, void *data,
694                                  int rc)
695 {
696         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
697
698         cfs_atomic_dec(&cli->cl_destroy_in_flight);
699         wake_up(&cli->cl_destroy_waitq);
700         return 0;
701 }
702
703 static int osc_can_send_destroy(struct client_obd *cli)
704 {
705         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
706             cli->cl_max_rpcs_in_flight) {
707                 /* The destroy request can be sent */
708                 return 1;
709         }
710         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
711             cli->cl_max_rpcs_in_flight) {
712                 /*
713                  * The counter has been modified between the two atomic
714                  * operations.
715                  */
716                 wake_up(&cli->cl_destroy_waitq);
717         }
718         return 0;
719 }
720
/*
 * Create entry point.  The request is passed to osc_real_create() when
 * the obdo asks to recreate objects (o_flags == OBD_FL_RECREATE_OBJS with
 * OBD_MD_FLFLAGS valid) or when the object's sequence is not an MDT
 * sequence.  Any other call is unexpected and hits LBUG().
 */
int osc_create(const struct lu_env *env, struct obdo *oa_unused_guard,
               struct obdo *oa, struct lov_stripe_md **ea,
               struct obd_trans_info *oti);
745
746 /* Destroy requests can be async always on the client, and we don't even really
747  * care about the return code since the client cannot do anything at all about
748  * a destroy failure.
749  * When the MDS is unlinking a filename, it saves the file objects into a
750  * recovery llog, and these object records are cancelled when the OST reports
751  * they were destroyed and sync'd to disk (i.e. transaction committed).
752  * If the client dies, or the OST is down when the object should be destroyed,
753  * the records are not cancelled, and when the OST reconnects to the MDS next,
754  * it will retrieve the llog unlink logs and then sends the log cancellation
755  * cookies to the MDS after committing destroy transactions. */
756 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
757                        struct obdo *oa, struct lov_stripe_md *ea,
758                        struct obd_trans_info *oti, struct obd_export *md_export,
759                        void *capa)
760 {
761         struct client_obd     *cli = &exp->exp_obd->u.cli;
762         struct ptlrpc_request *req;
763         struct ost_body       *body;
764         CFS_LIST_HEAD(cancels);
765         int rc, count;
766         ENTRY;
767
768         if (!oa) {
769                 CDEBUG(D_INFO, "oa NULL\n");
770                 RETURN(-EINVAL);
771         }
772
773         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
774                                         LDLM_FL_DISCARD_DATA);
775
776         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
777         if (req == NULL) {
778                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779                 RETURN(-ENOMEM);
780         }
781
782         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
783         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
784                                0, &cancels, count);
785         if (rc) {
786                 ptlrpc_request_free(req);
787                 RETURN(rc);
788         }
789
790         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
791         ptlrpc_at_set_req_timeout(req);
792
793         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
794                 oa->o_lcookie = *oti->oti_logcookies;
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
798
799         osc_pack_capa(req, body, (struct obd_capa *)capa);
800         ptlrpc_request_set_replen(req);
801
802         /* If osc_destory is for destroying the unlink orphan,
803          * sent from MDT to OST, which should not be blocked here,
804          * because the process might be triggered by ptlrpcd, and
805          * it is not good to block ptlrpcd thread (b=16006)*/
806         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
807                 req->rq_interpret_reply = osc_destroy_interpret;
808                 if (!osc_can_send_destroy(cli)) {
809                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810                                                           NULL);
811
812                         /*
813                          * Wait until the number of on-going destroy RPCs drops
814                          * under max_rpc_in_flight
815                          */
816                         l_wait_event_exclusive(cli->cl_destroy_waitq,
817                                                osc_can_send_destroy(cli), &lwi);
818                 }
819         }
820
821         /* Do not wait for response */
822         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
823         RETURN(0);
824 }
825
826 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
827                                 long writing_bytes)
828 {
829         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
830
831         LASSERT(!(oa->o_valid & bits));
832
833         oa->o_valid |= bits;
834         client_obd_list_lock(&cli->cl_loi_list_lock);
835         oa->o_dirty = cli->cl_dirty;
836         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
837                      cli->cl_dirty_max)) {
838                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
839                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
840                 oa->o_undirty = 0;
841         } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) +
842                             cfs_atomic_read(&obd_dirty_pages) -
843                             cfs_atomic_read(&obd_dirty_transit_pages) >
844                             (long)(obd_max_dirty_pages + 1))) {
845                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
846                  * not covered by a lock thus they may safely race and trip
847                  * this CERROR() unless we add in a small fudge factor (+1). */
848                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
849                        cli->cl_import->imp_obd->obd_name,
850                        cfs_atomic_read(&obd_unstable_pages),
851                        cfs_atomic_read(&obd_dirty_pages),
852                        cfs_atomic_read(&obd_dirty_transit_pages),
853                        obd_max_dirty_pages);
854                 oa->o_undirty = 0;
855         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
856                 CERROR("dirty %lu - dirty_max %lu too big???\n",
857                        cli->cl_dirty, cli->cl_dirty_max);
858                 oa->o_undirty = 0;
859         } else {
860                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
861                                       PAGE_CACHE_SHIFT) *
862                                      (cli->cl_max_rpcs_in_flight + 1);
863                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
864         }
865         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
866         oa->o_dropped = cli->cl_lost_grant;
867         cli->cl_lost_grant = 0;
868         client_obd_list_unlock(&cli->cl_loi_list_lock);
869         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
870                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
871
872 }
873
874 void osc_update_next_shrink(struct client_obd *cli)
875 {
876         cli->cl_next_shrink_grant =
877                 cfs_time_shift(cli->cl_grant_shrink_interval);
878         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
879                cli->cl_next_shrink_grant);
880 }
881
882 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
883 {
884         client_obd_list_lock(&cli->cl_loi_list_lock);
885         cli->cl_avail_grant += grant;
886         client_obd_list_unlock(&cli->cl_loi_list_lock);
887 }
888
889 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
890 {
891         if (body->oa.o_valid & OBD_MD_FLGRANT) {
892                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
893                 __osc_update_grant(cli, body->oa.o_grant);
894         }
895 }
896
897 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
898                               obd_count keylen, void *key, obd_count vallen,
899                               void *val, struct ptlrpc_request_set *set);
900
901 static int osc_shrink_grant_interpret(const struct lu_env *env,
902                                       struct ptlrpc_request *req,
903                                       void *aa, int rc)
904 {
905         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
906         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
907         struct ost_body *body;
908
909         if (rc != 0) {
910                 __osc_update_grant(cli, oa->o_grant);
911                 GOTO(out, rc);
912         }
913
914         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
915         LASSERT(body);
916         osc_update_grant(cli, body);
917 out:
918         OBDO_FREE(oa);
919         return rc;
920 }
921
922 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
923 {
924         client_obd_list_lock(&cli->cl_loi_list_lock);
925         oa->o_grant = cli->cl_avail_grant / 4;
926         cli->cl_avail_grant -= oa->o_grant;
927         client_obd_list_unlock(&cli->cl_loi_list_lock);
928         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
929                 oa->o_valid |= OBD_MD_FLFLAGS;
930                 oa->o_flags = 0;
931         }
932         oa->o_flags |= OBD_FL_SHRINK_GRANT;
933         osc_update_next_shrink(cli);
934 }
935
936 /* Shrink the current grant, either from some large amount to enough for a
937  * full set of in-flight RPCs, or if we have already shrunk to that limit
938  * then to enough for a single RPC.  This avoids keeping more grant than
939  * needed, and avoids shrinking the grant piecemeal. */
940 static int osc_shrink_grant(struct client_obd *cli)
941 {
942         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
943                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
944
945         client_obd_list_lock(&cli->cl_loi_list_lock);
946         if (cli->cl_avail_grant <= target_bytes)
947                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
948         client_obd_list_unlock(&cli->cl_loi_list_lock);
949
950         return osc_shrink_grant_to_target(cli, target_bytes);
951 }
952
953 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
954 {
955         int                     rc = 0;
956         struct ost_body        *body;
957         ENTRY;
958
959         client_obd_list_lock(&cli->cl_loi_list_lock);
960         /* Don't shrink if we are already above or below the desired limit
961          * We don't want to shrink below a single RPC, as that will negatively
962          * impact block allocation and long-term performance. */
963         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
964                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
965
966         if (target_bytes >= cli->cl_avail_grant) {
967                 client_obd_list_unlock(&cli->cl_loi_list_lock);
968                 RETURN(0);
969         }
970         client_obd_list_unlock(&cli->cl_loi_list_lock);
971
972         OBD_ALLOC_PTR(body);
973         if (!body)
974                 RETURN(-ENOMEM);
975
976         osc_announce_cached(cli, &body->oa, 0);
977
978         client_obd_list_lock(&cli->cl_loi_list_lock);
979         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
980         cli->cl_avail_grant = target_bytes;
981         client_obd_list_unlock(&cli->cl_loi_list_lock);
982         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
983                 body->oa.o_valid |= OBD_MD_FLFLAGS;
984                 body->oa.o_flags = 0;
985         }
986         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
987         osc_update_next_shrink(cli);
988
989         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
990                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
991                                 sizeof(*body), body, NULL);
992         if (rc != 0)
993                 __osc_update_grant(cli, body->oa.o_grant);
994         OBD_FREE_PTR(body);
995         RETURN(rc);
996 }
997
998 static int osc_should_shrink_grant(struct client_obd *client)
999 {
1000         cfs_time_t time = cfs_time_current();
1001         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1002
1003         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1004              OBD_CONNECT_GRANT_SHRINK) == 0)
1005                 return 0;
1006
1007         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1008                 /* Get the current RPC size directly, instead of going via:
1009                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1010                  * Keep comment here so that it can be found by searching. */
1011                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1012
1013                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1014                     client->cl_avail_grant > brw_size)
1015                         return 1;
1016                 else
1017                         osc_update_next_shrink(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1023 {
1024         struct client_obd *client;
1025
1026         cfs_list_for_each_entry(client, &item->ti_obd_list,
1027                                 cl_grant_shrink_list) {
1028                 if (osc_should_shrink_grant(client))
1029                         osc_shrink_grant(client);
1030         }
1031         return 0;
1032 }
1033
1034 static int osc_add_shrink_grant(struct client_obd *client)
1035 {
1036         int rc;
1037
1038         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1039                                        TIMEOUT_GRANT,
1040                                        osc_grant_shrink_grant_cb, NULL,
1041                                        &client->cl_grant_shrink_list);
1042         if (rc) {
1043                 CERROR("add grant client %s error %d\n",
1044                         client->cl_import->imp_obd->obd_name, rc);
1045                 return rc;
1046         }
1047         CDEBUG(D_CACHE, "add grant client %s \n",
1048                client->cl_import->imp_obd->obd_name);
1049         osc_update_next_shrink(client);
1050         return 0;
1051 }
1052
1053 static int osc_del_shrink_grant(struct client_obd *client)
1054 {
1055         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1056                                          TIMEOUT_GRANT);
1057 }
1058
1059 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1060 {
1061         /*
1062          * ocd_grant is the total grant amount we're expect to hold: if we've
1063          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1064          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1065          *
1066          * race is tolerable here: if we're evicted, but imp_state already
1067          * left EVICTED state, then cl_dirty must be 0 already.
1068          */
1069         client_obd_list_lock(&cli->cl_loi_list_lock);
1070         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1071                 cli->cl_avail_grant = ocd->ocd_grant;
1072         else
1073                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1074
1075         if (cli->cl_avail_grant < 0) {
1076                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1077                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1078                       ocd->ocd_grant, cli->cl_dirty);
1079                 /* workaround for servers which do not have the patch from
1080                  * LU-2679 */
1081                 cli->cl_avail_grant = ocd->ocd_grant;
1082         }
1083
1084         /* determine the appropriate chunk size used by osc_extent. */
1085         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1086         client_obd_list_unlock(&cli->cl_loi_list_lock);
1087
1088         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1089                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1090                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1091
1092         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1093             cfs_list_empty(&cli->cl_grant_shrink_list))
1094                 osc_add_shrink_grant(cli);
1095 }
1096
1097 /* We assume that the reason this OSC got a short read is because it read
1098  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1099  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1100  * this stripe never got written at or beyond this stripe offset yet. */
1101 static void handle_short_read(int nob_read, obd_count page_count,
1102                               struct brw_page **pga)
1103 {
1104         char *ptr;
1105         int i = 0;
1106
1107         /* skip bytes read OK */
1108         while (nob_read > 0) {
1109                 LASSERT (page_count > 0);
1110
1111                 if (pga[i]->count > nob_read) {
1112                         /* EOF inside this page */
1113                         ptr = kmap(pga[i]->pg) +
1114                                 (pga[i]->off & ~CFS_PAGE_MASK);
1115                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116                         kunmap(pga[i]->pg);
1117                         page_count--;
1118                         i++;
1119                         break;
1120                 }
1121
1122                 nob_read -= pga[i]->count;
1123                 page_count--;
1124                 i++;
1125         }
1126
1127         /* zero remaining pages */
1128         while (page_count-- > 0) {
1129                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1130                 memset(ptr, 0, pga[i]->count);
1131                 kunmap(pga[i]->pg);
1132                 i++;
1133         }
1134 }
1135
1136 static int check_write_rcs(struct ptlrpc_request *req,
1137                            int requested_nob, int niocount,
1138                            obd_count page_count, struct brw_page **pga)
1139 {
1140         int     i;
1141         __u32   *remote_rcs;
1142
1143         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1144                                                   sizeof(*remote_rcs) *
1145                                                   niocount);
1146         if (remote_rcs == NULL) {
1147                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1148                 return(-EPROTO);
1149         }
1150
1151         /* return error if any niobuf was in error */
1152         for (i = 0; i < niocount; i++) {
1153                 if ((int)remote_rcs[i] < 0)
1154                         return(remote_rcs[i]);
1155
1156                 if (remote_rcs[i] != 0) {
1157                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1158                                 i, remote_rcs[i], req);
1159                         return(-EPROTO);
1160                 }
1161         }
1162
1163         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1164                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1165                        req->rq_bulk->bd_nob_transferred, requested_nob);
1166                 return(-EPROTO);
1167         }
1168
1169         return (0);
1170 }
1171
1172 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1173 {
1174         if (p1->flag != p2->flag) {
1175                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1176                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1177                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1178
1179                 /* warn if we try to combine flags that we don't know to be
1180                  * safe to combine */
1181                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1182                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1183                               "report this at http://bugs.whamcloud.com/\n",
1184                               p1->flag, p2->flag);
1185                 }
1186                 return 0;
1187         }
1188
1189         return (p1->off + p1->count == p2->off);
1190 }
1191
1192 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1193                                    struct brw_page **pga, int opc,
1194                                    cksum_type_t cksum_type)
1195 {
1196         __u32                           cksum;
1197         int                             i = 0;
1198         struct cfs_crypto_hash_desc     *hdesc;
1199         unsigned int                    bufsize;
1200         int                             err;
1201         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1202
1203         LASSERT(pg_count > 0);
1204
1205         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1206         if (IS_ERR(hdesc)) {
1207                 CERROR("Unable to initialize checksum hash %s\n",
1208                        cfs_crypto_hash_name(cfs_alg));
1209                 return PTR_ERR(hdesc);
1210         }
1211
1212         while (nob > 0 && pg_count > 0) {
1213                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1214
1215                 /* corrupt the data before we compute the checksum, to
1216                  * simulate an OST->client data error */
1217                 if (i == 0 && opc == OST_READ &&
1218                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1219                         unsigned char *ptr = kmap(pga[i]->pg);
1220                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1221                         memcpy(ptr + off, "bad1", min(4, nob));
1222                         kunmap(pga[i]->pg);
1223                 }
1224                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1225                                   pga[i]->off & ~CFS_PAGE_MASK,
1226                                   count);
1227                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1228                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1229
1230                 nob -= pga[i]->count;
1231                 pg_count--;
1232                 i++;
1233         }
1234
1235         bufsize = 4;
1236         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1237
1238         if (err)
1239                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1240
1241         /* For sending we only compute the wrong checksum instead
1242          * of corrupting the data so it is still correct on a redo */
1243         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1244                 cksum++;
1245
1246         return cksum;
1247 }
1248
1249 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1250                                 struct lov_stripe_md *lsm, obd_count page_count,
1251                                 struct brw_page **pga,
1252                                 struct ptlrpc_request **reqp,
1253                                 struct obd_capa *ocapa, int reserve,
1254                                 int resend)
1255 {
1256         struct ptlrpc_request   *req;
1257         struct ptlrpc_bulk_desc *desc;
1258         struct ost_body         *body;
1259         struct obd_ioobj        *ioobj;
1260         struct niobuf_remote    *niobuf;
1261         int niocount, i, requested_nob, opc, rc;
1262         struct osc_brw_async_args *aa;
1263         struct req_capsule      *pill;
1264         struct brw_page *pg_prev;
1265
1266         ENTRY;
1267         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1268                 RETURN(-ENOMEM); /* Recoverable */
1269         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1270                 RETURN(-EINVAL); /* Fatal */
1271
1272         if ((cmd & OBD_BRW_WRITE) != 0) {
1273                 opc = OST_WRITE;
1274                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1275                                                 cli->cl_import->imp_rq_pool,
1276                                                 &RQF_OST_BRW_WRITE);
1277         } else {
1278                 opc = OST_READ;
1279                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1280         }
1281         if (req == NULL)
1282                 RETURN(-ENOMEM);
1283
1284         for (niocount = i = 1; i < page_count; i++) {
1285                 if (!can_merge_pages(pga[i - 1], pga[i]))
1286                         niocount++;
1287         }
1288
1289         pill = &req->rq_pill;
1290         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1291                              sizeof(*ioobj));
1292         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1293                              niocount * sizeof(*niobuf));
1294         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1295
1296         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1297         if (rc) {
1298                 ptlrpc_request_free(req);
1299                 RETURN(rc);
1300         }
1301         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1302         ptlrpc_at_set_req_timeout(req);
1303         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1304          * retry logic */
1305         req->rq_no_retry_einprogress = 1;
1306
1307         desc = ptlrpc_prep_bulk_imp(req, page_count,
1308                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1309                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1310                 OST_BULK_PORTAL);
1311
1312         if (desc == NULL)
1313                 GOTO(out, rc = -ENOMEM);
1314         /* NB request now owns desc and will free it when it gets freed */
1315
1316         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1317         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1318         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1319         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1320
1321         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1322
1323         obdo_to_ioobj(oa, ioobj);
1324         ioobj->ioo_bufcnt = niocount;
1325         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1326          * that might be send for this request.  The actual number is decided
1327          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1328          * "max - 1" for old client compatibility sending "0", and also so the
1329          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1330         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1331         osc_pack_capa(req, body, ocapa);
1332         LASSERT(page_count > 0);
1333         pg_prev = pga[0];
1334         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1335                 struct brw_page *pg = pga[i];
1336                 int poff = pg->off & ~CFS_PAGE_MASK;
1337
1338                 LASSERT(pg->count > 0);
1339                 /* make sure there is no gap in the middle of page array */
1340                 LASSERTF(page_count == 1 ||
1341                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1342                           ergo(i > 0 && i < page_count - 1,
1343                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1344                           ergo(i == page_count - 1, poff == 0)),
1345                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1346                          i, page_count, pg, pg->off, pg->count);
1347 #ifdef __linux__
1348                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1349                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1350                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1351                          i, page_count,
1352                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1353                          pg_prev->pg, page_private(pg_prev->pg),
1354                          pg_prev->pg->index, pg_prev->off);
1355 #else
1356                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1357                          "i %d p_c %u\n", i, page_count);
1358 #endif
1359                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1360                         (pg->flag & OBD_BRW_SRVLOCK));
1361
1362                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1363                 requested_nob += pg->count;
1364
1365                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1366                         niobuf--;
1367                         niobuf->len += pg->count;
1368                 } else {
1369                         niobuf->offset = pg->off;
1370                         niobuf->len    = pg->count;
1371                         niobuf->flags  = pg->flag;
1372                 }
1373                 pg_prev = pg;
1374         }
1375
1376         LASSERTF((void *)(niobuf - niocount) ==
1377                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1378                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1379                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1380
1381         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1382         if (resend) {
1383                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1384                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1385                         body->oa.o_flags = 0;
1386                 }
1387                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1388         }
1389
1390         if (osc_should_shrink_grant(cli))
1391                 osc_shrink_grant_local(cli, &body->oa);
1392
1393         /* size[REQ_REC_OFF] still sizeof (*body) */
1394         if (opc == OST_WRITE) {
1395                 if (cli->cl_checksum &&
1396                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1397                         /* store cl_cksum_type in a local variable since
1398                          * it can be changed via lprocfs */
1399                         cksum_type_t cksum_type = cli->cl_cksum_type;
1400
1401                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1402                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1403                                 body->oa.o_flags = 0;
1404                         }
1405                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1406                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1407                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1408                                                              page_count, pga,
1409                                                              OST_WRITE,
1410                                                              cksum_type);
1411                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1412                                body->oa.o_cksum);
1413                         /* save this in 'oa', too, for later checking */
1414                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1415                         oa->o_flags |= cksum_type_pack(cksum_type);
1416                 } else {
1417                         /* clear out the checksum flag, in case this is a
1418                          * resend but cl_checksum is no longer set. b=11238 */
1419                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1420                 }
1421                 oa->o_cksum = body->oa.o_cksum;
1422                 /* 1 RC per niobuf */
1423                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1424                                      sizeof(__u32) * niocount);
1425         } else {
1426                 if (cli->cl_checksum &&
1427                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1428                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1429                                 body->oa.o_flags = 0;
1430                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1431                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1432                 }
1433         }
1434         ptlrpc_request_set_replen(req);
1435
1436         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1437         aa = ptlrpc_req_async_args(req);
1438         aa->aa_oa = oa;
1439         aa->aa_requested_nob = requested_nob;
1440         aa->aa_nio_count = niocount;
1441         aa->aa_page_count = page_count;
1442         aa->aa_resends = 0;
1443         aa->aa_ppga = pga;
1444         aa->aa_cli = cli;
1445         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1446         if (ocapa && reserve)
1447                 aa->aa_ocapa = capa_get(ocapa);
1448
1449         *reqp = req;
1450         RETURN(0);
1451
1452  out:
1453         ptlrpc_req_finished(req);
1454         RETURN(rc);
1455 }
1456
1457 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1458                                 __u32 client_cksum, __u32 server_cksum, int nob,
1459                                 obd_count page_count, struct brw_page **pga,
1460                                 cksum_type_t client_cksum_type)
1461 {
1462         __u32 new_cksum;
1463         char *msg;
1464         cksum_type_t cksum_type;
1465
1466         if (server_cksum == client_cksum) {
1467                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1468                 return 0;
1469         }
1470
1471         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1472                                        oa->o_flags : 0);
1473         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1474                                       cksum_type);
1475
1476         if (cksum_type != client_cksum_type)
1477                 msg = "the server did not use the checksum type specified in "
1478                       "the original request - likely a protocol problem";
1479         else if (new_cksum == server_cksum)
1480                 msg = "changed on the client after we checksummed it - "
1481                       "likely false positive due to mmap IO (bug 11742)";
1482         else if (new_cksum == client_cksum)
1483                 msg = "changed in transit before arrival at OST";
1484         else
1485                 msg = "changed in transit AND doesn't match the original - "
1486                       "likely false positive due to mmap IO (bug 11742)";
1487
1488         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1489                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1490                            msg, libcfs_nid2str(peer->nid),
1491                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1492                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1493                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1494                            POSTID(&oa->o_oi), pga[0]->off,
1495                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1496         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1497                "client csum now %x\n", client_cksum, client_cksum_type,
1498                server_cksum, cksum_type, new_cksum);
1499         return 1;
1500 }
1501
1502 /* Note rc enters this function as number of bytes transferred */
1503 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1504 {
1505         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1506         const lnet_process_id_t *peer =
1507                         &req->rq_import->imp_connection->c_peer;
1508         struct client_obd *cli = aa->aa_cli;
1509         struct ost_body *body;
1510         __u32 client_cksum = 0;
1511         ENTRY;
1512
1513         if (rc < 0 && rc != -EDQUOT) {
1514                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1515                 RETURN(rc);
1516         }
1517
1518         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1519         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1520         if (body == NULL) {
1521                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1522                 RETURN(-EPROTO);
1523         }
1524
1525         /* set/clear over quota flag for a uid/gid */
1526         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1527             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1528                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1529
1530                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1531                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1532                        body->oa.o_flags);
1533                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1534         }
1535
1536         osc_update_grant(cli, body);
1537
1538         if (rc < 0)
1539                 RETURN(rc);
1540
1541         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1542                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1543
1544         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1545                 if (rc > 0) {
1546                         CERROR("Unexpected +ve rc %d\n", rc);
1547                         RETURN(-EPROTO);
1548                 }
1549                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1550
1551                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1552                         RETURN(-EAGAIN);
1553
1554                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1555                     check_write_checksum(&body->oa, peer, client_cksum,
1556                                          body->oa.o_cksum, aa->aa_requested_nob,
1557                                          aa->aa_page_count, aa->aa_ppga,
1558                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1559                         RETURN(-EAGAIN);
1560
1561                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1562                                      aa->aa_page_count, aa->aa_ppga);
1563                 GOTO(out, rc);
1564         }
1565
1566         /* The rest of this function executes only for OST_READs */
1567
1568         /* if unwrap_bulk failed, return -EAGAIN to retry */
1569         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1570         if (rc < 0)
1571                 GOTO(out, rc = -EAGAIN);
1572
1573         if (rc > aa->aa_requested_nob) {
1574                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1575                        aa->aa_requested_nob);
1576                 RETURN(-EPROTO);
1577         }
1578
1579         if (rc != req->rq_bulk->bd_nob_transferred) {
1580                 CERROR ("Unexpected rc %d (%d transferred)\n",
1581                         rc, req->rq_bulk->bd_nob_transferred);
1582                 return (-EPROTO);
1583         }
1584
1585         if (rc < aa->aa_requested_nob)
1586                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1587
1588         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1589                 static int cksum_counter;
1590                 __u32      server_cksum = body->oa.o_cksum;
1591                 char      *via;
1592                 char      *router;
1593                 cksum_type_t cksum_type;
1594
1595                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1596                                                body->oa.o_flags : 0);
1597                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1598                                                  aa->aa_ppga, OST_READ,
1599                                                  cksum_type);
1600
1601                 if (peer->nid == req->rq_bulk->bd_sender) {
1602                         via = router = "";
1603                 } else {
1604                         via = " via ";
1605                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1606                 }
1607
1608                 if (server_cksum == ~0 && rc > 0) {
1609                         CERROR("Protocol error: server %s set the 'checksum' "
1610                                "bit, but didn't send a checksum.  Not fatal, "
1611                                "but please notify on http://bugs.whamcloud.com/\n",
1612                                libcfs_nid2str(peer->nid));
1613                 } else if (server_cksum != client_cksum) {
1614                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1615                                            "%s%s%s inode "DFID" object "DOSTID
1616                                            " extent ["LPU64"-"LPU64"]\n",
1617                                            req->rq_import->imp_obd->obd_name,
1618                                            libcfs_nid2str(peer->nid),
1619                                            via, router,
1620                                            body->oa.o_valid & OBD_MD_FLFID ?
1621                                                 body->oa.o_parent_seq : (__u64)0,
1622                                            body->oa.o_valid & OBD_MD_FLFID ?
1623                                                 body->oa.o_parent_oid : 0,
1624                                            body->oa.o_valid & OBD_MD_FLFID ?
1625                                                 body->oa.o_parent_ver : 0,
1626                                            POSTID(&body->oa.o_oi),
1627                                            aa->aa_ppga[0]->off,
1628                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1629                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1630                                                                         1);
1631                         CERROR("client %x, server %x, cksum_type %x\n",
1632                                client_cksum, server_cksum, cksum_type);
1633                         cksum_counter = 0;
1634                         aa->aa_oa->o_cksum = client_cksum;
1635                         rc = -EAGAIN;
1636                 } else {
1637                         cksum_counter++;
1638                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1639                         rc = 0;
1640                 }
1641         } else if (unlikely(client_cksum)) {
1642                 static int cksum_missed;
1643
1644                 cksum_missed++;
1645                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1646                         CERROR("Checksum %u requested from %s but not sent\n",
1647                                cksum_missed, libcfs_nid2str(peer->nid));
1648         } else {
1649                 rc = 0;
1650         }
1651 out:
1652         if (rc >= 0)
1653                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1654                                      aa->aa_oa, &body->oa);
1655
1656         RETURN(rc);
1657 }
1658
1659 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1660                             struct lov_stripe_md *lsm,
1661                             obd_count page_count, struct brw_page **pga,
1662                             struct obd_capa *ocapa)
1663 {
1664         struct ptlrpc_request *req;
1665         int                    rc;
1666         wait_queue_head_t            waitq;
1667         int                    generation, resends = 0;
1668         struct l_wait_info     lwi;
1669
1670         ENTRY;
1671
1672         init_waitqueue_head(&waitq);
1673         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1674
1675 restart_bulk:
1676         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1677                                   page_count, pga, &req, ocapa, 0, resends);
1678         if (rc != 0)
1679                 return (rc);
1680
1681         if (resends) {
1682                 req->rq_generation_set = 1;
1683                 req->rq_import_generation = generation;
1684                 req->rq_sent = cfs_time_current_sec() + resends;
1685         }
1686
1687         rc = ptlrpc_queue_wait(req);
1688
1689         if (rc == -ETIMEDOUT && req->rq_resend) {
1690                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1691                 ptlrpc_req_finished(req);
1692                 goto restart_bulk;
1693         }
1694
1695         rc = osc_brw_fini_request(req, rc);
1696
1697         ptlrpc_req_finished(req);
1698         /* When server return -EINPROGRESS, client should always retry
1699          * regardless of the number of times the bulk was resent already.*/
1700         if (osc_recoverable_error(rc)) {
1701                 resends++;
1702                 if (rc != -EINPROGRESS &&
1703                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1704                         CERROR("%s: too many resend retries for object: "
1705                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1706                                POSTID(&oa->o_oi), rc);
1707                         goto out;
1708                 }
1709                 if (generation !=
1710                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1711                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1712                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1713                                POSTID(&oa->o_oi), rc);
1714                         goto out;
1715                 }
1716
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1718                                        NULL);
1719                 l_wait_event(waitq, 0, &lwi);
1720
1721                 goto restart_bulk;
1722         }
1723 out:
1724         if (rc == -EAGAIN || rc == -EINPROGRESS)
1725                 rc = -EIO;
1726         RETURN (rc);
1727 }
1728
1729 static int osc_brw_redo_request(struct ptlrpc_request *request,
1730                                 struct osc_brw_async_args *aa, int rc)
1731 {
1732         struct ptlrpc_request *new_req;
1733         struct osc_brw_async_args *new_aa;
1734         struct osc_async_page *oap;
1735         ENTRY;
1736
1737         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1738                   "redo for recoverable error %d", rc);
1739
1740         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1741                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1742                                   aa->aa_cli, aa->aa_oa,
1743                                   NULL /* lsm unused by osc currently */,
1744                                   aa->aa_page_count, aa->aa_ppga,
1745                                   &new_req, aa->aa_ocapa, 0, 1);
1746         if (rc)
1747                 RETURN(rc);
1748
1749         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1750                 if (oap->oap_request != NULL) {
1751                         LASSERTF(request == oap->oap_request,
1752                                  "request %p != oap_request %p\n",
1753                                  request, oap->oap_request);
1754                         if (oap->oap_interrupted) {
1755                                 ptlrpc_req_finished(new_req);
1756                                 RETURN(-EINTR);
1757                         }
1758                 }
1759         }
1760         /* New request takes over pga and oaps from old request.
1761          * Note that copying a list_head doesn't work, need to move it... */
1762         aa->aa_resends++;
1763         new_req->rq_interpret_reply = request->rq_interpret_reply;
1764         new_req->rq_async_args = request->rq_async_args;
1765         new_req->rq_commit_cb = request->rq_commit_cb;
1766         /* cap resend delay to the current request timeout, this is similar to
1767          * what ptlrpc does (see after_reply()) */
1768         if (aa->aa_resends > new_req->rq_timeout)
1769                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1770         else
1771                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772         new_req->rq_generation_set = 1;
1773         new_req->rq_import_generation = request->rq_import_generation;
1774
1775         new_aa = ptlrpc_req_async_args(new_req);
1776
1777         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1779         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1780         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1781         new_aa->aa_resends = aa->aa_resends;
1782
1783         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1784                 if (oap->oap_request) {
1785                         ptlrpc_req_finished(oap->oap_request);
1786                         oap->oap_request = ptlrpc_request_addref(new_req);
1787                 }
1788         }
1789
1790         new_aa->aa_ocapa = aa->aa_ocapa;
1791         aa->aa_ocapa = NULL;
1792
1793         /* XXX: This code will run into problem if we're going to support
1794          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1795          * and wait for all of them to be finished. We should inherit request
1796          * set from old request. */
1797         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1798
1799         DEBUG_REQ(D_INFO, new_req, "new request");
1800         RETURN(0);
1801 }
1802
1803 /*
1804  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1805  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1806  * fine for our small page arrays and doesn't require allocation.  its an
1807  * insertion sort that swaps elements that are strides apart, shrinking the
1808  * stride down until its '1' and the array is sorted.
1809  */
1810 static void sort_brw_pages(struct brw_page **array, int num)
1811 {
1812         int stride, i, j;
1813         struct brw_page *tmp;
1814
1815         if (num == 1)
1816                 return;
1817         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1818                 ;
1819
1820         do {
1821                 stride /= 3;
1822                 for (i = stride ; i < num ; i++) {
1823                         tmp = array[i];
1824                         j = i;
1825                         while (j >= stride && array[j - stride]->off > tmp->off) {
1826                                 array[j] = array[j - stride];
1827                                 j -= stride;
1828                         }
1829                         array[j] = tmp;
1830                 }
1831         } while (stride > 1);
1832 }
1833
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1835 {
1836         int count = 1;
1837         int offset;
1838         int i = 0;
1839
1840         LASSERT (pages > 0);
1841         offset = pg[i]->off & ~CFS_PAGE_MASK;
1842
1843         for (;;) {
1844                 pages--;
1845                 if (pages == 0)         /* that's all */
1846                         return count;
1847
1848                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1849                         return count;   /* doesn't end on page boundary */
1850
1851                 i++;
1852                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853                 if (offset != 0)        /* doesn't start on page boundary */
1854                         return count;
1855
1856                 count++;
1857         }
1858 }
1859
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1861 {
1862         struct brw_page **ppga;
1863         int i;
1864
1865         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1866         if (ppga == NULL)
1867                 return NULL;
1868
1869         for (i = 0; i < count; i++)
1870                 ppga[i] = pga + i;
1871         return ppga;
1872 }
1873
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1875 {
1876         LASSERT(ppga != NULL);
1877         OBD_FREE(ppga, sizeof(*ppga) * count);
1878 }
1879
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881                    obd_count page_count, struct brw_page *pga,
1882                    struct obd_trans_info *oti)
1883 {
1884         struct obdo *saved_oa = NULL;
1885         struct brw_page **ppga, **orig;
1886         struct obd_import *imp = class_exp2cliimp(exp);
1887         struct client_obd *cli;
1888         int rc, page_count_orig;
1889         ENTRY;
1890
1891         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892         cli = &imp->imp_obd->u.cli;
1893
1894         if (cmd & OBD_BRW_CHECK) {
1895                 /* The caller just wants to know if there's a chance that this
1896                  * I/O can succeed */
1897
1898                 if (imp->imp_invalid)
1899                         RETURN(-EIO);
1900                 RETURN(0);
1901         }
1902
1903         /* test_brw with a failed create can trip this, maybe others. */
1904         LASSERT(cli->cl_max_pages_per_rpc);
1905
1906         rc = 0;
1907
1908         orig = ppga = osc_build_ppga(pga, page_count);
1909         if (ppga == NULL)
1910                 RETURN(-ENOMEM);
1911         page_count_orig = page_count;
1912
1913         sort_brw_pages(ppga, page_count);
1914         while (page_count) {
1915                 obd_count pages_per_brw;
1916
1917                 if (page_count > cli->cl_max_pages_per_rpc)
1918                         pages_per_brw = cli->cl_max_pages_per_rpc;
1919                 else
1920                         pages_per_brw = page_count;
1921
1922                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1923
1924                 if (saved_oa != NULL) {
1925                         /* restore previously saved oa */
1926                         *oinfo->oi_oa = *saved_oa;
1927                 } else if (page_count > pages_per_brw) {
1928                         /* save a copy of oa (brw will clobber it) */
1929                         OBDO_ALLOC(saved_oa);
1930                         if (saved_oa == NULL)
1931                                 GOTO(out, rc = -ENOMEM);
1932                         *saved_oa = *oinfo->oi_oa;
1933                 }
1934
1935                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936                                       pages_per_brw, ppga, oinfo->oi_capa);
1937
1938                 if (rc != 0)
1939                         break;
1940
1941                 page_count -= pages_per_brw;
1942                 ppga += pages_per_brw;
1943         }
1944
1945 out:
1946         osc_release_ppga(orig, page_count_orig);
1947
1948         if (saved_oa != NULL)
1949                 OBDO_FREE(saved_oa);
1950
1951         RETURN(rc);
1952 }
1953
1954 static int brw_interpret(const struct lu_env *env,
1955                          struct ptlrpc_request *req, void *data, int rc)
1956 {
1957         struct osc_brw_async_args *aa = data;
1958         struct osc_extent *ext;
1959         struct osc_extent *tmp;
1960         struct cl_object  *obj = NULL;
1961         struct client_obd *cli = aa->aa_cli;
1962         ENTRY;
1963
1964         rc = osc_brw_fini_request(req, rc);
1965         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1966         /* When server return -EINPROGRESS, client should always retry
1967          * regardless of the number of times the bulk was resent already. */
1968         if (osc_recoverable_error(rc)) {
1969                 if (req->rq_import_generation !=
1970                     req->rq_import->imp_generation) {
1971                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1972                                ""DOSTID", rc = %d.\n",
1973                                req->rq_import->imp_obd->obd_name,
1974                                POSTID(&aa->aa_oa->o_oi), rc);
1975                 } else if (rc == -EINPROGRESS ||
1976                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1977                         rc = osc_brw_redo_request(req, aa, rc);
1978                 } else {
1979                         CERROR("%s: too many resent retries for object: "
1980                                ""LPU64":"LPU64", rc = %d.\n",
1981                                req->rq_import->imp_obd->obd_name,
1982                                POSTID(&aa->aa_oa->o_oi), rc);
1983                 }
1984
1985                 if (rc == 0)
1986                         RETURN(0);
1987                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1988                         rc = -EIO;
1989         }
1990
1991         if (aa->aa_ocapa) {
1992                 capa_put(aa->aa_ocapa);
1993                 aa->aa_ocapa = NULL;
1994         }
1995
1996         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1997                 if (obj == NULL && rc == 0) {
1998                         obj = osc2cl(ext->oe_obj);
1999                         cl_object_get(obj);
2000                 }
2001
2002                 cfs_list_del_init(&ext->oe_link);
2003                 osc_extent_finish(env, ext, 1, rc);
2004         }
2005         LASSERT(cfs_list_empty(&aa->aa_exts));
2006         LASSERT(cfs_list_empty(&aa->aa_oaps));
2007
2008         if (obj != NULL) {
2009                 struct obdo *oa = aa->aa_oa;
2010                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2011                 unsigned long valid = 0;
2012
2013                 LASSERT(rc == 0);
2014                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2015                         attr->cat_blocks = oa->o_blocks;
2016                         valid |= CAT_BLOCKS;
2017                 }
2018                 if (oa->o_valid & OBD_MD_FLMTIME) {
2019                         attr->cat_mtime = oa->o_mtime;
2020                         valid |= CAT_MTIME;
2021                 }
2022                 if (oa->o_valid & OBD_MD_FLATIME) {
2023                         attr->cat_atime = oa->o_atime;
2024                         valid |= CAT_ATIME;
2025                 }
2026                 if (oa->o_valid & OBD_MD_FLCTIME) {
2027                         attr->cat_ctime = oa->o_ctime;
2028                         valid |= CAT_CTIME;
2029                 }
2030                 if (valid != 0) {
2031                         cl_object_attr_lock(obj);
2032                         cl_object_attr_set(env, obj, attr, valid);
2033                         cl_object_attr_unlock(obj);
2034                 }
2035                 cl_object_put(env, obj);
2036         }
2037         OBDO_FREE(aa->aa_oa);
2038
2039         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2040                           req->rq_bulk->bd_nob_transferred);
2041         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2042         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2043
2044         client_obd_list_lock(&cli->cl_loi_list_lock);
2045         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2046          * is called so we know whether to go to sync BRWs or wait for more
2047          * RPCs to complete */
2048         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2049                 cli->cl_w_in_flight--;
2050         else
2051                 cli->cl_r_in_flight--;
2052         osc_wake_cache_waiters(cli);
2053         client_obd_list_unlock(&cli->cl_loi_list_lock);
2054
2055         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2056         RETURN(rc);
2057 }
2058
2059 static void brw_commit(struct ptlrpc_request *req)
2060 {
2061         spin_lock(&req->rq_lock);
2062         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2063          * this called via the rq_commit_cb, I need to ensure
2064          * osc_dec_unstable_pages is still called. Otherwise unstable
2065          * pages may be leaked. */
2066         if (req->rq_unstable) {
2067                 spin_unlock(&req->rq_lock);
2068                 osc_dec_unstable_pages(req);
2069                 spin_lock(&req->rq_lock);
2070         } else {
2071                 req->rq_committed = 1;
2072         }
2073         spin_unlock(&req->rq_lock);
2074 }
2075
2076 /**
2077  * Build an RPC by the list of extent @ext_list. The caller must ensure
2078  * that the total pages in this list are NOT over max pages per RPC.
2079  * Extents in the list must be in OES_RPC state.
2080  */
2081 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2082                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2083 {
2084         struct ptlrpc_request           *req = NULL;
2085         struct osc_extent               *ext;
2086         struct brw_page                 **pga = NULL;
2087         struct osc_brw_async_args       *aa = NULL;
2088         struct obdo                     *oa = NULL;
2089         struct osc_async_page           *oap;
2090         struct osc_async_page           *tmp;
2091         struct cl_req                   *clerq = NULL;
2092         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2093                                                                       CRT_READ;
2094         struct ldlm_lock                *lock = NULL;
2095         struct cl_req_attr              *crattr = NULL;
2096         obd_off                         starting_offset = OBD_OBJECT_EOF;
2097         obd_off                         ending_offset = 0;
2098         int                             mpflag = 0;
2099         int                             mem_tight = 0;
2100         int                             page_count = 0;
2101         int                             i;
2102         int                             rc;
2103         CFS_LIST_HEAD(rpc_list);
2104
2105         ENTRY;
2106         LASSERT(!cfs_list_empty(ext_list));
2107
2108         /* add pages into rpc_list to build BRW rpc */
2109         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2110                 LASSERT(ext->oe_state == OES_RPC);
2111                 mem_tight |= ext->oe_memalloc;
2112                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2113                         ++page_count;
2114                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2115                         if (starting_offset > oap->oap_obj_off)
2116                                 starting_offset = oap->oap_obj_off;
2117                         else
2118                                 LASSERT(oap->oap_page_off == 0);
2119                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2120                                 ending_offset = oap->oap_obj_off +
2121                                                 oap->oap_count;
2122                         else
2123                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2124                                         PAGE_CACHE_SIZE);
2125                 }
2126         }
2127
2128         if (mem_tight)
2129                 mpflag = cfs_memory_pressure_get_and_set();
2130
2131         OBD_ALLOC(crattr, sizeof(*crattr));
2132         if (crattr == NULL)
2133                 GOTO(out, rc = -ENOMEM);
2134
2135         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2136         if (pga == NULL)
2137                 GOTO(out, rc = -ENOMEM);
2138
2139         OBDO_ALLOC(oa);
2140         if (oa == NULL)
2141                 GOTO(out, rc = -ENOMEM);
2142
2143         i = 0;
2144         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2145                 struct cl_page *page = oap2cl_page(oap);
2146                 if (clerq == NULL) {
2147                         clerq = cl_req_alloc(env, page, crt,
2148                                              1 /* only 1-object rpcs for now */);
2149                         if (IS_ERR(clerq))
2150                                 GOTO(out, rc = PTR_ERR(clerq));
2151                         lock = oap->oap_ldlm_lock;
2152                 }
2153                 if (mem_tight)
2154                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2155                 pga[i] = &oap->oap_brw_page;
2156                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2157                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2158                        pga[i]->pg, page_index(oap->oap_page), oap,
2159                        pga[i]->flag);
2160                 i++;
2161                 cl_req_page_add(env, clerq, page);
2162         }
2163
2164         /* always get the data for the obdo for the rpc */
2165         LASSERT(clerq != NULL);
2166         crattr->cra_oa = oa;
2167         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2168         if (lock) {
2169                 oa->o_handle = lock->l_remote_handle;
2170                 oa->o_valid |= OBD_MD_FLHANDLE;
2171         }
2172
2173         rc = cl_req_prep(env, clerq);
2174         if (rc != 0) {
2175                 CERROR("cl_req_prep failed: %d\n", rc);
2176                 GOTO(out, rc);
2177         }
2178
2179         sort_brw_pages(pga, page_count);
2180         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2181                         pga, &req, crattr->cra_capa, 1, 0);
2182         if (rc != 0) {
2183                 CERROR("prep_req failed: %d\n", rc);
2184                 GOTO(out, rc);
2185         }
2186
2187         req->rq_commit_cb = brw_commit;
2188         req->rq_interpret_reply = brw_interpret;
2189
2190         if (mem_tight != 0)
2191                 req->rq_memalloc = 1;
2192
2193         /* Need to update the timestamps after the request is built in case
2194          * we race with setattr (locally or in queue at OST).  If OST gets
2195          * later setattr before earlier BRW (as determined by the request xid),
2196          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2197          * way to do this in a single call.  bug 10150 */
2198         cl_req_attr_set(env, clerq, crattr,
2199                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2200
2201         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2202
2203         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2204         aa = ptlrpc_req_async_args(req);
2205         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2206         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2207         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2208         cfs_list_splice_init(ext_list, &aa->aa_exts);
2209         aa->aa_clerq = clerq;
2210
2211         /* queued sync pages can be torn down while the pages
2212          * were between the pending list and the rpc */
2213         tmp = NULL;
2214         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2215                 /* only one oap gets a request reference */
2216                 if (tmp == NULL)
2217                         tmp = oap;
2218                 if (oap->oap_interrupted && !req->rq_intr) {
2219                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2220                                         oap, req);
2221                         ptlrpc_mark_interrupted(req);
2222                 }
2223         }
2224         if (tmp != NULL)
2225                 tmp->oap_request = ptlrpc_request_addref(req);
2226
2227         client_obd_list_lock(&cli->cl_loi_list_lock);
2228         starting_offset >>= PAGE_CACHE_SHIFT;
2229         if (cmd == OBD_BRW_READ) {
2230                 cli->cl_r_in_flight++;
2231                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2232                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2233                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2234                                       starting_offset + 1);
2235         } else {
2236                 cli->cl_w_in_flight++;
2237                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2238                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2239                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2240                                       starting_offset + 1);
2241         }
2242         client_obd_list_unlock(&cli->cl_loi_list_lock);
2243
2244         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2245                   page_count, aa, cli->cl_r_in_flight,
2246                   cli->cl_w_in_flight);
2247
2248         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2249          * see which CPU/NUMA node the majority of pages were allocated
2250          * on, and try to assign the async RPC to the CPU core
2251          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2252          *
2253          * But on the other hand, we expect that multiple ptlrpcd
2254          * threads and the initial write sponsor can run in parallel,
2255          * especially when data checksum is enabled, which is CPU-bound
2256          * operation and single ptlrpcd thread cannot process in time.
2257          * So more ptlrpcd threads sharing BRW load
2258          * (with PDL_POLICY_ROUND) seems better.
2259          */
2260         ptlrpcd_add_req(req, pol, -1);
2261         rc = 0;
2262         EXIT;
2263
2264 out:
2265         if (mem_tight != 0)
2266                 cfs_memory_pressure_restore(mpflag);
2267
2268         if (crattr != NULL) {
2269                 capa_put(crattr->cra_capa);
2270                 OBD_FREE(crattr, sizeof(*crattr));
2271         }
2272
2273         if (rc != 0) {
2274                 LASSERT(req == NULL);
2275
2276                 if (oa)
2277                         OBDO_FREE(oa);
2278                 if (pga)
2279                         OBD_FREE(pga, sizeof(*pga) * page_count);
2280                 /* this should happen rarely and is pretty bad, it makes the
2281                  * pending list not follow the dirty order */
2282                 while (!cfs_list_empty(ext_list)) {
2283                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2284                                              oe_link);
2285                         cfs_list_del_init(&ext->oe_link);
2286                         osc_extent_finish(env, ext, 0, rc);
2287                 }
2288                 if (clerq && !IS_ERR(clerq))
2289                         cl_req_completion(env, clerq, rc);
2290         }
2291         RETURN(rc);
2292 }
2293
2294 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2295                                         struct ldlm_enqueue_info *einfo)
2296 {
2297         void *data = einfo->ei_cbdata;
2298         int set = 0;
2299
2300         LASSERT(lock != NULL);
2301         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2302         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2303         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2304         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2305
2306         lock_res_and_lock(lock);
2307         spin_lock(&osc_ast_guard);
2308
2309         if (lock->l_ast_data == NULL)
2310                 lock->l_ast_data = data;
2311         if (lock->l_ast_data == data)
2312                 set = 1;
2313
2314         spin_unlock(&osc_ast_guard);
2315         unlock_res_and_lock(lock);
2316
2317         return set;
2318 }
2319
2320 static int osc_set_data_with_check(struct lustre_handle *lockh,
2321                                    struct ldlm_enqueue_info *einfo)
2322 {
2323         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2324         int set = 0;
2325
2326         if (lock != NULL) {
2327                 set = osc_set_lock_data_with_check(lock, einfo);
2328                 LDLM_LOCK_PUT(lock);
2329         } else
2330                 CERROR("lockh %p, data %p - client evicted?\n",
2331                        lockh, einfo->ei_cbdata);
2332         return set;
2333 }
2334
2335 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2336                              ldlm_iterator_t replace, void *data)
2337 {
2338         struct ldlm_res_id res_id;
2339         struct obd_device *obd = class_exp2obd(exp);
2340
2341         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2342         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2343         return 0;
2344 }
2345
2346 /* find any ldlm lock of the inode in osc
2347  * return 0    not find
2348  *        1    find one
2349  *      < 0    error */
2350 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2351                            ldlm_iterator_t replace, void *data)
2352 {
2353         struct ldlm_res_id res_id;
2354         struct obd_device *obd = class_exp2obd(exp);
2355         int rc = 0;
2356
2357         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2358         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2359         if (rc == LDLM_ITER_STOP)
2360                 return(1);
2361         if (rc == LDLM_ITER_CONTINUE)
2362                 return(0);
2363         return(rc);
2364 }
2365
2366 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2367                             obd_enqueue_update_f upcall, void *cookie,
2368                             __u64 *flags, int agl, int rc)
2369 {
2370         int intent = *flags & LDLM_FL_HAS_INTENT;
2371         ENTRY;
2372
2373         if (intent) {
2374                 /* The request was created before ldlm_cli_enqueue call. */
2375                 if (rc == ELDLM_LOCK_ABORTED) {
2376                         struct ldlm_reply *rep;
2377                         rep = req_capsule_server_get(&req->rq_pill,
2378                                                      &RMF_DLM_REP);
2379
2380                         LASSERT(rep != NULL);
2381                         rep->lock_policy_res1 =
2382                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2383                         if (rep->lock_policy_res1)
2384                                 rc = rep->lock_policy_res1;
2385                 }
2386         }
2387
2388         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2389             (rc == 0)) {
2390                 *flags |= LDLM_FL_LVB_READY;
2391                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2392                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2393         }
2394
2395         /* Call the update callback. */
2396         rc = (*upcall)(cookie, rc);
2397         RETURN(rc);
2398 }
2399
2400 static int osc_enqueue_interpret(const struct lu_env *env,
2401                                  struct ptlrpc_request *req,
2402                                  struct osc_enqueue_args *aa, int rc)
2403 {
2404         struct ldlm_lock *lock;
2405         struct lustre_handle handle;
2406         __u32 mode;
2407         struct ost_lvb *lvb;
2408         __u32 lvb_len;
2409         __u64 *flags = aa->oa_flags;
2410
2411         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2412          * might be freed anytime after lock upcall has been called. */
2413         lustre_handle_copy(&handle, aa->oa_lockh);
2414         mode = aa->oa_ei->ei_mode;
2415
2416         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2417          * be valid. */
2418         lock = ldlm_handle2lock(&handle);
2419
2420         /* Take an additional reference so that a blocking AST that
2421          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2422          * to arrive after an upcall has been executed by
2423          * osc_enqueue_fini(). */
2424         ldlm_lock_addref(&handle, mode);
2425
2426         /* Let CP AST to grant the lock first. */
2427         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2428
2429         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2430                 lvb = NULL;
2431                 lvb_len = 0;
2432         } else {
2433                 lvb = aa->oa_lvb;
2434                 lvb_len = sizeof(*aa->oa_lvb);
2435         }
2436
2437         /* Complete obtaining the lock procedure. */
2438         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2439                                    mode, flags, lvb, lvb_len, &handle, rc);
2440         /* Complete osc stuff. */
2441         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2442                               flags, aa->oa_agl, rc);
2443
2444         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2445
2446         /* Release the lock for async request. */
2447         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2448                 /*
2449                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2450                  * not already released by
2451                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2452                  */
2453                 ldlm_lock_decref(&handle, mode);
2454
2455         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2456                  aa->oa_lockh, req, aa);
2457         ldlm_lock_decref(&handle, mode);
2458         LDLM_LOCK_PUT(lock);
2459         return rc;
2460 }
2461
2462 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2463                         struct lov_oinfo *loi, __u64 flags,
2464                         struct ost_lvb *lvb, __u32 mode, int rc)
2465 {
2466         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2467
2468         if (rc == ELDLM_OK) {
2469                 __u64 tmp;
2470
2471                 LASSERT(lock != NULL);
2472                 loi->loi_lvb = *lvb;
2473                 tmp = loi->loi_lvb.lvb_size;
2474                 /* Extend KMS up to the end of this lock and no further
2475                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2476                 if (tmp > lock->l_policy_data.l_extent.end)
2477                         tmp = lock->l_policy_data.l_extent.end + 1;
2478                 if (tmp >= loi->loi_kms) {
2479                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2480                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2481                         loi_kms_set(loi, tmp);
2482                 } else {
2483                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2484                                    LPU64"; leaving kms="LPU64", end="LPU64,
2485                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2486                                    lock->l_policy_data.l_extent.end);
2487                 }
2488                 ldlm_lock_allow_match(lock);
2489         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2490                 LASSERT(lock != NULL);
2491                 loi->loi_lvb = *lvb;
2492                 ldlm_lock_allow_match(lock);
2493                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2494                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2495                 rc = ELDLM_OK;
2496         }
2497
2498         if (lock != NULL) {
2499                 if (rc != ELDLM_OK)
2500                         ldlm_lock_fail_match(lock);
2501
2502                 LDLM_LOCK_PUT(lock);
2503         }
2504 }
2505 EXPORT_SYMBOL(osc_update_enqueue);
2506
2507 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2508
2509 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2510  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2511  * other synchronous requests, however keeping some locks and trying to obtain
2512  * others may take a considerable amount of time in a case of ost failure; and
2513  * when other sync requests do not get released lock from a client, the client
2514  * is excluded from the cluster -- such scenarious make the life difficult, so
2515  * release locks just after they are obtained. */
2516 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2517                      __u64 *flags, ldlm_policy_data_t *policy,
2518                      struct ost_lvb *lvb, int kms_valid,
2519                      obd_enqueue_update_f upcall, void *cookie,
2520                      struct ldlm_enqueue_info *einfo,
2521                      struct lustre_handle *lockh,
2522                      struct ptlrpc_request_set *rqset, int async, int agl)
2523 {
2524         struct obd_device *obd = exp->exp_obd;
2525         struct ptlrpc_request *req = NULL;
2526         int intent = *flags & LDLM_FL_HAS_INTENT;
2527         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2528         ldlm_mode_t mode;
2529         int rc;
2530         ENTRY;
2531
2532         /* Filesystem lock extents are extended to page boundaries so that
2533          * dealing with the page cache is a little smoother.  */
2534         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2535         policy->l_extent.end |= ~CFS_PAGE_MASK;
2536
2537         /*
2538          * kms is not valid when either object is completely fresh (so that no
2539          * locks are cached), or object was evicted. In the latter case cached
2540          * lock cannot be used, because it would prime inode state with
2541          * potentially stale LVB.
2542          */
2543         if (!kms_valid)
2544                 goto no_match;
2545
2546         /* Next, search for already existing extent locks that will cover us */
2547         /* If we're trying to read, we also search for an existing PW lock.  The
2548          * VFS and page cache already protect us locally, so lots of readers/
2549          * writers can share a single PW lock.
2550          *
2551          * There are problems with conversion deadlocks, so instead of
2552          * converting a read lock to a write lock, we'll just enqueue a new
2553          * one.
2554          *
2555          * At some point we should cancel the read lock instead of making them
2556          * send us a blocking callback, but there are problems with canceling
2557          * locks out from other users right now, too. */
2558         mode = einfo->ei_mode;
2559         if (einfo->ei_mode == LCK_PR)
2560                 mode |= LCK_PW;
2561         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2562                                einfo->ei_type, policy, mode, lockh, 0);
2563         if (mode) {
2564                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2565
2566                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2567                         /* For AGL, if enqueue RPC is sent but the lock is not
2568                          * granted, then skip to process this strpe.
2569                          * Return -ECANCELED to tell the caller. */
2570                         ldlm_lock_decref(lockh, mode);
2571                         LDLM_LOCK_PUT(matched);
2572                         RETURN(-ECANCELED);
2573                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2574                         *flags |= LDLM_FL_LVB_READY;
2575                         /* addref the lock only if not async requests and PW
2576                          * lock is matched whereas we asked for PR. */
2577                         if (!rqset && einfo->ei_mode != mode)
2578                                 ldlm_lock_addref(lockh, LCK_PR);
2579                         if (intent) {
2580                                 /* I would like to be able to ASSERT here that
2581                                  * rss <= kms, but I can't, for reasons which
2582                                  * are explained in lov_enqueue() */
2583                         }
2584
2585                         /* We already have a lock, and it's referenced.
2586                          *
2587                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2588                          * AGL upcall may change it to CLS_HELD directly. */
2589                         (*upcall)(cookie, ELDLM_OK);
2590
2591                         if (einfo->ei_mode != mode)
2592                                 ldlm_lock_decref(lockh, LCK_PW);
2593                         else if (rqset)
2594                                 /* For async requests, decref the lock. */
2595                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2596                         LDLM_LOCK_PUT(matched);
2597                         RETURN(ELDLM_OK);
2598                 } else {
2599                         ldlm_lock_decref(lockh, mode);
2600                         LDLM_LOCK_PUT(matched);
2601                 }
2602         }
2603
2604  no_match:
2605         if (intent) {
2606                 CFS_LIST_HEAD(cancels);
2607                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2608                                            &RQF_LDLM_ENQUEUE_LVB);
2609                 if (req == NULL)
2610                         RETURN(-ENOMEM);
2611
2612                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2613                 if (rc) {
2614                         ptlrpc_request_free(req);
2615                         RETURN(rc);
2616                 }
2617
2618                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2619                                      sizeof *lvb);
2620                 ptlrpc_request_set_replen(req);
2621         }
2622
2623         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2624         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2625
2626         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2627                               sizeof(*lvb), LVB_T_OST, lockh, async);
2628         if (rqset) {
2629                 if (!rc) {
2630                         struct osc_enqueue_args *aa;
2631                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2632                         aa = ptlrpc_req_async_args(req);
2633                         aa->oa_ei = einfo;
2634                         aa->oa_exp = exp;
2635                         aa->oa_flags  = flags;
2636                         aa->oa_upcall = upcall;
2637                         aa->oa_cookie = cookie;
2638                         aa->oa_lvb    = lvb;
2639                         aa->oa_lockh  = lockh;
2640                         aa->oa_agl    = !!agl;
2641
2642                         req->rq_interpret_reply =
2643                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2644                         if (rqset == PTLRPCD_SET)
2645                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2646                         else
2647                                 ptlrpc_set_add_req(rqset, req);
2648                 } else if (intent) {
2649                         ptlrpc_req_finished(req);
2650                 }
2651                 RETURN(rc);
2652         }
2653
2654         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2655         if (intent)
2656                 ptlrpc_req_finished(req);
2657
2658         RETURN(rc);
2659 }
2660
2661 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2662                        struct ldlm_enqueue_info *einfo,
2663                        struct ptlrpc_request_set *rqset)
2664 {
2665         struct ldlm_res_id res_id;
2666         int rc;
2667         ENTRY;
2668
2669         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2670         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2671                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2672                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2673                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2674                               rqset, rqset != NULL, 0);
2675         RETURN(rc);
2676 }
2677
2678 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2679                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2680                    __u64 *flags, void *data, struct lustre_handle *lockh,
2681                    int unref)
2682 {
2683         struct obd_device *obd = exp->exp_obd;
2684         __u64 lflags = *flags;
2685         ldlm_mode_t rc;
2686         ENTRY;
2687
2688         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2689                 RETURN(-EIO);
2690
2691         /* Filesystem lock extents are extended to page boundaries so that
2692          * dealing with the page cache is a little smoother */
2693         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2694         policy->l_extent.end |= ~CFS_PAGE_MASK;
2695
2696         /* Next, search for already existing extent locks that will cover us */
2697         /* If we're trying to read, we also search for an existing PW lock.  The
2698          * VFS and page cache already protect us locally, so lots of readers/
2699          * writers can share a single PW lock. */
2700         rc = mode;
2701         if (mode == LCK_PR)
2702                 rc |= LCK_PW;
2703         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2704                              res_id, type, policy, rc, lockh, unref);
2705         if (rc) {
2706                 if (data != NULL) {
2707                         if (!osc_set_data_with_check(lockh, data)) {
2708                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2709                                         ldlm_lock_decref(lockh, rc);
2710                                 RETURN(0);
2711                         }
2712                 }
2713                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2714                         ldlm_lock_addref(lockh, LCK_PR);
2715                         ldlm_lock_decref(lockh, LCK_PW);
2716                 }
2717                 RETURN(rc);
2718         }
2719         RETURN(rc);
2720 }
2721
2722 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2723 {
2724         ENTRY;
2725
2726         if (unlikely(mode == LCK_GROUP))
2727                 ldlm_lock_decref_and_cancel(lockh, mode);
2728         else
2729                 ldlm_lock_decref(lockh, mode);
2730
2731         RETURN(0);
2732 }
2733
2734 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2735                       __u32 mode, struct lustre_handle *lockh)
2736 {
2737         ENTRY;
2738         RETURN(osc_cancel_base(lockh, mode));
2739 }
2740
2741 static int osc_cancel_unused(struct obd_export *exp,
2742                              struct lov_stripe_md *lsm,
2743                              ldlm_cancel_flags_t flags,
2744                              void *opaque)
2745 {
2746         struct obd_device *obd = class_exp2obd(exp);
2747         struct ldlm_res_id res_id, *resp = NULL;
2748
2749         if (lsm != NULL) {
2750                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2751                 resp = &res_id;
2752         }
2753
2754         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2755 }
2756
2757 static int osc_statfs_interpret(const struct lu_env *env,
2758                                 struct ptlrpc_request *req,
2759                                 struct osc_async_args *aa, int rc)
2760 {
2761         struct obd_statfs *msfs;
2762         ENTRY;
2763
2764         if (rc == -EBADR)
2765                 /* The request has in fact never been sent
2766                  * due to issues at a higher level (LOV).
2767                  * Exit immediately since the caller is
2768                  * aware of the problem and takes care
2769                  * of the clean up */
2770                  RETURN(rc);
2771
2772         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2773             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2774                 GOTO(out, rc = 0);
2775
2776         if (rc != 0)
2777                 GOTO(out, rc);
2778
2779         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2780         if (msfs == NULL) {
2781                 GOTO(out, rc = -EPROTO);
2782         }
2783
2784         *aa->aa_oi->oi_osfs = *msfs;
2785 out:
2786         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2787         RETURN(rc);
2788 }
2789
2790 static int osc_statfs_async(struct obd_export *exp,
2791                             struct obd_info *oinfo, __u64 max_age,
2792                             struct ptlrpc_request_set *rqset)
2793 {
2794         struct obd_device     *obd = class_exp2obd(exp);
2795         struct ptlrpc_request *req;
2796         struct osc_async_args *aa;
2797         int                    rc;
2798         ENTRY;
2799
2800         /* We could possibly pass max_age in the request (as an absolute
2801          * timestamp or a "seconds.usec ago") so the target can avoid doing
2802          * extra calls into the filesystem if that isn't necessary (e.g.
2803          * during mount that would help a bit).  Having relative timestamps
2804          * is not so great if request processing is slow, while absolute
2805          * timestamps are not ideal because they need time synchronization. */
2806         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2807         if (req == NULL)
2808                 RETURN(-ENOMEM);
2809
2810         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2811         if (rc) {
2812                 ptlrpc_request_free(req);
2813                 RETURN(rc);
2814         }
2815         ptlrpc_request_set_replen(req);
2816         req->rq_request_portal = OST_CREATE_PORTAL;
2817         ptlrpc_at_set_req_timeout(req);
2818
2819         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2820                 /* procfs requests not want stat in wait for avoid deadlock */
2821                 req->rq_no_resend = 1;
2822                 req->rq_no_delay = 1;
2823         }
2824
2825         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2826         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2827         aa = ptlrpc_req_async_args(req);
2828         aa->aa_oi = oinfo;
2829
2830         ptlrpc_set_add_req(rqset, req);
2831         RETURN(0);
2832 }
2833
2834 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2835                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2836 {
2837         struct obd_device     *obd = class_exp2obd(exp);
2838         struct obd_statfs     *msfs;
2839         struct ptlrpc_request *req;
2840         struct obd_import     *imp = NULL;
2841         int rc;
2842         ENTRY;
2843
2844         /*Since the request might also come from lprocfs, so we need
2845          *sync this with client_disconnect_export Bug15684*/
2846         down_read(&obd->u.cli.cl_sem);
2847         if (obd->u.cli.cl_import)
2848                 imp = class_import_get(obd->u.cli.cl_import);
2849         up_read(&obd->u.cli.cl_sem);
2850         if (!imp)
2851                 RETURN(-ENODEV);
2852
2853         /* We could possibly pass max_age in the request (as an absolute
2854          * timestamp or a "seconds.usec ago") so the target can avoid doing
2855          * extra calls into the filesystem if that isn't necessary (e.g.
2856          * during mount that would help a bit).  Having relative timestamps
2857          * is not so great if request processing is slow, while absolute
2858          * timestamps are not ideal because they need time synchronization. */
2859         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2860
2861         class_import_put(imp);
2862
2863         if (req == NULL)
2864                 RETURN(-ENOMEM);
2865
2866         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2867         if (rc) {
2868                 ptlrpc_request_free(req);
2869                 RETURN(rc);
2870         }
2871         ptlrpc_request_set_replen(req);
2872         req->rq_request_portal = OST_CREATE_PORTAL;
2873         ptlrpc_at_set_req_timeout(req);
2874
2875         if (flags & OBD_STATFS_NODELAY) {
2876                 /* procfs requests not want stat in wait for avoid deadlock */
2877                 req->rq_no_resend = 1;
2878                 req->rq_no_delay = 1;
2879         }
2880
2881         rc = ptlrpc_queue_wait(req);
2882         if (rc)
2883                 GOTO(out, rc);
2884
2885         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2886         if (msfs == NULL) {
2887                 GOTO(out, rc = -EPROTO);
2888         }
2889
2890         *osfs = *msfs;
2891
2892         EXIT;
2893  out:
2894         ptlrpc_req_finished(req);
2895         return rc;
2896 }
2897
2898 /* Retrieve object striping information.
2899  *
2900  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2901  * the maximum number of OST indices which will fit in the user buffer.
2902  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2903  */
2904 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2905 {
2906         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2907         struct lov_user_md_v3 lum, *lumk;
2908         struct lov_user_ost_data_v1 *lmm_objects;
2909         int rc = 0, lum_size;
2910         ENTRY;
2911
2912         if (!lsm)
2913                 RETURN(-ENODATA);
2914
2915         /* we only need the header part from user space to get lmm_magic and
2916          * lmm_stripe_count, (the header part is common to v1 and v3) */
2917         lum_size = sizeof(struct lov_user_md_v1);
2918         if (copy_from_user(&lum, lump, lum_size))
2919                 RETURN(-EFAULT);
2920
2921         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2922             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2923                 RETURN(-EINVAL);
2924
2925         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2926         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2927         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2928         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2929
2930         /* we can use lov_mds_md_size() to compute lum_size
2931          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2932         if (lum.lmm_stripe_count > 0) {
2933                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2934                 OBD_ALLOC(lumk, lum_size);
2935                 if (!lumk)
2936                         RETURN(-ENOMEM);
2937
2938                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2939                         lmm_objects =
2940                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2941                 else
2942                         lmm_objects = &(lumk->lmm_objects[0]);
2943                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2944         } else {
2945                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2946                 lumk = &lum;
2947         }
2948
2949         lumk->lmm_oi = lsm->lsm_oi;
2950         lumk->lmm_stripe_count = 1;
2951
2952         if (copy_to_user(lump, lumk, lum_size))
2953                 rc = -EFAULT;
2954
2955         if (lumk != &lum)
2956                 OBD_FREE(lumk, lum_size);
2957
2958         RETURN(rc);
2959 }
2960
2961
2962 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2963                          void *karg, void *uarg)
2964 {
2965         struct obd_device *obd = exp->exp_obd;
2966         struct obd_ioctl_data *data = karg;
2967         int err = 0;
2968         ENTRY;
2969
2970         if (!try_module_get(THIS_MODULE)) {
2971                 CERROR("Can't get module. Is it alive?");
2972                 return -EINVAL;
2973         }
2974         switch (cmd) {
2975         case OBD_IOC_LOV_GET_CONFIG: {
2976                 char *buf;
2977                 struct lov_desc *desc;
2978                 struct obd_uuid uuid;
2979
2980                 buf = NULL;
2981                 len = 0;
2982                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2983                         GOTO(out, err = -EINVAL);
2984
2985                 data = (struct obd_ioctl_data *)buf;
2986
2987                 if (sizeof(*desc) > data->ioc_inllen1) {
2988                         obd_ioctl_freedata(buf, len);
2989                         GOTO(out, err = -EINVAL);
2990                 }
2991
2992                 if (data->ioc_inllen2 < sizeof(uuid)) {
2993                         obd_ioctl_freedata(buf, len);
2994                         GOTO(out, err = -EINVAL);
2995                 }
2996
2997                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2998                 desc->ld_tgt_count = 1;
2999                 desc->ld_active_tgt_count = 1;
3000                 desc->ld_default_stripe_count = 1;
3001                 desc->ld_default_stripe_size = 0;
3002                 desc->ld_default_stripe_offset = 0;
3003                 desc->ld_pattern = 0;
3004                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3005
3006                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3007
3008                 err = copy_to_user((void *)uarg, buf, len);
3009                 if (err)
3010                         err = -EFAULT;
3011                 obd_ioctl_freedata(buf, len);
3012                 GOTO(out, err);
3013         }
3014         case LL_IOC_LOV_SETSTRIPE:
3015                 err = obd_alloc_memmd(exp, karg);
3016                 if (err > 0)
3017                         err = 0;
3018                 GOTO(out, err);
3019         case LL_IOC_LOV_GETSTRIPE:
3020                 err = osc_getstripe(karg, uarg);
3021                 GOTO(out, err);
3022         case OBD_IOC_CLIENT_RECOVER:
3023                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3024                                             data->ioc_inlbuf1, 0);
3025                 if (err > 0)
3026                         err = 0;
3027                 GOTO(out, err);
3028         case IOC_OSC_SET_ACTIVE:
3029                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3030                                                data->ioc_offset);
3031                 GOTO(out, err);
3032         case OBD_IOC_POLL_QUOTACHECK:
3033                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3034                 GOTO(out, err);
3035         case OBD_IOC_PING_TARGET:
3036                 err = ptlrpc_obd_ping(obd);
3037                 GOTO(out, err);
3038         default:
3039                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3040                        cmd, current_comm());
3041                 GOTO(out, err = -ENOTTY);
3042         }
3043 out:
3044         module_put(THIS_MODULE);
3045         return err;
3046 }
3047
3048 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3049                         obd_count keylen, void *key, __u32 *vallen, void *val,
3050                         struct lov_stripe_md *lsm)
3051 {
3052         ENTRY;
3053         if (!vallen || !val)
3054                 RETURN(-EFAULT);
3055
3056         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3057                 __u32 *stripe = val;
3058                 *vallen = sizeof(*stripe);
3059                 *stripe = 0;
3060                 RETURN(0);
3061         } else if (KEY_IS(KEY_LAST_ID)) {
3062                 struct ptlrpc_request *req;
3063                 obd_id                *reply;
3064                 char                  *tmp;
3065                 int                    rc;
3066
3067                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3068                                            &RQF_OST_GET_INFO_LAST_ID);
3069                 if (req == NULL)
3070                         RETURN(-ENOMEM);
3071
3072                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3073                                      RCL_CLIENT, keylen);
3074                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3075                 if (rc) {
3076                         ptlrpc_request_free(req);
3077                         RETURN(rc);
3078                 }
3079
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3081                 memcpy(tmp, key, keylen);
3082
3083                 req->rq_no_delay = req->rq_no_resend = 1;
3084                 ptlrpc_request_set_replen(req);
3085                 rc = ptlrpc_queue_wait(req);
3086                 if (rc)
3087                         GOTO(out, rc);
3088
3089                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3090                 if (reply == NULL)
3091                         GOTO(out, rc = -EPROTO);
3092
3093                 *((obd_id *)val) = *reply;
3094         out:
3095                 ptlrpc_req_finished(req);
3096                 RETURN(rc);
3097         } else if (KEY_IS(KEY_FIEMAP)) {
3098                 struct ll_fiemap_info_key *fm_key =
3099                                 (struct ll_fiemap_info_key *)key;
3100                 struct ldlm_res_id       res_id;
3101                 ldlm_policy_data_t       policy;
3102                 struct lustre_handle     lockh;
3103                 ldlm_mode_t              mode = 0;
3104                 struct ptlrpc_request   *req;
3105                 struct ll_user_fiemap   *reply;
3106                 char                    *tmp;
3107                 int                      rc;
3108
3109                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3110                         goto skip_locking;
3111
3112                 policy.l_extent.start = fm_key->fiemap.fm_start &
3113                                                 CFS_PAGE_MASK;
3114
3115                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3116                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3117                         policy.l_extent.end = OBD_OBJECT_EOF;
3118                 else
3119                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3120                                 fm_key->fiemap.fm_length +
3121                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3122
3123                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3124                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3125                                        LDLM_FL_BLOCK_GRANTED |
3126                                        LDLM_FL_LVB_READY,
3127                                        &res_id, LDLM_EXTENT, &policy,
3128                                        LCK_PR | LCK_PW, &lockh, 0);
3129                 if (mode) { /* lock is cached on client */
3130                         if (mode != LCK_PR) {
3131                                 ldlm_lock_addref(&lockh, LCK_PR);
3132                                 ldlm_lock_decref(&lockh, LCK_PW);
3133                         }
3134                 } else { /* no cached lock, needs acquire lock on server side */
3135                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3136                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3137                 }
3138
3139 skip_locking:
3140                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3141                                            &RQF_OST_GET_INFO_FIEMAP);
3142                 if (req == NULL)
3143                         GOTO(drop_lock, rc = -ENOMEM);
3144
3145                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3146                                      RCL_CLIENT, keylen);
3147                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3148                                      RCL_CLIENT, *vallen);
3149                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3150                                      RCL_SERVER, *vallen);
3151
3152                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3153                 if (rc) {
3154                         ptlrpc_request_free(req);
3155                         GOTO(drop_lock, rc);
3156                 }
3157
3158                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3159                 memcpy(tmp, key, keylen);
3160