Whamcloud - gitweb
LU-3319 lprocfs: client side cleanups
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
188                              oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
217                                      aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265                        struct obd_info *oinfo)
266 {
267         struct ptlrpc_request *req;
268         struct ost_body       *body;
269         int                    rc;
270         ENTRY;
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273         if (req == NULL)
274                 RETURN(-ENOMEM);
275
276         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 RETURN(rc);
281         }
282
283         osc_pack_req_body(req, oinfo);
284
285         ptlrpc_request_set_replen(req);
286
287         rc = ptlrpc_queue_wait(req);
288         if (rc)
289                 GOTO(out, rc);
290
291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
297                              &body->oa);
298
299         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
300         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301
302         EXIT;
303  out:
304         ptlrpc_req_finished(req);
305         return rc;
306 }
307
308 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
309                        struct obd_info *oinfo, struct obd_trans_info *oti)
310 {
311         struct ptlrpc_request *req;
312         struct ost_body       *body;
313         int                    rc;
314         ENTRY;
315
316         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324         if (rc) {
325                 ptlrpc_request_free(req);
326                 RETURN(rc);
327         }
328
329         osc_pack_req_body(req, oinfo);
330
331         ptlrpc_request_set_replen(req);
332
333         rc = ptlrpc_queue_wait(req);
334         if (rc)
335                 GOTO(out, rc);
336
337         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338         if (body == NULL)
339                 GOTO(out, rc = -EPROTO);
340
341         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
342                              &body->oa);
343
344         EXIT;
345 out:
346         ptlrpc_req_finished(req);
347         RETURN(rc);
348 }
349
350 static int osc_setattr_interpret(const struct lu_env *env,
351                                  struct ptlrpc_request *req,
352                                  struct osc_setattr_args *sa, int rc)
353 {
354         struct ost_body *body;
355         ENTRY;
356
357         if (rc != 0)
358                 GOTO(out, rc);
359
360         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361         if (body == NULL)
362                 GOTO(out, rc = -EPROTO);
363
364         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
365                              &body->oa);
366 out:
367         rc = sa->sa_upcall(sa->sa_cookie, rc);
368         RETURN(rc);
369 }
370
371 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
372                            struct obd_trans_info *oti,
373                            obd_enqueue_update_f upcall, void *cookie,
374                            struct ptlrpc_request_set *rqset)
375 {
376         struct ptlrpc_request   *req;
377         struct osc_setattr_args *sa;
378         int                      rc;
379         ENTRY;
380
381         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382         if (req == NULL)
383                 RETURN(-ENOMEM);
384
385         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 RETURN(rc);
390         }
391
392         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
393                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
394
395         osc_pack_req_body(req, oinfo);
396
397         ptlrpc_request_set_replen(req);
398
399         /* do mds to ost setattr asynchronously */
400         if (!rqset) {
401                 /* Do not wait for response. */
402                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
403         } else {
404                 req->rq_interpret_reply =
405                         (ptlrpc_interpterer_t)osc_setattr_interpret;
406
407                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
408                 sa = ptlrpc_req_async_args(req);
409                 sa->sa_oa = oinfo->oi_oa;
410                 sa->sa_upcall = upcall;
411                 sa->sa_cookie = cookie;
412
413                 if (rqset == PTLRPCD_SET)
414                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
415                 else
416                         ptlrpc_set_add_req(rqset, req);
417         }
418
419         RETURN(0);
420 }
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
430 int osc_real_create(struct obd_export *exp, struct obdo *oa,
431                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
432 {
433         struct ptlrpc_request *req;
434         struct ost_body       *body;
435         struct lov_stripe_md  *lsm;
436         int                    rc;
437         ENTRY;
438
439         LASSERT(oa);
440         LASSERT(ea);
441
442         lsm = *ea;
443         if (!lsm) {
444                 rc = obd_alloc_memmd(exp, &lsm);
445                 if (rc < 0)
446                         RETURN(rc);
447         }
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
450         if (req == NULL)
451                 GOTO(out, rc = -ENOMEM);
452
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 GOTO(out, rc);
457         }
458
459         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
460         LASSERT(body);
461
462         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
463
464         ptlrpc_request_set_replen(req);
465
466         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
467             oa->o_flags == OBD_FL_DELORPHAN) {
468                 DEBUG_REQ(D_HA, req,
469                           "delorphan from OST integration");
470                 /* Don't resend the delorphan req */
471                 req->rq_no_resend = req->rq_no_delay = 1;
472         }
473
474         rc = ptlrpc_queue_wait(req);
475         if (rc)
476                 GOTO(out_req, rc);
477
478         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
479         if (body == NULL)
480                 GOTO(out_req, rc = -EPROTO);
481
482         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
483         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
484
485         oa->o_blksize = cli_brw_size(exp->exp_obd);
486         oa->o_valid |= OBD_MD_FLBLKSZ;
487
488         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
489          * have valid lsm_oinfo data structs, so don't go touching that.
490          * This needs to be fixed in a big way.
491          */
492         lsm->lsm_oi = oa->o_oi;
493         *ea = lsm;
494
495         if (oti != NULL) {
496                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
497
498                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499                         if (!oti->oti_logcookies)
500                                 oti_alloc_cookies(oti, 1);
501                         *oti->oti_logcookies = oa->o_lcookie;
502                 }
503         }
504
505         CDEBUG(D_HA, "transno: "LPD64"\n",
506                lustre_msg_get_transno(req->rq_repmsg));
507 out_req:
508         ptlrpc_req_finished(req);
509 out:
510         if (rc && !*ea)
511                 obd_free_memmd(exp, &lsm);
512         RETURN(rc);
513 }
514
515 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
516                    obd_enqueue_update_f upcall, void *cookie,
517                    struct ptlrpc_request_set *rqset)
518 {
519         struct ptlrpc_request   *req;
520         struct osc_setattr_args *sa;
521         struct ost_body         *body;
522         int                      rc;
523         ENTRY;
524
525         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526         if (req == NULL)
527                 RETURN(-ENOMEM);
528
529         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
530         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(rc);
534         }
535         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
536         ptlrpc_at_set_req_timeout(req);
537
538         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
539         LASSERT(body);
540         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
541                              oinfo->oi_oa);
542         osc_pack_capa(req, body, oinfo->oi_capa);
543
544         ptlrpc_request_set_replen(req);
545
546         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
547         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548         sa = ptlrpc_req_async_args(req);
549         sa->sa_oa     = oinfo->oi_oa;
550         sa->sa_upcall = upcall;
551         sa->sa_cookie = cookie;
552         if (rqset == PTLRPCD_SET)
553                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
554         else
555                 ptlrpc_set_add_req(rqset, req);
556
557         RETURN(0);
558 }
559
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561                      struct obd_info *oinfo, struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
571 static int osc_sync_interpret(const struct lu_env *env,
572                               struct ptlrpc_request *req,
573                               void *arg, int rc)
574 {
575         struct osc_fsync_args *fa = arg;
576         struct ost_body *body;
577         ENTRY;
578
579         if (rc)
580                 GOTO(out, rc);
581
582         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
583         if (body == NULL) {
584                 CERROR ("can't unpack ost_body\n");
585                 GOTO(out, rc = -EPROTO);
586         }
587
588         *fa->fa_oi->oi_oa = body->oa;
589 out:
590         rc = fa->fa_upcall(fa->fa_cookie, rc);
591         RETURN(rc);
592 }
593
594 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
595                   obd_enqueue_update_f upcall, void *cookie,
596                   struct ptlrpc_request_set *rqset)
597 {
598         struct ptlrpc_request *req;
599         struct ost_body       *body;
600         struct osc_fsync_args *fa;
601         int                    rc;
602         ENTRY;
603
604         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
605         if (req == NULL)
606                 RETURN(-ENOMEM);
607
608         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
609         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
610         if (rc) {
611                 ptlrpc_request_free(req);
612                 RETURN(rc);
613         }
614
615         /* overload the size and blocks fields in the oa with start/end */
616         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
617         LASSERT(body);
618         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
619                              oinfo->oi_oa);
620         osc_pack_capa(req, body, oinfo->oi_capa);
621
622         ptlrpc_request_set_replen(req);
623         req->rq_interpret_reply = osc_sync_interpret;
624
625         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
626         fa = ptlrpc_req_async_args(req);
627         fa->fa_oi = oinfo;
628         fa->fa_upcall = upcall;
629         fa->fa_cookie = cookie;
630
631         if (rqset == PTLRPCD_SET)
632                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
633         else
634                 ptlrpc_set_add_req(rqset, req);
635
636         RETURN (0);
637 }
638
639 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
640                     struct obd_info *oinfo, obd_size start, obd_size end,
641                     struct ptlrpc_request_set *set)
642 {
643         ENTRY;
644
645         if (!oinfo->oi_oa) {
646                 CDEBUG(D_INFO, "oa NULL\n");
647                 RETURN(-EINVAL);
648         }
649
650         oinfo->oi_oa->o_size = start;
651         oinfo->oi_oa->o_blocks = end;
652         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
653
654         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
655 }
656
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661                                    cfs_list_t *cancels,
662                                    ldlm_mode_t mode, __u64 lock_flags)
663 {
664         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665         struct ldlm_res_id res_id;
666         struct ldlm_resource *res;
667         int count;
668         ENTRY;
669
670         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
671          * export) but disabled through procfs (flag in NS).
672          *
673          * This distinguishes from a case when ELC is not supported originally,
674          * when we still want to cancel locks in advance and just cancel them
675          * locally, without sending any RPC. */
676         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
677                 RETURN(0);
678
679         ostid_build_res_name(&oa->o_oi, &res_id);
680         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
681         if (res == NULL)
682                 RETURN(0);
683
684         LDLM_RESOURCE_ADDREF(res);
685         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
686                                            lock_flags, 0, NULL);
687         LDLM_RESOURCE_DELREF(res);
688         ldlm_resource_putref(res);
689         RETURN(count);
690 }
691
692 static int osc_destroy_interpret(const struct lu_env *env,
693                                  struct ptlrpc_request *req, void *data,
694                                  int rc)
695 {
696         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
697
698         atomic_dec(&cli->cl_destroy_in_flight);
699         wake_up(&cli->cl_destroy_waitq);
700         return 0;
701 }
702
703 static int osc_can_send_destroy(struct client_obd *cli)
704 {
705         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
706             cli->cl_max_rpcs_in_flight) {
707                 /* The destroy request can be sent */
708                 return 1;
709         }
710         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
711             cli->cl_max_rpcs_in_flight) {
712                 /*
713                  * The counter has been modified between the two atomic
714                  * operations.
715                  */
716                 wake_up(&cli->cl_destroy_waitq);
717         }
718         return 0;
719 }
720
721 int osc_create(const struct lu_env *env, struct obd_export *exp,
722                struct obdo *oa, struct lov_stripe_md **ea,
723                struct obd_trans_info *oti)
724 {
725         int rc = 0;
726         ENTRY;
727
728         LASSERT(oa);
729         LASSERT(ea);
730         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
731
732         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
733             oa->o_flags == OBD_FL_RECREATE_OBJS) {
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735         }
736
737         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
738                 RETURN(osc_real_create(exp, oa, ea, oti));
739
740         /* we should not get here anymore */
741         LBUG();
742
743         RETURN(rc);
744 }
745
746 /* Destroy requests can be async always on the client, and we don't even really
747  * care about the return code since the client cannot do anything at all about
748  * a destroy failure.
749  * When the MDS is unlinking a filename, it saves the file objects into a
750  * recovery llog, and these object records are cancelled when the OST reports
751  * they were destroyed and sync'd to disk (i.e. transaction committed).
752  * If the client dies, or the OST is down when the object should be destroyed,
753  * the records are not cancelled, and when the OST reconnects to the MDS next,
754  * it will retrieve the llog unlink logs and then sends the log cancellation
755  * cookies to the MDS after committing destroy transactions. */
756 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
757                        struct obdo *oa, struct lov_stripe_md *ea,
758                        struct obd_trans_info *oti, struct obd_export *md_export,
759                        void *capa)
760 {
761         struct client_obd     *cli = &exp->exp_obd->u.cli;
762         struct ptlrpc_request *req;
763         struct ost_body       *body;
764         CFS_LIST_HEAD(cancels);
765         int rc, count;
766         ENTRY;
767
768         if (!oa) {
769                 CDEBUG(D_INFO, "oa NULL\n");
770                 RETURN(-EINVAL);
771         }
772
773         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
774                                         LDLM_FL_DISCARD_DATA);
775
776         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
777         if (req == NULL) {
778                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779                 RETURN(-ENOMEM);
780         }
781
782         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
783         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
784                                0, &cancels, count);
785         if (rc) {
786                 ptlrpc_request_free(req);
787                 RETURN(rc);
788         }
789
790         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
791         ptlrpc_at_set_req_timeout(req);
792
793         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
794                 oa->o_lcookie = *oti->oti_logcookies;
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
798
799         osc_pack_capa(req, body, (struct obd_capa *)capa);
800         ptlrpc_request_set_replen(req);
801
802         /* If osc_destory is for destroying the unlink orphan,
803          * sent from MDT to OST, which should not be blocked here,
804          * because the process might be triggered by ptlrpcd, and
805          * it is not good to block ptlrpcd thread (b=16006)*/
806         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
807                 req->rq_interpret_reply = osc_destroy_interpret;
808                 if (!osc_can_send_destroy(cli)) {
809                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810                                                           NULL);
811
812                         /*
813                          * Wait until the number of on-going destroy RPCs drops
814                          * under max_rpc_in_flight
815                          */
816                         l_wait_event_exclusive(cli->cl_destroy_waitq,
817                                                osc_can_send_destroy(cli), &lwi);
818                 }
819         }
820
821         /* Do not wait for response */
822         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
823         RETURN(0);
824 }
825
826 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
827                                 long writing_bytes)
828 {
829         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
830
831         LASSERT(!(oa->o_valid & bits));
832
833         oa->o_valid |= bits;
834         client_obd_list_lock(&cli->cl_loi_list_lock);
835         oa->o_dirty = cli->cl_dirty;
836         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
837                      cli->cl_dirty_max)) {
838                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
839                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
840                 oa->o_undirty = 0;
841         } else if (unlikely(atomic_read(&obd_unstable_pages) +
842                             atomic_read(&obd_dirty_pages) -
843                             atomic_read(&obd_dirty_transit_pages) >
844                             (long)(obd_max_dirty_pages + 1))) {
845                 /* The atomic_read() allowing the atomic_inc() are
846                  * not covered by a lock thus they may safely race and trip
847                  * this CERROR() unless we add in a small fudge factor (+1). */
848                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
849                        cli->cl_import->imp_obd->obd_name,
850                        atomic_read(&obd_unstable_pages),
851                        atomic_read(&obd_dirty_pages),
852                        atomic_read(&obd_dirty_transit_pages),
853                        obd_max_dirty_pages);
854                 oa->o_undirty = 0;
855         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
856                 CERROR("dirty %lu - dirty_max %lu too big???\n",
857                        cli->cl_dirty, cli->cl_dirty_max);
858                 oa->o_undirty = 0;
859         } else {
860                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
861                                       PAGE_CACHE_SHIFT) *
862                                      (cli->cl_max_rpcs_in_flight + 1);
863                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
864         }
865         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
866         oa->o_dropped = cli->cl_lost_grant;
867         cli->cl_lost_grant = 0;
868         client_obd_list_unlock(&cli->cl_loi_list_lock);
869         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
870                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
871
872 }
873
874 void osc_update_next_shrink(struct client_obd *cli)
875 {
876         cli->cl_next_shrink_grant =
877                 cfs_time_shift(cli->cl_grant_shrink_interval);
878         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
879                cli->cl_next_shrink_grant);
880 }
881
882 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
883 {
884         client_obd_list_lock(&cli->cl_loi_list_lock);
885         cli->cl_avail_grant += grant;
886         client_obd_list_unlock(&cli->cl_loi_list_lock);
887 }
888
889 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
890 {
891         if (body->oa.o_valid & OBD_MD_FLGRANT) {
892                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
893                 __osc_update_grant(cli, body->oa.o_grant);
894         }
895 }
896
897 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
898                               obd_count keylen, void *key, obd_count vallen,
899                               void *val, struct ptlrpc_request_set *set);
900
901 static int osc_shrink_grant_interpret(const struct lu_env *env,
902                                       struct ptlrpc_request *req,
903                                       void *aa, int rc)
904 {
905         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
906         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
907         struct ost_body *body;
908
909         if (rc != 0) {
910                 __osc_update_grant(cli, oa->o_grant);
911                 GOTO(out, rc);
912         }
913
914         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
915         LASSERT(body);
916         osc_update_grant(cli, body);
917 out:
918         OBDO_FREE(oa);
919         return rc;
920 }
921
922 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
923 {
924         client_obd_list_lock(&cli->cl_loi_list_lock);
925         oa->o_grant = cli->cl_avail_grant / 4;
926         cli->cl_avail_grant -= oa->o_grant;
927         client_obd_list_unlock(&cli->cl_loi_list_lock);
928         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
929                 oa->o_valid |= OBD_MD_FLFLAGS;
930                 oa->o_flags = 0;
931         }
932         oa->o_flags |= OBD_FL_SHRINK_GRANT;
933         osc_update_next_shrink(cli);
934 }
935
936 /* Shrink the current grant, either from some large amount to enough for a
937  * full set of in-flight RPCs, or if we have already shrunk to that limit
938  * then to enough for a single RPC.  This avoids keeping more grant than
939  * needed, and avoids shrinking the grant piecemeal. */
940 static int osc_shrink_grant(struct client_obd *cli)
941 {
942         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
943                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
944
945         client_obd_list_lock(&cli->cl_loi_list_lock);
946         if (cli->cl_avail_grant <= target_bytes)
947                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
948         client_obd_list_unlock(&cli->cl_loi_list_lock);
949
950         return osc_shrink_grant_to_target(cli, target_bytes);
951 }
952
953 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
954 {
955         int                     rc = 0;
956         struct ost_body        *body;
957         ENTRY;
958
959         client_obd_list_lock(&cli->cl_loi_list_lock);
960         /* Don't shrink if we are already above or below the desired limit
961          * We don't want to shrink below a single RPC, as that will negatively
962          * impact block allocation and long-term performance. */
963         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
964                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
965
966         if (target_bytes >= cli->cl_avail_grant) {
967                 client_obd_list_unlock(&cli->cl_loi_list_lock);
968                 RETURN(0);
969         }
970         client_obd_list_unlock(&cli->cl_loi_list_lock);
971
972         OBD_ALLOC_PTR(body);
973         if (!body)
974                 RETURN(-ENOMEM);
975
976         osc_announce_cached(cli, &body->oa, 0);
977
978         client_obd_list_lock(&cli->cl_loi_list_lock);
979         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
980         cli->cl_avail_grant = target_bytes;
981         client_obd_list_unlock(&cli->cl_loi_list_lock);
982         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
983                 body->oa.o_valid |= OBD_MD_FLFLAGS;
984                 body->oa.o_flags = 0;
985         }
986         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
987         osc_update_next_shrink(cli);
988
989         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
990                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
991                                 sizeof(*body), body, NULL);
992         if (rc != 0)
993                 __osc_update_grant(cli, body->oa.o_grant);
994         OBD_FREE_PTR(body);
995         RETURN(rc);
996 }
997
998 static int osc_should_shrink_grant(struct client_obd *client)
999 {
1000         cfs_time_t time = cfs_time_current();
1001         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1002
1003         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1004              OBD_CONNECT_GRANT_SHRINK) == 0)
1005                 return 0;
1006
1007         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1008                 /* Get the current RPC size directly, instead of going via:
1009                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1010                  * Keep comment here so that it can be found by searching. */
1011                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1012
1013                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1014                     client->cl_avail_grant > brw_size)
1015                         return 1;
1016                 else
1017                         osc_update_next_shrink(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1023 {
1024         struct client_obd *client;
1025
1026         cfs_list_for_each_entry(client, &item->ti_obd_list,
1027                                 cl_grant_shrink_list) {
1028                 if (osc_should_shrink_grant(client))
1029                         osc_shrink_grant(client);
1030         }
1031         return 0;
1032 }
1033
1034 static int osc_add_shrink_grant(struct client_obd *client)
1035 {
1036         int rc;
1037
1038         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1039                                        TIMEOUT_GRANT,
1040                                        osc_grant_shrink_grant_cb, NULL,
1041                                        &client->cl_grant_shrink_list);
1042         if (rc) {
1043                 CERROR("add grant client %s error %d\n",
1044                         client->cl_import->imp_obd->obd_name, rc);
1045                 return rc;
1046         }
1047         CDEBUG(D_CACHE, "add grant client %s \n",
1048                client->cl_import->imp_obd->obd_name);
1049         osc_update_next_shrink(client);
1050         return 0;
1051 }
1052
1053 static int osc_del_shrink_grant(struct client_obd *client)
1054 {
1055         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1056                                          TIMEOUT_GRANT);
1057 }
1058
1059 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1060 {
1061         /*
1062          * ocd_grant is the total grant amount we're expect to hold: if we've
1063          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1064          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1065          *
1066          * race is tolerable here: if we're evicted, but imp_state already
1067          * left EVICTED state, then cl_dirty must be 0 already.
1068          */
1069         client_obd_list_lock(&cli->cl_loi_list_lock);
1070         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1071                 cli->cl_avail_grant = ocd->ocd_grant;
1072         else
1073                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1074
1075         if (cli->cl_avail_grant < 0) {
1076                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1077                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1078                       ocd->ocd_grant, cli->cl_dirty);
1079                 /* workaround for servers which do not have the patch from
1080                  * LU-2679 */
1081                 cli->cl_avail_grant = ocd->ocd_grant;
1082         }
1083
1084         /* determine the appropriate chunk size used by osc_extent. */
1085         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1086         client_obd_list_unlock(&cli->cl_loi_list_lock);
1087
1088         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1089                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1090                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1091
1092         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1093             cfs_list_empty(&cli->cl_grant_shrink_list))
1094                 osc_add_shrink_grant(cli);
1095 }
1096
1097 /* We assume that the reason this OSC got a short read is because it read
1098  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1099  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1100  * this stripe never got written at or beyond this stripe offset yet. */
1101 static void handle_short_read(int nob_read, obd_count page_count,
1102                               struct brw_page **pga)
1103 {
1104         char *ptr;
1105         int i = 0;
1106
1107         /* skip bytes read OK */
1108         while (nob_read > 0) {
1109                 LASSERT (page_count > 0);
1110
1111                 if (pga[i]->count > nob_read) {
1112                         /* EOF inside this page */
1113                         ptr = kmap(pga[i]->pg) +
1114                                 (pga[i]->off & ~CFS_PAGE_MASK);
1115                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116                         kunmap(pga[i]->pg);
1117                         page_count--;
1118                         i++;
1119                         break;
1120                 }
1121
1122                 nob_read -= pga[i]->count;
1123                 page_count--;
1124                 i++;
1125         }
1126
1127         /* zero remaining pages */
1128         while (page_count-- > 0) {
1129                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1130                 memset(ptr, 0, pga[i]->count);
1131                 kunmap(pga[i]->pg);
1132                 i++;
1133         }
1134 }
1135
1136 static int check_write_rcs(struct ptlrpc_request *req,
1137                            int requested_nob, int niocount,
1138                            obd_count page_count, struct brw_page **pga)
1139 {
1140         int     i;
1141         __u32   *remote_rcs;
1142
1143         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1144                                                   sizeof(*remote_rcs) *
1145                                                   niocount);
1146         if (remote_rcs == NULL) {
1147                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1148                 return(-EPROTO);
1149         }
1150
1151         /* return error if any niobuf was in error */
1152         for (i = 0; i < niocount; i++) {
1153                 if ((int)remote_rcs[i] < 0)
1154                         return(remote_rcs[i]);
1155
1156                 if (remote_rcs[i] != 0) {
1157                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1158                                 i, remote_rcs[i], req);
1159                         return(-EPROTO);
1160                 }
1161         }
1162
1163         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1164                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1165                        req->rq_bulk->bd_nob_transferred, requested_nob);
1166                 return(-EPROTO);
1167         }
1168
1169         return (0);
1170 }
1171
1172 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1173 {
1174         if (p1->flag != p2->flag) {
1175                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1176                                   OBD_BRW_SYNC       | OBD_BRW_ASYNC   |
1177                                   OBD_BRW_NOQUOTA    | OBD_BRW_SOFT_SYNC);
1178
1179                 /* warn if we try to combine flags that we don't know to be
1180                  * safe to combine */
1181                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1182                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1183                               "report this at http://bugs.whamcloud.com/\n",
1184                               p1->flag, p2->flag);
1185                 }
1186                 return 0;
1187         }
1188
1189         return (p1->off + p1->count == p2->off);
1190 }
1191
1192 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1193                                    struct brw_page **pga, int opc,
1194                                    cksum_type_t cksum_type)
1195 {
1196         __u32                           cksum;
1197         int                             i = 0;
1198         struct cfs_crypto_hash_desc     *hdesc;
1199         unsigned int                    bufsize;
1200         int                             err;
1201         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1202
1203         LASSERT(pg_count > 0);
1204
1205         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1206         if (IS_ERR(hdesc)) {
1207                 CERROR("Unable to initialize checksum hash %s\n",
1208                        cfs_crypto_hash_name(cfs_alg));
1209                 return PTR_ERR(hdesc);
1210         }
1211
1212         while (nob > 0 && pg_count > 0) {
1213                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1214
1215                 /* corrupt the data before we compute the checksum, to
1216                  * simulate an OST->client data error */
1217                 if (i == 0 && opc == OST_READ &&
1218                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1219                         unsigned char *ptr = kmap(pga[i]->pg);
1220                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1221                         memcpy(ptr + off, "bad1", min(4, nob));
1222                         kunmap(pga[i]->pg);
1223                 }
1224                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1225                                   pga[i]->off & ~CFS_PAGE_MASK,
1226                                   count);
1227                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1228                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1229
1230                 nob -= pga[i]->count;
1231                 pg_count--;
1232                 i++;
1233         }
1234
1235         bufsize = 4;
1236         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1237
1238         if (err)
1239                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1240
1241         /* For sending we only compute the wrong checksum instead
1242          * of corrupting the data so it is still correct on a redo */
1243         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1244                 cksum++;
1245
1246         return cksum;
1247 }
1248
1249 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1250                                 struct lov_stripe_md *lsm, obd_count page_count,
1251                                 struct brw_page **pga,
1252                                 struct ptlrpc_request **reqp,
1253                                 struct obd_capa *ocapa, int reserve,
1254                                 int resend)
1255 {
1256         struct ptlrpc_request   *req;
1257         struct ptlrpc_bulk_desc *desc;
1258         struct ost_body         *body;
1259         struct obd_ioobj        *ioobj;
1260         struct niobuf_remote    *niobuf;
1261         int niocount, i, requested_nob, opc, rc;
1262         struct osc_brw_async_args *aa;
1263         struct req_capsule      *pill;
1264         struct brw_page *pg_prev;
1265
1266         ENTRY;
1267         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1268                 RETURN(-ENOMEM); /* Recoverable */
1269         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1270                 RETURN(-EINVAL); /* Fatal */
1271
1272         if ((cmd & OBD_BRW_WRITE) != 0) {
1273                 opc = OST_WRITE;
1274                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1275                                                 cli->cl_import->imp_rq_pool,
1276                                                 &RQF_OST_BRW_WRITE);
1277         } else {
1278                 opc = OST_READ;
1279                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1280         }
1281         if (req == NULL)
1282                 RETURN(-ENOMEM);
1283
1284         for (niocount = i = 1; i < page_count; i++) {
1285                 if (!can_merge_pages(pga[i - 1], pga[i]))
1286                         niocount++;
1287         }
1288
1289         pill = &req->rq_pill;
1290         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1291                              sizeof(*ioobj));
1292         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1293                              niocount * sizeof(*niobuf));
1294         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1295
1296         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1297         if (rc) {
1298                 ptlrpc_request_free(req);
1299                 RETURN(rc);
1300         }
1301         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1302         ptlrpc_at_set_req_timeout(req);
1303         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1304          * retry logic */
1305         req->rq_no_retry_einprogress = 1;
1306
1307         desc = ptlrpc_prep_bulk_imp(req, page_count,
1308                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1309                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1310                 OST_BULK_PORTAL);
1311
1312         if (desc == NULL)
1313                 GOTO(out, rc = -ENOMEM);
1314         /* NB request now owns desc and will free it when it gets freed */
1315
1316         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1317         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1318         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1319         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1320
1321         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1322
1323         obdo_to_ioobj(oa, ioobj);
1324         ioobj->ioo_bufcnt = niocount;
1325         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1326          * that might be send for this request.  The actual number is decided
1327          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1328          * "max - 1" for old client compatibility sending "0", and also so the
1329          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1330         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1331         osc_pack_capa(req, body, ocapa);
1332         LASSERT(page_count > 0);
1333         pg_prev = pga[0];
1334         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1335                 struct brw_page *pg = pga[i];
1336                 int poff = pg->off & ~CFS_PAGE_MASK;
1337
1338                 LASSERT(pg->count > 0);
1339                 /* make sure there is no gap in the middle of page array */
1340                 LASSERTF(page_count == 1 ||
1341                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1342                           ergo(i > 0 && i < page_count - 1,
1343                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1344                           ergo(i == page_count - 1, poff == 0)),
1345                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1346                          i, page_count, pg, pg->off, pg->count);
1347 #ifdef __linux__
1348                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1349                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1350                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1351                          i, page_count,
1352                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1353                          pg_prev->pg, page_private(pg_prev->pg),
1354                          pg_prev->pg->index, pg_prev->off);
1355 #else
1356                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1357                          "i %d p_c %u\n", i, page_count);
1358 #endif
1359                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1360                         (pg->flag & OBD_BRW_SRVLOCK));
1361
1362                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1363                 requested_nob += pg->count;
1364
1365                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1366                         niobuf--;
1367                         niobuf->len += pg->count;
1368                 } else {
1369                         niobuf->offset = pg->off;
1370                         niobuf->len    = pg->count;
1371                         niobuf->flags  = pg->flag;
1372                 }
1373                 pg_prev = pg;
1374         }
1375
1376         LASSERTF((void *)(niobuf - niocount) ==
1377                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1378                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1379                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1380
1381         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1382         if (resend) {
1383                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1384                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1385                         body->oa.o_flags = 0;
1386                 }
1387                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1388         }
1389
1390         if (osc_should_shrink_grant(cli))
1391                 osc_shrink_grant_local(cli, &body->oa);
1392
1393         /* size[REQ_REC_OFF] still sizeof (*body) */
1394         if (opc == OST_WRITE) {
1395                 if (cli->cl_checksum &&
1396                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1397                         /* store cl_cksum_type in a local variable since
1398                          * it can be changed via lprocfs */
1399                         cksum_type_t cksum_type = cli->cl_cksum_type;
1400
1401                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1402                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1403                                 body->oa.o_flags = 0;
1404                         }
1405                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1406                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1407                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1408                                                              page_count, pga,
1409                                                              OST_WRITE,
1410                                                              cksum_type);
1411                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1412                                body->oa.o_cksum);
1413                         /* save this in 'oa', too, for later checking */
1414                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1415                         oa->o_flags |= cksum_type_pack(cksum_type);
1416                 } else {
1417                         /* clear out the checksum flag, in case this is a
1418                          * resend but cl_checksum is no longer set. b=11238 */
1419                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1420                 }
1421                 oa->o_cksum = body->oa.o_cksum;
1422                 /* 1 RC per niobuf */
1423                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1424                                      sizeof(__u32) * niocount);
1425         } else {
1426                 if (cli->cl_checksum &&
1427                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1428                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1429                                 body->oa.o_flags = 0;
1430                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1431                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1432                 }
1433         }
1434         ptlrpc_request_set_replen(req);
1435
1436         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1437         aa = ptlrpc_req_async_args(req);
1438         aa->aa_oa = oa;
1439         aa->aa_requested_nob = requested_nob;
1440         aa->aa_nio_count = niocount;
1441         aa->aa_page_count = page_count;
1442         aa->aa_resends = 0;
1443         aa->aa_ppga = pga;
1444         aa->aa_cli = cli;
1445         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1446         if (ocapa && reserve)
1447                 aa->aa_ocapa = capa_get(ocapa);
1448
1449         *reqp = req;
1450         RETURN(0);
1451
1452  out:
1453         ptlrpc_req_finished(req);
1454         RETURN(rc);
1455 }
1456
1457 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1458                                 __u32 client_cksum, __u32 server_cksum, int nob,
1459                                 obd_count page_count, struct brw_page **pga,
1460                                 cksum_type_t client_cksum_type)
1461 {
1462         __u32 new_cksum;
1463         char *msg;
1464         cksum_type_t cksum_type;
1465
1466         if (server_cksum == client_cksum) {
1467                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1468                 return 0;
1469         }
1470
1471         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1472                                        oa->o_flags : 0);
1473         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1474                                       cksum_type);
1475
1476         if (cksum_type != client_cksum_type)
1477                 msg = "the server did not use the checksum type specified in "
1478                       "the original request - likely a protocol problem";
1479         else if (new_cksum == server_cksum)
1480                 msg = "changed on the client after we checksummed it - "
1481                       "likely false positive due to mmap IO (bug 11742)";
1482         else if (new_cksum == client_cksum)
1483                 msg = "changed in transit before arrival at OST";
1484         else
1485                 msg = "changed in transit AND doesn't match the original - "
1486                       "likely false positive due to mmap IO (bug 11742)";
1487
1488         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1489                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1490                            msg, libcfs_nid2str(peer->nid),
1491                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1492                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1493                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1494                            POSTID(&oa->o_oi), pga[0]->off,
1495                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1496         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1497                "client csum now %x\n", client_cksum, client_cksum_type,
1498                server_cksum, cksum_type, new_cksum);
1499         return 1;
1500 }
1501
1502 /* Note rc enters this function as number of bytes transferred */
1503 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1504 {
1505         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1506         const lnet_process_id_t *peer =
1507                         &req->rq_import->imp_connection->c_peer;
1508         struct client_obd *cli = aa->aa_cli;
1509         struct ost_body *body;
1510         __u32 client_cksum = 0;
1511         ENTRY;
1512
1513         if (rc < 0 && rc != -EDQUOT) {
1514                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1515                 RETURN(rc);
1516         }
1517
1518         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1519         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1520         if (body == NULL) {
1521                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1522                 RETURN(-EPROTO);
1523         }
1524
1525         /* set/clear over quota flag for a uid/gid */
1526         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1527             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1528                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1529
1530                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1531                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1532                        body->oa.o_flags);
1533                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1534         }
1535
1536         osc_update_grant(cli, body);
1537
1538         if (rc < 0)
1539                 RETURN(rc);
1540
1541         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1542                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1543
1544         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1545                 if (rc > 0) {
1546                         CERROR("Unexpected +ve rc %d\n", rc);
1547                         RETURN(-EPROTO);
1548                 }
1549                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1550
1551                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1552                         RETURN(-EAGAIN);
1553
1554                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1555                     check_write_checksum(&body->oa, peer, client_cksum,
1556                                          body->oa.o_cksum, aa->aa_requested_nob,
1557                                          aa->aa_page_count, aa->aa_ppga,
1558                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1559                         RETURN(-EAGAIN);
1560
1561                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1562                                      aa->aa_page_count, aa->aa_ppga);
1563                 GOTO(out, rc);
1564         }
1565
1566         /* The rest of this function executes only for OST_READs */
1567
1568         /* if unwrap_bulk failed, return -EAGAIN to retry */
1569         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1570         if (rc < 0)
1571                 GOTO(out, rc = -EAGAIN);
1572
1573         if (rc > aa->aa_requested_nob) {
1574                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1575                        aa->aa_requested_nob);
1576                 RETURN(-EPROTO);
1577         }
1578
1579         if (rc != req->rq_bulk->bd_nob_transferred) {
1580                 CERROR ("Unexpected rc %d (%d transferred)\n",
1581                         rc, req->rq_bulk->bd_nob_transferred);
1582                 return (-EPROTO);
1583         }
1584
1585         if (rc < aa->aa_requested_nob)
1586                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1587
1588         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1589                 static int cksum_counter;
1590                 __u32      server_cksum = body->oa.o_cksum;
1591                 char      *via;
1592                 char      *router;
1593                 cksum_type_t cksum_type;
1594
1595                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1596                                                body->oa.o_flags : 0);
1597                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1598                                                  aa->aa_ppga, OST_READ,
1599                                                  cksum_type);
1600
1601                 if (peer->nid == req->rq_bulk->bd_sender) {
1602                         via = router = "";
1603                 } else {
1604                         via = " via ";
1605                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1606                 }
1607
1608                 if (server_cksum == ~0 && rc > 0) {
1609                         CERROR("Protocol error: server %s set the 'checksum' "
1610                                "bit, but didn't send a checksum.  Not fatal, "
1611                                "but please notify on http://bugs.whamcloud.com/\n",
1612                                libcfs_nid2str(peer->nid));
1613                 } else if (server_cksum != client_cksum) {
1614                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1615                                            "%s%s%s inode "DFID" object "DOSTID
1616                                            " extent ["LPU64"-"LPU64"]\n",
1617                                            req->rq_import->imp_obd->obd_name,
1618                                            libcfs_nid2str(peer->nid),
1619                                            via, router,
1620                                            body->oa.o_valid & OBD_MD_FLFID ?
1621                                                 body->oa.o_parent_seq : (__u64)0,
1622                                            body->oa.o_valid & OBD_MD_FLFID ?
1623                                                 body->oa.o_parent_oid : 0,
1624                                            body->oa.o_valid & OBD_MD_FLFID ?
1625                                                 body->oa.o_parent_ver : 0,
1626                                            POSTID(&body->oa.o_oi),
1627                                            aa->aa_ppga[0]->off,
1628                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1629                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1630                                                                         1);
1631                         CERROR("client %x, server %x, cksum_type %x\n",
1632                                client_cksum, server_cksum, cksum_type);
1633                         cksum_counter = 0;
1634                         aa->aa_oa->o_cksum = client_cksum;
1635                         rc = -EAGAIN;
1636                 } else {
1637                         cksum_counter++;
1638                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1639                         rc = 0;
1640                 }
1641         } else if (unlikely(client_cksum)) {
1642                 static int cksum_missed;
1643
1644                 cksum_missed++;
1645                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1646                         CERROR("Checksum %u requested from %s but not sent\n",
1647                                cksum_missed, libcfs_nid2str(peer->nid));
1648         } else {
1649                 rc = 0;
1650         }
1651 out:
1652         if (rc >= 0)
1653                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1654                                      aa->aa_oa, &body->oa);
1655
1656         RETURN(rc);
1657 }
1658
1659 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1660                             struct lov_stripe_md *lsm,
1661                             obd_count page_count, struct brw_page **pga,
1662                             struct obd_capa *ocapa)
1663 {
1664         struct ptlrpc_request *req;
1665         int                    rc;
1666         wait_queue_head_t            waitq;
1667         int                    generation, resends = 0;
1668         struct l_wait_info     lwi;
1669
1670         ENTRY;
1671
1672         init_waitqueue_head(&waitq);
1673         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1674
1675 restart_bulk:
1676         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1677                                   page_count, pga, &req, ocapa, 0, resends);
1678         if (rc != 0)
1679                 return (rc);
1680
1681         if (resends) {
1682                 req->rq_generation_set = 1;
1683                 req->rq_import_generation = generation;
1684                 req->rq_sent = cfs_time_current_sec() + resends;
1685         }
1686
1687         rc = ptlrpc_queue_wait(req);
1688
1689         if (rc == -ETIMEDOUT && req->rq_resend) {
1690                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1691                 ptlrpc_req_finished(req);
1692                 goto restart_bulk;
1693         }
1694
1695         rc = osc_brw_fini_request(req, rc);
1696
1697         ptlrpc_req_finished(req);
1698         /* When server return -EINPROGRESS, client should always retry
1699          * regardless of the number of times the bulk was resent already.*/
1700         if (osc_recoverable_error(rc)) {
1701                 resends++;
1702                 if (rc != -EINPROGRESS &&
1703                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1704                         CERROR("%s: too many resend retries for object: "
1705                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1706                                POSTID(&oa->o_oi), rc);
1707                         goto out;
1708                 }
1709                 if (generation !=
1710                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1711                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1712                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1713                                POSTID(&oa->o_oi), rc);
1714                         goto out;
1715                 }
1716
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1718                                        NULL);
1719                 l_wait_event(waitq, 0, &lwi);
1720
1721                 goto restart_bulk;
1722         }
1723 out:
1724         if (rc == -EAGAIN || rc == -EINPROGRESS)
1725                 rc = -EIO;
1726         RETURN (rc);
1727 }
1728
1729 static int osc_brw_redo_request(struct ptlrpc_request *request,
1730                                 struct osc_brw_async_args *aa, int rc)
1731 {
1732         struct ptlrpc_request *new_req;
1733         struct osc_brw_async_args *new_aa;
1734         struct osc_async_page *oap;
1735         ENTRY;
1736
1737         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1738                   "redo for recoverable error %d", rc);
1739
1740         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1741                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1742                                   aa->aa_cli, aa->aa_oa,
1743                                   NULL /* lsm unused by osc currently */,
1744                                   aa->aa_page_count, aa->aa_ppga,
1745                                   &new_req, aa->aa_ocapa, 0, 1);
1746         if (rc)
1747                 RETURN(rc);
1748
1749         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1750                 if (oap->oap_request != NULL) {
1751                         LASSERTF(request == oap->oap_request,
1752                                  "request %p != oap_request %p\n",
1753                                  request, oap->oap_request);
1754                         if (oap->oap_interrupted) {
1755                                 ptlrpc_req_finished(new_req);
1756                                 RETURN(-EINTR);
1757                         }
1758                 }
1759         }
1760         /* New request takes over pga and oaps from old request.
1761          * Note that copying a list_head doesn't work, need to move it... */
1762         aa->aa_resends++;
1763         new_req->rq_interpret_reply = request->rq_interpret_reply;
1764         new_req->rq_async_args = request->rq_async_args;
1765         new_req->rq_commit_cb = request->rq_commit_cb;
1766         /* cap resend delay to the current request timeout, this is similar to
1767          * what ptlrpc does (see after_reply()) */
1768         if (aa->aa_resends > new_req->rq_timeout)
1769                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1770         else
1771                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772         new_req->rq_generation_set = 1;
1773         new_req->rq_import_generation = request->rq_import_generation;
1774
1775         new_aa = ptlrpc_req_async_args(new_req);
1776
1777         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1778         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1779         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1780         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1781         new_aa->aa_resends = aa->aa_resends;
1782
1783         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1784                 if (oap->oap_request) {
1785                         ptlrpc_req_finished(oap->oap_request);
1786                         oap->oap_request = ptlrpc_request_addref(new_req);
1787                 }
1788         }
1789
1790         new_aa->aa_ocapa = aa->aa_ocapa;
1791         aa->aa_ocapa = NULL;
1792
1793         /* XXX: This code will run into problem if we're going to support
1794          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1795          * and wait for all of them to be finished. We should inherit request
1796          * set from old request. */
1797         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1798
1799         DEBUG_REQ(D_INFO, new_req, "new request");
1800         RETURN(0);
1801 }
1802
1803 /*
1804  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1805  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1806  * fine for our small page arrays and doesn't require allocation.  its an
1807  * insertion sort that swaps elements that are strides apart, shrinking the
1808  * stride down until its '1' and the array is sorted.
1809  */
1810 static void sort_brw_pages(struct brw_page **array, int num)
1811 {
1812         int stride, i, j;
1813         struct brw_page *tmp;
1814
1815         if (num == 1)
1816                 return;
1817         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1818                 ;
1819
1820         do {
1821                 stride /= 3;
1822                 for (i = stride ; i < num ; i++) {
1823                         tmp = array[i];
1824                         j = i;
1825                         while (j >= stride && array[j - stride]->off > tmp->off) {
1826                                 array[j] = array[j - stride];
1827                                 j -= stride;
1828                         }
1829                         array[j] = tmp;
1830                 }
1831         } while (stride > 1);
1832 }
1833
1834 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1835 {
1836         int count = 1;
1837         int offset;
1838         int i = 0;
1839
1840         LASSERT (pages > 0);
1841         offset = pg[i]->off & ~CFS_PAGE_MASK;
1842
1843         for (;;) {
1844                 pages--;
1845                 if (pages == 0)         /* that's all */
1846                         return count;
1847
1848                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1849                         return count;   /* doesn't end on page boundary */
1850
1851                 i++;
1852                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1853                 if (offset != 0)        /* doesn't start on page boundary */
1854                         return count;
1855
1856                 count++;
1857         }
1858 }
1859
1860 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1861 {
1862         struct brw_page **ppga;
1863         int i;
1864
1865         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1866         if (ppga == NULL)
1867                 return NULL;
1868
1869         for (i = 0; i < count; i++)
1870                 ppga[i] = pga + i;
1871         return ppga;
1872 }
1873
1874 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1875 {
1876         LASSERT(ppga != NULL);
1877         OBD_FREE(ppga, sizeof(*ppga) * count);
1878 }
1879
1880 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1881                    obd_count page_count, struct brw_page *pga,
1882                    struct obd_trans_info *oti)
1883 {
1884         struct obdo *saved_oa = NULL;
1885         struct brw_page **ppga, **orig;
1886         struct obd_import *imp = class_exp2cliimp(exp);
1887         struct client_obd *cli;
1888         int rc, page_count_orig;
1889         ENTRY;
1890
1891         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1892         cli = &imp->imp_obd->u.cli;
1893
1894         if (cmd & OBD_BRW_CHECK) {
1895                 /* The caller just wants to know if there's a chance that this
1896                  * I/O can succeed */
1897
1898                 if (imp->imp_invalid)
1899                         RETURN(-EIO);
1900                 RETURN(0);
1901         }
1902
1903         /* test_brw with a failed create can trip this, maybe others. */
1904         LASSERT(cli->cl_max_pages_per_rpc);
1905
1906         rc = 0;
1907
1908         orig = ppga = osc_build_ppga(pga, page_count);
1909         if (ppga == NULL)
1910                 RETURN(-ENOMEM);
1911         page_count_orig = page_count;
1912
1913         sort_brw_pages(ppga, page_count);
1914         while (page_count) {
1915                 obd_count pages_per_brw;
1916
1917                 if (page_count > cli->cl_max_pages_per_rpc)
1918                         pages_per_brw = cli->cl_max_pages_per_rpc;
1919                 else
1920                         pages_per_brw = page_count;
1921
1922                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1923
1924                 if (saved_oa != NULL) {
1925                         /* restore previously saved oa */
1926                         *oinfo->oi_oa = *saved_oa;
1927                 } else if (page_count > pages_per_brw) {
1928                         /* save a copy of oa (brw will clobber it) */
1929                         OBDO_ALLOC(saved_oa);
1930                         if (saved_oa == NULL)
1931                                 GOTO(out, rc = -ENOMEM);
1932                         *saved_oa = *oinfo->oi_oa;
1933                 }
1934
1935                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1936                                       pages_per_brw, ppga, oinfo->oi_capa);
1937
1938                 if (rc != 0)
1939                         break;
1940
1941                 page_count -= pages_per_brw;
1942                 ppga += pages_per_brw;
1943         }
1944
1945 out:
1946         osc_release_ppga(orig, page_count_orig);
1947
1948         if (saved_oa != NULL)
1949                 OBDO_FREE(saved_oa);
1950
1951         RETURN(rc);
1952 }
1953
1954 static int brw_interpret(const struct lu_env *env,
1955                          struct ptlrpc_request *req, void *data, int rc)
1956 {
1957         struct osc_brw_async_args *aa = data;
1958         struct osc_extent *ext;
1959         struct osc_extent *tmp;
1960         struct client_obd *cli = aa->aa_cli;
1961         ENTRY;
1962
1963         rc = osc_brw_fini_request(req, rc);
1964         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1965         /* When server return -EINPROGRESS, client should always retry
1966          * regardless of the number of times the bulk was resent already. */
1967         if (osc_recoverable_error(rc)) {
1968                 if (req->rq_import_generation !=
1969                     req->rq_import->imp_generation) {
1970                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1971                                ""DOSTID", rc = %d.\n",
1972                                req->rq_import->imp_obd->obd_name,
1973                                POSTID(&aa->aa_oa->o_oi), rc);
1974                 } else if (rc == -EINPROGRESS ||
1975                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1976                         rc = osc_brw_redo_request(req, aa, rc);
1977                 } else {
1978                         CERROR("%s: too many resent retries for object: "
1979                                ""LPU64":"LPU64", rc = %d.\n",
1980                                req->rq_import->imp_obd->obd_name,
1981                                POSTID(&aa->aa_oa->o_oi), rc);
1982                 }
1983
1984                 if (rc == 0)
1985                         RETURN(0);
1986                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1987                         rc = -EIO;
1988         }
1989
1990         if (aa->aa_ocapa) {
1991                 capa_put(aa->aa_ocapa);
1992                 aa->aa_ocapa = NULL;
1993         }
1994
1995         if (rc == 0) {
1996                 struct obdo *oa = aa->aa_oa;
1997                 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1998                 unsigned long valid = 0;
1999                 struct cl_object *obj;
2000                 struct osc_async_page *last;
2001
2002                 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2003                 obj = osc2cl(last->oap_obj);
2004
2005                 cl_object_attr_lock(obj);
2006                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2007                         attr->cat_blocks = oa->o_blocks;
2008                         valid |= CAT_BLOCKS;
2009                 }
2010                 if (oa->o_valid & OBD_MD_FLMTIME) {
2011                         attr->cat_mtime = oa->o_mtime;
2012                         valid |= CAT_MTIME;
2013                 }
2014                 if (oa->o_valid & OBD_MD_FLATIME) {
2015                         attr->cat_atime = oa->o_atime;
2016                         valid |= CAT_ATIME;
2017                 }
2018                 if (oa->o_valid & OBD_MD_FLCTIME) {
2019                         attr->cat_ctime = oa->o_ctime;
2020                         valid |= CAT_CTIME;
2021                 }
2022
2023                 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2024                         struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2025                         loff_t last_off = last->oap_count + last->oap_obj_off;
2026
2027                         /* Change file size if this is an out of quota or
2028                          * direct IO write and it extends the file size */
2029                         if (loi->loi_lvb.lvb_size < last_off) {
2030                                 attr->cat_size = last_off;
2031                                 valid |= CAT_SIZE;
2032                         }
2033                         /* Extend KMS if it's not a lockless write */
2034                         if (loi->loi_kms < last_off &&
2035                             oap2osc_page(last)->ops_srvlock == 0) {
2036                                 attr->cat_kms = last_off;
2037                                 valid |= CAT_KMS;
2038                         }
2039                 }
2040
2041                 if (valid != 0)
2042                         cl_object_attr_set(env, obj, attr, valid);
2043                 cl_object_attr_unlock(obj);
2044         }
2045         OBDO_FREE(aa->aa_oa);
2046
2047         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2048                 cfs_list_del_init(&ext->oe_link);
2049                 osc_extent_finish(env, ext, 1, rc);
2050         }
2051         LASSERT(cfs_list_empty(&aa->aa_exts));
2052         LASSERT(cfs_list_empty(&aa->aa_oaps));
2053
2054         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2055                           req->rq_bulk->bd_nob_transferred);
2056         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2057         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2058
2059         client_obd_list_lock(&cli->cl_loi_list_lock);
2060         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2061          * is called so we know whether to go to sync BRWs or wait for more
2062          * RPCs to complete */
2063         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2064                 cli->cl_w_in_flight--;
2065         else
2066                 cli->cl_r_in_flight--;
2067         osc_wake_cache_waiters(cli);
2068         client_obd_list_unlock(&cli->cl_loi_list_lock);
2069
2070         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2071         RETURN(rc);
2072 }
2073
2074 static void brw_commit(struct ptlrpc_request *req)
2075 {
2076         spin_lock(&req->rq_lock);
2077         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2078          * this called via the rq_commit_cb, I need to ensure
2079          * osc_dec_unstable_pages is still called. Otherwise unstable
2080          * pages may be leaked. */
2081         if (req->rq_unstable) {
2082                 spin_unlock(&req->rq_lock);
2083                 osc_dec_unstable_pages(req);
2084                 spin_lock(&req->rq_lock);
2085         } else {
2086                 req->rq_committed = 1;
2087         }
2088         spin_unlock(&req->rq_lock);
2089 }
2090
2091 /**
2092  * Build an RPC by the list of extent @ext_list. The caller must ensure
2093  * that the total pages in this list are NOT over max pages per RPC.
2094  * Extents in the list must be in OES_RPC state.
2095  */
2096 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2097                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2098 {
2099         struct ptlrpc_request           *req = NULL;
2100         struct osc_extent               *ext;
2101         struct brw_page                 **pga = NULL;
2102         struct osc_brw_async_args       *aa = NULL;
2103         struct obdo                     *oa = NULL;
2104         struct osc_async_page           *oap;
2105         struct osc_async_page           *tmp;
2106         struct cl_req                   *clerq = NULL;
2107         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2108                                                                       CRT_READ;
2109         struct ldlm_lock                *lock = NULL;
2110         struct cl_req_attr              *crattr = NULL;
2111         obd_off                         starting_offset = OBD_OBJECT_EOF;
2112         obd_off                         ending_offset = 0;
2113         int                             mpflag = 0;
2114         int                             mem_tight = 0;
2115         int                             page_count = 0;
2116         int                             i;
2117         int                             rc;
2118         CFS_LIST_HEAD(rpc_list);
2119
2120         ENTRY;
2121         LASSERT(!cfs_list_empty(ext_list));
2122
2123         /* add pages into rpc_list to build BRW rpc */
2124         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2125                 LASSERT(ext->oe_state == OES_RPC);
2126                 mem_tight |= ext->oe_memalloc;
2127                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2128                         ++page_count;
2129                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2130                         if (starting_offset > oap->oap_obj_off)
2131                                 starting_offset = oap->oap_obj_off;
2132                         else
2133                                 LASSERT(oap->oap_page_off == 0);
2134                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2135                                 ending_offset = oap->oap_obj_off +
2136                                                 oap->oap_count;
2137                         else
2138                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2139                                         PAGE_CACHE_SIZE);
2140                 }
2141         }
2142
2143         if (mem_tight)
2144                 mpflag = cfs_memory_pressure_get_and_set();
2145
2146         OBD_ALLOC(crattr, sizeof(*crattr));
2147         if (crattr == NULL)
2148                 GOTO(out, rc = -ENOMEM);
2149
2150         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2151         if (pga == NULL)
2152                 GOTO(out, rc = -ENOMEM);
2153
2154         OBDO_ALLOC(oa);
2155         if (oa == NULL)
2156                 GOTO(out, rc = -ENOMEM);
2157
2158         i = 0;
2159         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2160                 struct cl_page *page = oap2cl_page(oap);
2161                 if (clerq == NULL) {
2162                         clerq = cl_req_alloc(env, page, crt,
2163                                              1 /* only 1-object rpcs for now */);
2164                         if (IS_ERR(clerq))
2165                                 GOTO(out, rc = PTR_ERR(clerq));
2166                         lock = oap->oap_ldlm_lock;
2167                 }
2168                 if (mem_tight)
2169                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2170                 pga[i] = &oap->oap_brw_page;
2171                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2172                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2173                        pga[i]->pg, page_index(oap->oap_page), oap,
2174                        pga[i]->flag);
2175                 i++;
2176                 cl_req_page_add(env, clerq, page);
2177         }
2178
2179         /* always get the data for the obdo for the rpc */
2180         LASSERT(clerq != NULL);
2181         crattr->cra_oa = oa;
2182         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2183         if (lock) {
2184                 oa->o_handle = lock->l_remote_handle;
2185                 oa->o_valid |= OBD_MD_FLHANDLE;
2186         }
2187
2188         rc = cl_req_prep(env, clerq);
2189         if (rc != 0) {
2190                 CERROR("cl_req_prep failed: %d\n", rc);
2191                 GOTO(out, rc);
2192         }
2193
2194         sort_brw_pages(pga, page_count);
2195         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2196                         pga, &req, crattr->cra_capa, 1, 0);
2197         if (rc != 0) {
2198                 CERROR("prep_req failed: %d\n", rc);
2199                 GOTO(out, rc);
2200         }
2201
2202         req->rq_commit_cb = brw_commit;
2203         req->rq_interpret_reply = brw_interpret;
2204
2205         if (mem_tight != 0)
2206                 req->rq_memalloc = 1;
2207
2208         /* Need to update the timestamps after the request is built in case
2209          * we race with setattr (locally or in queue at OST).  If OST gets
2210          * later setattr before earlier BRW (as determined by the request xid),
2211          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2212          * way to do this in a single call.  bug 10150 */
2213         cl_req_attr_set(env, clerq, crattr,
2214                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2215
2216         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2217
2218         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2219         aa = ptlrpc_req_async_args(req);
2220         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2221         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2222         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2223         cfs_list_splice_init(ext_list, &aa->aa_exts);
2224         aa->aa_clerq = clerq;
2225
2226         /* queued sync pages can be torn down while the pages
2227          * were between the pending list and the rpc */
2228         tmp = NULL;
2229         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2230                 /* only one oap gets a request reference */
2231                 if (tmp == NULL)
2232                         tmp = oap;
2233                 if (oap->oap_interrupted && !req->rq_intr) {
2234                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2235                                         oap, req);
2236                         ptlrpc_mark_interrupted(req);
2237                 }
2238         }
2239         if (tmp != NULL)
2240                 tmp->oap_request = ptlrpc_request_addref(req);
2241
2242         client_obd_list_lock(&cli->cl_loi_list_lock);
2243         starting_offset >>= PAGE_CACHE_SHIFT;
2244         if (cmd == OBD_BRW_READ) {
2245                 cli->cl_r_in_flight++;
2246                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2247                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2248                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2249                                       starting_offset + 1);
2250         } else {
2251                 cli->cl_w_in_flight++;
2252                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2253                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2254                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2255                                       starting_offset + 1);
2256         }
2257         client_obd_list_unlock(&cli->cl_loi_list_lock);
2258
2259         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2260                   page_count, aa, cli->cl_r_in_flight,
2261                   cli->cl_w_in_flight);
2262
2263         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2264          * see which CPU/NUMA node the majority of pages were allocated
2265          * on, and try to assign the async RPC to the CPU core
2266          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2267          *
2268          * But on the other hand, we expect that multiple ptlrpcd
2269          * threads and the initial write sponsor can run in parallel,
2270          * especially when data checksum is enabled, which is CPU-bound
2271          * operation and single ptlrpcd thread cannot process in time.
2272          * So more ptlrpcd threads sharing BRW load
2273          * (with PDL_POLICY_ROUND) seems better.
2274          */
2275         ptlrpcd_add_req(req, pol, -1);
2276         rc = 0;
2277         EXIT;
2278
2279 out:
2280         if (mem_tight != 0)
2281                 cfs_memory_pressure_restore(mpflag);
2282
2283         if (crattr != NULL) {
2284                 capa_put(crattr->cra_capa);
2285                 OBD_FREE(crattr, sizeof(*crattr));
2286         }
2287
2288         if (rc != 0) {
2289                 LASSERT(req == NULL);
2290
2291                 if (oa)
2292                         OBDO_FREE(oa);
2293                 if (pga)
2294                         OBD_FREE(pga, sizeof(*pga) * page_count);
2295                 /* this should happen rarely and is pretty bad, it makes the
2296                  * pending list not follow the dirty order */
2297                 while (!cfs_list_empty(ext_list)) {
2298                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2299                                              oe_link);
2300                         cfs_list_del_init(&ext->oe_link);
2301                         osc_extent_finish(env, ext, 0, rc);
2302                 }
2303                 if (clerq && !IS_ERR(clerq))
2304                         cl_req_completion(env, clerq, rc);
2305         }
2306         RETURN(rc);
2307 }
2308
2309 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2310                                         struct ldlm_enqueue_info *einfo)
2311 {
2312         void *data = einfo->ei_cbdata;
2313         int set = 0;
2314
2315         LASSERT(lock != NULL);
2316         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2317         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2318         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2319         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2320
2321         lock_res_and_lock(lock);
2322         spin_lock(&osc_ast_guard);
2323
2324         if (lock->l_ast_data == NULL)
2325                 lock->l_ast_data = data;
2326         if (lock->l_ast_data == data)
2327                 set = 1;
2328
2329         spin_unlock(&osc_ast_guard);
2330         unlock_res_and_lock(lock);
2331
2332         return set;
2333 }
2334
2335 static int osc_set_data_with_check(struct lustre_handle *lockh,
2336                                    struct ldlm_enqueue_info *einfo)
2337 {
2338         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2339         int set = 0;
2340
2341         if (lock != NULL) {
2342                 set = osc_set_lock_data_with_check(lock, einfo);
2343                 LDLM_LOCK_PUT(lock);
2344         } else
2345                 CERROR("lockh %p, data %p - client evicted?\n",
2346                        lockh, einfo->ei_cbdata);
2347         return set;
2348 }
2349
2350 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2351                              ldlm_iterator_t replace, void *data)
2352 {
2353         struct ldlm_res_id res_id;
2354         struct obd_device *obd = class_exp2obd(exp);
2355
2356         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2357         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2358         return 0;
2359 }
2360
2361 /* find any ldlm lock of the inode in osc
2362  * return 0    not find
2363  *        1    find one
2364  *      < 0    error */
2365 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2366                            ldlm_iterator_t replace, void *data)
2367 {
2368         struct ldlm_res_id res_id;
2369         struct obd_device *obd = class_exp2obd(exp);
2370         int rc = 0;
2371
2372         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2373         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2374         if (rc == LDLM_ITER_STOP)
2375                 return(1);
2376         if (rc == LDLM_ITER_CONTINUE)
2377                 return(0);
2378         return(rc);
2379 }
2380
2381 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2382                             obd_enqueue_update_f upcall, void *cookie,
2383                             __u64 *flags, int agl, int rc)
2384 {
2385         int intent = *flags & LDLM_FL_HAS_INTENT;
2386         ENTRY;
2387
2388         if (intent) {
2389                 /* The request was created before ldlm_cli_enqueue call. */
2390                 if (rc == ELDLM_LOCK_ABORTED) {
2391                         struct ldlm_reply *rep;
2392                         rep = req_capsule_server_get(&req->rq_pill,
2393                                                      &RMF_DLM_REP);
2394
2395                         LASSERT(rep != NULL);
2396                         rep->lock_policy_res1 =
2397                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2398                         if (rep->lock_policy_res1)
2399                                 rc = rep->lock_policy_res1;
2400                 }
2401         }
2402
2403         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2404             (rc == 0)) {
2405                 *flags |= LDLM_FL_LVB_READY;
2406                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2407                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2408         }
2409
2410         /* Call the update callback. */
2411         rc = (*upcall)(cookie, rc);
2412         RETURN(rc);
2413 }
2414
2415 static int osc_enqueue_interpret(const struct lu_env *env,
2416                                  struct ptlrpc_request *req,
2417                                  struct osc_enqueue_args *aa, int rc)
2418 {
2419         struct ldlm_lock *lock;
2420         struct lustre_handle handle;
2421         __u32 mode;
2422         struct ost_lvb *lvb;
2423         __u32 lvb_len;
2424         __u64 *flags = aa->oa_flags;
2425
2426         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2427          * might be freed anytime after lock upcall has been called. */
2428         lustre_handle_copy(&handle, aa->oa_lockh);
2429         mode = aa->oa_ei->ei_mode;
2430
2431         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2432          * be valid. */
2433         lock = ldlm_handle2lock(&handle);
2434
2435         /* Take an additional reference so that a blocking AST that
2436          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2437          * to arrive after an upcall has been executed by
2438          * osc_enqueue_fini(). */
2439         ldlm_lock_addref(&handle, mode);
2440
2441         /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2442         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2443
2444         /* Let CP AST to grant the lock first. */
2445         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2446
2447         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2448                 lvb = NULL;
2449                 lvb_len = 0;
2450         } else {
2451                 lvb = aa->oa_lvb;
2452                 lvb_len = sizeof(*aa->oa_lvb);
2453         }
2454
2455         /* Complete obtaining the lock procedure. */
2456         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2457                                    mode, flags, lvb, lvb_len, &handle, rc);
2458         /* Complete osc stuff. */
2459         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2460                               flags, aa->oa_agl, rc);
2461
2462         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2463
2464         /* Release the lock for async request. */
2465         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2466                 /*
2467                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2468                  * not already released by
2469                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2470                  */
2471                 ldlm_lock_decref(&handle, mode);
2472
2473         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2474                  aa->oa_lockh, req, aa);
2475         ldlm_lock_decref(&handle, mode);
2476         LDLM_LOCK_PUT(lock);
2477         return rc;
2478 }
2479
2480 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2481                         struct lov_oinfo *loi, __u64 flags,
2482                         struct ost_lvb *lvb, __u32 mode, int rc)
2483 {
2484         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2485
2486         if (rc == ELDLM_OK) {
2487                 __u64 tmp;
2488
2489                 LASSERT(lock != NULL);
2490                 loi->loi_lvb = *lvb;
2491                 tmp = loi->loi_lvb.lvb_size;
2492                 /* Extend KMS up to the end of this lock and no further
2493                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2494                 if (tmp > lock->l_policy_data.l_extent.end)
2495                         tmp = lock->l_policy_data.l_extent.end + 1;
2496                 if (tmp >= loi->loi_kms) {
2497                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2498                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2499                         loi_kms_set(loi, tmp);
2500                 } else {
2501                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2502                                    LPU64"; leaving kms="LPU64", end="LPU64,
2503                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2504                                    lock->l_policy_data.l_extent.end);
2505                 }
2506                 ldlm_lock_allow_match(lock);
2507         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2508                 LASSERT(lock != NULL);
2509                 loi->loi_lvb = *lvb;
2510                 ldlm_lock_allow_match(lock);
2511                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2512                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2513                 rc = ELDLM_OK;
2514         }
2515
2516         if (lock != NULL) {
2517                 if (rc != ELDLM_OK)
2518                         ldlm_lock_fail_match(lock);
2519
2520                 LDLM_LOCK_PUT(lock);
2521         }
2522 }
2523 EXPORT_SYMBOL(osc_update_enqueue);
2524
/* Sentinel request-set value, never dereferenced: osc_enqueue_base()
 * compares its rqset argument against it and, on a match, hands the request
 * to ptlrpcd_add_req() instead of adding it to a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2526
2527 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2528  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2529  * other synchronous requests, however keeping some locks and trying to obtain
2530  * others may take a considerable amount of time in a case of ost failure; and
2531  * when other sync requests do not get released lock from a client, the client
2532  * is excluded from the cluster -- such scenarious make the life difficult, so
2533  * release locks just after they are obtained. */
2534 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2535                      __u64 *flags, ldlm_policy_data_t *policy,
2536                      struct ost_lvb *lvb, int kms_valid,
2537                      obd_enqueue_update_f upcall, void *cookie,
2538                      struct ldlm_enqueue_info *einfo,
2539                      struct lustre_handle *lockh,
2540                      struct ptlrpc_request_set *rqset, int async, int agl)
2541 {
2542         struct obd_device *obd = exp->exp_obd;
2543         struct ptlrpc_request *req = NULL;
2544         int intent = *flags & LDLM_FL_HAS_INTENT;
2545         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2546         ldlm_mode_t mode;
2547         int rc;
2548         ENTRY;
2549
2550         /* Filesystem lock extents are extended to page boundaries so that
2551          * dealing with the page cache is a little smoother.  */
2552         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2553         policy->l_extent.end |= ~CFS_PAGE_MASK;
2554
2555         /*
2556          * kms is not valid when either object is completely fresh (so that no
2557          * locks are cached), or object was evicted. In the latter case cached
2558          * lock cannot be used, because it would prime inode state with
2559          * potentially stale LVB.
2560          */
2561         if (!kms_valid)
2562                 goto no_match;
2563
2564         /* Next, search for already existing extent locks that will cover us */
2565         /* If we're trying to read, we also search for an existing PW lock.  The
2566          * VFS and page cache already protect us locally, so lots of readers/
2567          * writers can share a single PW lock.
2568          *
2569          * There are problems with conversion deadlocks, so instead of
2570          * converting a read lock to a write lock, we'll just enqueue a new
2571          * one.
2572          *
2573          * At some point we should cancel the read lock instead of making them
2574          * send us a blocking callback, but there are problems with canceling
2575          * locks out from other users right now, too. */
2576         mode = einfo->ei_mode;
2577         if (einfo->ei_mode == LCK_PR)
2578                 mode |= LCK_PW;
2579         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2580                                einfo->ei_type, policy, mode, lockh, 0);
2581         if (mode) {
2582                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2583
2584                 if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
2585                         /* For AGL, if enqueue RPC is sent but the lock is not
2586                          * granted, then skip to process this strpe.
2587                          * Return -ECANCELED to tell the caller. */
2588                         ldlm_lock_decref(lockh, mode);
2589                         LDLM_LOCK_PUT(matched);
2590                         RETURN(-ECANCELED);
2591                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2592                         *flags |= LDLM_FL_LVB_READY;
2593                         /* addref the lock only if not async requests and PW
2594                          * lock is matched whereas we asked for PR. */
2595                         if (!rqset && einfo->ei_mode != mode)
2596                                 ldlm_lock_addref(lockh, LCK_PR);
2597                         if (intent) {
2598                                 /* I would like to be able to ASSERT here that
2599                                  * rss <= kms, but I can't, for reasons which
2600                                  * are explained in lov_enqueue() */
2601                         }
2602
2603                         /* We already have a lock, and it's referenced.
2604                          *
2605                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2606                          * AGL upcall may change it to CLS_HELD directly. */
2607                         (*upcall)(cookie, ELDLM_OK);
2608
2609                         if (einfo->ei_mode != mode)
2610                                 ldlm_lock_decref(lockh, LCK_PW);
2611                         else if (rqset)
2612                                 /* For async requests, decref the lock. */
2613                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2614                         LDLM_LOCK_PUT(matched);
2615                         RETURN(ELDLM_OK);
2616                 } else {
2617                         ldlm_lock_decref(lockh, mode);
2618                         LDLM_LOCK_PUT(matched);
2619                 }
2620         }
2621
2622  no_match:
2623         if (intent) {
2624                 CFS_LIST_HEAD(cancels);
2625                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2626                                            &RQF_LDLM_ENQUEUE_LVB);
2627                 if (req == NULL)
2628                         RETURN(-ENOMEM);
2629
2630                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2631                 if (rc) {
2632                         ptlrpc_request_free(req);
2633                         RETURN(rc);
2634                 }
2635
2636                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2637                                      sizeof *lvb);
2638                 ptlrpc_request_set_replen(req);
2639         }
2640
2641         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2642         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2643
2644         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2645                               sizeof(*lvb), LVB_T_OST, lockh, async);
2646         if (rqset) {
2647                 if (!rc) {
2648                         struct osc_enqueue_args *aa;
2649                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2650                         aa = ptlrpc_req_async_args(req);
2651                         aa->oa_ei = einfo;
2652                         aa->oa_exp = exp;
2653                         aa->oa_flags  = flags;
2654                         aa->oa_upcall = upcall;
2655                         aa->oa_cookie = cookie;
2656                         aa->oa_lvb    = lvb;
2657                         aa->oa_lockh  = lockh;
2658                         aa->oa_agl    = !!agl;
2659
2660                         req->rq_interpret_reply =
2661                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2662                         if (rqset == PTLRPCD_SET)
2663                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2664                         else
2665                                 ptlrpc_set_add_req(rqset, req);
2666                 } else if (intent) {
2667                         ptlrpc_req_finished(req);
2668                 }
2669                 RETURN(rc);
2670         }
2671
2672         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2673         if (intent)
2674                 ptlrpc_req_finished(req);
2675
2676         RETURN(rc);
2677 }
2678
2679 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2680                        struct ldlm_enqueue_info *einfo,
2681                        struct ptlrpc_request_set *rqset)
2682 {
2683         struct ldlm_res_id res_id;
2684         int rc;
2685         ENTRY;
2686
2687         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2688         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2689                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2690                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2691                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2692                               rqset, rqset != NULL, 0);
2693         RETURN(rc);
2694 }
2695
2696 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2697                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2698                    __u64 *flags, void *data, struct lustre_handle *lockh,
2699                    int unref)
2700 {
2701         struct obd_device *obd = exp->exp_obd;
2702         __u64 lflags = *flags;
2703         ldlm_mode_t rc;
2704         ENTRY;
2705
2706         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2707                 RETURN(-EIO);
2708
2709         /* Filesystem lock extents are extended to page boundaries so that
2710          * dealing with the page cache is a little smoother */
2711         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2712         policy->l_extent.end |= ~CFS_PAGE_MASK;
2713
2714         /* Next, search for already existing extent locks that will cover us */
2715         /* If we're trying to read, we also search for an existing PW lock.  The
2716          * VFS and page cache already protect us locally, so lots of readers/
2717          * writers can share a single PW lock. */
2718         rc = mode;
2719         if (mode == LCK_PR)
2720                 rc |= LCK_PW;
2721         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2722                              res_id, type, policy, rc, lockh, unref);
2723         if (rc) {
2724                 if (data != NULL) {
2725                         if (!osc_set_data_with_check(lockh, data)) {
2726                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2727                                         ldlm_lock_decref(lockh, rc);
2728                                 RETURN(0);
2729                         }
2730                 }
2731                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2732                         ldlm_lock_addref(lockh, LCK_PR);
2733                         ldlm_lock_decref(lockh, LCK_PW);
2734                 }
2735                 RETURN(rc);
2736         }
2737         RETURN(rc);
2738 }
2739
2740 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2741 {
2742         ENTRY;
2743
2744         if (unlikely(mode == LCK_GROUP))
2745                 ldlm_lock_decref_and_cancel(lockh, mode);
2746         else
2747                 ldlm_lock_decref(lockh, mode);
2748
2749         RETURN(0);
2750 }
2751
2752 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2753                       __u32 mode, struct lustre_handle *lockh)
2754 {
2755         ENTRY;
2756         RETURN(osc_cancel_base(lockh, mode));
2757 }
2758
2759 static int osc_cancel_unused(struct obd_export *exp,
2760                              struct lov_stripe_md *lsm,
2761                              ldlm_cancel_flags_t flags,
2762                              void *opaque)
2763 {
2764         struct obd_device *obd = class_exp2obd(exp);
2765         struct ldlm_res_id res_id, *resp = NULL;
2766
2767         if (lsm != NULL) {
2768                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2769                 resp = &res_id;
2770         }
2771
2772         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2773 }
2774
2775 static int osc_statfs_interpret(const struct lu_env *env,
2776                                 struct ptlrpc_request *req,
2777                                 struct osc_async_args *aa, int rc)
2778 {
2779         struct obd_statfs *msfs;
2780         ENTRY;
2781
2782         if (rc == -EBADR)
2783                 /* The request has in fact never been sent
2784                  * due to issues at a higher level (LOV).
2785                  * Exit immediately since the caller is
2786                  * aware of the problem and takes care
2787                  * of the clean up */
2788                  RETURN(rc);
2789
2790         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2791             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2792                 GOTO(out, rc = 0);
2793
2794         if (rc != 0)
2795                 GOTO(out, rc);
2796
2797         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2798         if (msfs == NULL) {
2799                 GOTO(out, rc = -EPROTO);
2800         }
2801
2802         *aa->aa_oi->oi_osfs = *msfs;
2803 out:
2804         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2805         RETURN(rc);
2806 }
2807
2808 static int osc_statfs_async(struct obd_export *exp,
2809                             struct obd_info *oinfo, __u64 max_age,
2810                             struct ptlrpc_request_set *rqset)
2811 {
2812         struct obd_device     *obd = class_exp2obd(exp);
2813         struct ptlrpc_request *req;
2814         struct osc_async_args *aa;
2815         int                    rc;
2816         ENTRY;
2817
2818         /* We could possibly pass max_age in the request (as an absolute
2819          * timestamp or a "seconds.usec ago") so the target can avoid doing
2820          * extra calls into the filesystem if that isn't necessary (e.g.
2821          * during mount that would help a bit).  Having relative timestamps
2822          * is not so great if request processing is slow, while absolute
2823          * timestamps are not ideal because they need time synchronization. */
2824         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2825         if (req == NULL)
2826                 RETURN(-ENOMEM);
2827
2828         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2829         if (rc) {
2830                 ptlrpc_request_free(req);
2831                 RETURN(rc);
2832         }
2833         ptlrpc_request_set_replen(req);
2834         req->rq_request_portal = OST_CREATE_PORTAL;
2835         ptlrpc_at_set_req_timeout(req);
2836
2837         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2838                 /* procfs requests not want stat in wait for avoid deadlock */
2839                 req->rq_no_resend = 1;
2840                 req->rq_no_delay = 1;
2841         }
2842
2843         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2844         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2845         aa = ptlrpc_req_async_args(req);
2846         aa->aa_oi = oinfo;
2847
2848         ptlrpc_set_add_req(rqset, req);
2849         RETURN(0);
2850 }
2851
2852 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2853                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2854 {
2855         struct obd_device     *obd = class_exp2obd(exp);
2856         struct obd_statfs     *msfs;
2857         struct ptlrpc_request *req;
2858         struct obd_import     *imp = NULL;
2859         int rc;
2860         ENTRY;
2861
2862         /*Since the request might also come from lprocfs, so we need
2863          *sync this with client_disconnect_export Bug15684*/
2864         down_read(&obd->u.cli.cl_sem);
2865         if (obd->u.cli.cl_import)
2866                 imp = class_import_get(obd->u.cli.cl_import);
2867         up_read(&obd->u.cli.cl_sem);
2868         if (!imp)
2869                 RETURN(-ENODEV);
2870
2871         /* We could possibly pass max_age in the request (as an absolute
2872          * timestamp or a "seconds.usec ago") so the target can avoid doing
2873          * extra calls into the filesystem if that isn't necessary (e.g.
2874          * during mount that would help a bit).  Having relative timestamps
2875          * is not so great if request processing is slow, while absolute
2876          * timestamps are not ideal because they need time synchronization. */
2877         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2878
2879         class_import_put(imp);
2880
2881         if (req == NULL)
2882                 RETURN(-ENOMEM);
2883
2884         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2885         if (rc) {
2886                 ptlrpc_request_free(req);
2887                 RETURN(rc);
2888         }
2889         ptlrpc_request_set_replen(req);
2890         req->rq_request_portal = OST_CREATE_PORTAL;
2891         ptlrpc_at_set_req_timeout(req);
2892
2893         if (flags & OBD_STATFS_NODELAY) {
2894                 /* procfs requests not want stat in wait for avoid deadlock */
2895                 req->rq_no_resend = 1;
2896                 req->rq_no_delay = 1;
2897         }
2898
2899         rc = ptlrpc_queue_wait(req);
2900         if (rc)
2901                 GOTO(out, rc);
2902
2903         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2904         if (msfs == NULL) {
2905                 GOTO(out, rc = -EPROTO);
2906         }
2907
2908         *osfs = *msfs;
2909
2910         EXIT;
2911  out:
2912         ptlrpc_req_finished(req);
2913         return rc;
2914 }
2915
2916 /* Retrieve object striping information.
2917  *
2918  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2919  * the maximum number of OST indices which will fit in the user buffer.
2920  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2921  */
2922 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2923 {
2924         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2925         struct lov_user_md_v3 lum, *lumk;
2926         struct lov_user_ost_data_v1 *lmm_objects;
2927         int rc = 0, lum_size;
2928         ENTRY;
2929
2930         if (!lsm)
2931                 RETURN(-ENODATA);
2932
2933         /* we only need the header part from user space to get lmm_magic and
2934          * lmm_stripe_count, (the header part is common to v1 and v3) */
2935         lum_size = sizeof(struct lov_user_md_v1);
2936         if (copy_from_user(&lum, lump, lum_size))
2937                 RETURN(-EFAULT);
2938
2939         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2940             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2941                 RETURN(-EINVAL);
2942
2943         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2944         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2945         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2946         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2947
2948         /* we can use lov_mds_md_size() to compute lum_size
2949          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2950         if (lum.lmm_stripe_count > 0) {
2951                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2952                 OBD_ALLOC(lumk, lum_size);
2953                 if (!lumk)
2954                         RETURN(-ENOMEM);
2955
2956                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2957                         lmm_objects =
2958                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2959                 else
2960                         lmm_objects = &(lumk->lmm_objects[0]);
2961                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2962         } else {
2963                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2964                 lumk = &lum;
2965         }
2966
2967         lumk->lmm_oi = lsm->lsm_oi;
2968         lumk->lmm_stripe_count = 1;
2969
2970         if (copy_to_user(lump, lumk, lum_size))
2971                 rc = -EFAULT;
2972
2973         if (lumk != &lum)
2974                 OBD_FREE(lumk, lum_size);
2975
2976         RETURN(rc);
2977 }
2978
2979
2980 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2981                          void *karg, void *uarg)
2982 {
2983         struct obd_device *obd = exp->exp_obd;
2984         struct obd_ioctl_data *data = karg;
2985         int err = 0;
2986         ENTRY;
2987
2988         if (!try_module_get(THIS_MODULE)) {
2989                 CERROR("Can't get module. Is it alive?");
2990                 return -EINVAL;
2991         }
2992         switch (cmd) {
2993         case OBD_IOC_LOV_GET_CONFIG: {
2994                 char *buf;
2995                 struct lov_desc *desc;
2996                 struct obd_uuid uuid;
2997
2998                 buf = NULL;
2999                 len = 0;
3000                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3001                         GOTO(out, err = -EINVAL);
3002
3003                 data = (struct obd_ioctl_data *)buf;
3004
3005                 if (sizeof(*desc) > data->ioc_inllen1) {
3006                         obd_ioctl_freedata(buf, len);
3007                         GOTO(out, err = -EINVAL);
3008                 }
3009
3010                 if (data->ioc_inllen2 < sizeof(uuid)) {
3011                         obd_ioctl_freedata(buf, len);
3012                         GOTO(out, err = -EINVAL);
3013                 }
3014
3015                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3016                 desc->ld_tgt_count = 1;
3017                 desc->ld_active_tgt_count = 1;
3018                 desc->ld_default_stripe_count = 1;
3019                 desc->ld_default_stripe_size = 0;
3020                 desc->ld_default_stripe_offset = 0;
3021                 desc->ld_pattern = 0;
3022                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3023
3024                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3025
3026                 err = copy_to_user((void *)uarg, buf, len);
3027                 if (err)
3028                         err = -EFAULT;
3029                 obd_ioctl_freedata(buf, len);
3030                 GOTO(out, err);
3031         }
3032         case LL_IOC_LOV_SETSTRIPE:
3033                 err = obd_alloc_memmd(exp, karg);
3034                 if (err > 0)
3035                         err = 0;
3036                 GOTO(out, err);
3037         case LL_IOC_LOV_GETSTRIPE:
3038                 err = osc_getstripe(karg, uarg);
3039                 GOTO(out, err);
3040         case OBD_IOC_CLIENT_RECOVER:
3041                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3042                                             data->ioc_inlbuf1, 0);
3043                 if (err > 0)
3044                         err = 0;
3045                 GOTO(out, err);
3046         case IOC_OSC_SET_ACTIVE:
3047                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3048                                                data->ioc_offset);
3049                 GOTO(out, err);
3050         case OBD_IOC_POLL_QUOTACHECK:
3051                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3052                 GOTO(out, err);
3053         case OBD_IOC_PING_TARGET:
3054                 err = ptlrpc_obd_ping(obd);
3055                 GOTO(out, err);
3056         default:
3057                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3058                        cmd, current_comm());
3059                 GOTO(out, err = -ENOTTY);
3060         }
3061 out:
3062         module_put(THIS_MODULE);
3063         return err;
3064 }
3065
3066 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3067                         obd_count keylen, void *key, __u32 *vallen, void *val,
3068                         struct lov_stripe_md *lsm)
3069 {
3070         ENTRY;
3071         if (!vallen || !val)
3072                 RETURN(-EFAULT);
3073
3074         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3075                 __u32 *stripe = val;
3076                 *vallen = sizeof(*stripe);
3077                 *stripe = 0;
3078                 RETURN(0);
3079         } else if (KEY_IS(KEY_LAST_ID)) {
3080                 struct ptlrpc_request *req;
3081                 obd_id                *reply;
3082                 char                  *tmp;
3083                 int                    rc;
3084
3085                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3086                                            &RQF_OST_GET_INFO_LAST_ID);
3087                 if (req == NULL)
3088                         RETURN(-ENOMEM);
3089
3090                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3091                                      RCL_CLIENT, keylen);
3092                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3093                 if (rc) {
3094                         ptlrpc_request_free(req);
3095                         RETURN(rc);
3096                 }
3097
3098                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3099                 memcpy(tmp, key, keylen);
3100
3101                 req->rq_no_delay = req->rq_no_resend = 1;
3102                 ptlrpc_request_set_replen(req);
3103                 rc = ptlrpc_queue_wait(req);
3104                 if (rc)
3105                         GOTO(out, rc);
3106
3107                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3108                 if (reply == NULL)
3109                         GOTO(out, rc = -EPROTO);
3110
3111                 *((obd_id *)val) = *reply;
3112         out:
3113                 ptlrpc_req_finished(req);
3114                 RETURN(rc);
3115         } else if (KEY_IS(KEY_FIEMAP)) {
3116                 struct ll_fiemap_info_key *fm_key =
3117                                 (struct ll_fiemap_info_key *)key;
3118                 struct ldlm_res_id       res_id;
3119                 ldlm_policy_data_t       policy;
3120                 struct lustre_handle     lockh;
3121                 ldlm_mode_t              mode = 0;
3122                 struct ptlrpc_request   *req;
3123                 struct ll_user_fiemap   *reply;
3124                 char                    *tmp;
3125                 int                      rc;
3126
3127                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3128                         goto skip_locking;
3129
3130                 policy.l_extent.start = fm_key->fiemap.fm_start &
3131                                                 CFS_PAGE_MASK;
3132
3133                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3134                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3135                         policy.l_extent.end = OBD_OBJECT_EOF;
3136                 else
3137                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3138                                 fm_key->fiemap.fm_length +
3139                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3140
3141                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3142                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3143                                        LDLM_FL_BLOCK_GRANTED |
3144                                        LDLM_FL_LVB_READY,
3145                                        &res_id, LDLM_EXTENT, &policy,
3146                                        LCK_PR | LCK_PW, &lockh, 0);
3147                 if (mode) { /* lock is cached on client */
3148                         if (mode != LCK_PR) {
3149                                 ldlm_lock_addref(&lockh, LCK_PR);
3150                                 ldlm_lock_decref(&lockh, LCK_PW);
3151                         }
3152                 } else { /* no cached lock, needs acquire lock on server side */
3153                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3154                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3155                 }
3156
3157 skip_locking:
3158                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3159                                            &RQF_OST_GET_INFO_FIEMAP);