Whamcloud - gitweb
LU-3187 ost: check pre 2.4 echo client in obdo validation
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd(): with @lsmp == NULL only the in-memory lsm size is
 * returned; with *@lsmp set and @lmm == NULL the lsm is freed; otherwise an
 * lsm is allocated (or reused) and filled from the on-disk image in @lmm.
 * Returns the lsm size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        struct obd_import *imp = class_exp2cliimp(exp);
        ENTRY;

        if (lmm != NULL) {
                /* Validate the incoming buffer before touching it. */
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("%s: lov_mds_md too small: %d, need %d\n",
                               exp->exp_obd->obd_name, lmm_bytes,
                               (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                /* Object id zero is never valid on an OST. */
                if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
                        CERROR("%s: zero lmm_object_id: rc = %d\n",
                               exp->exp_obd->obd_name, -EINVAL);
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        /* Free request: existing lsm with no source data. */
        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (unlikely(*lsmp == NULL))
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
                        /* Undo the first allocation on partial failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
                RETURN(-EBADF);
        }

        if (lmm != NULL)
                /* XXX zero *lsmp? */
                ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

        /* Take the object size limit from the server when it advertises
         * OBD_CONNECT_MAXBYTES; otherwise use the historical default. */
        if (imp != NULL &&
            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
        else
                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
188                              oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
217                                      aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
/* Queue an asynchronous OST_GETATTR on @set; oinfo->oi_cb_up() is invoked
 * from osc_getattr_interpret() once the reply (or an error) arrives. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Shrink the capa field when no capability is attached. */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* Stash the caller's oinfo in the request's async-args area. */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
263
/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        /* Block size is a client-side notion; fill it in locally. */
        oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
307
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and refresh oi_oa from the server's reply. */
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obd_info *oinfo, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* The object's group must always be set for setattr. */
        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
                             &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
349
/* Reply handler shared by async setattr and punch: copy the server's obdo
 * back into sa->sa_oa, then run the caller's upcall with the final rc. */
static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        /* The upcall runs even on error so the caller can clean up. */
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}
370
/* Build and send an OST_SETATTR request without waiting for the reply.
 *
 * @upcall/@cookie are invoked from osc_setattr_interpret() on completion.
 * With @rqset == NULL the request is fired through ptlrpcd and the reply is
 * ignored entirely; with @rqset == PTLRPCD_SET it is handed to ptlrpcd with
 * reply interpretation; otherwise it is added to the caller's set. */
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Carry the llog cookie along so the OST can cancel the record. */
        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
/* Synchronously create one object on the OST.
 *
 * On success the new object id is stored into *@ea (allocating a stripe MD
 * first when the caller passed *@ea == NULL) and, when @oti is given, the
 * reply's transno and any llog cookie are saved for recovery. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        /* Block size is a client-side notion; fill it in locally. */
        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_oi = oa->o_oi;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm when it was allocated here (*ea still unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
514
/* Send an asynchronous OST_PUNCH (truncate).  The extent to punch has
 * already been encoded in oi_oa's size/blocks fields by the caller;
 * @upcall/@cookie run from osc_setattr_interpret() when the reply arrives. */
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        /* Punch shares its completion path with setattr. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
559
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561                      struct obd_info *oinfo, struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
571 static int osc_sync_interpret(const struct lu_env *env,
572                               struct ptlrpc_request *req,
573                               void *arg, int rc)
574 {
575         struct osc_fsync_args *fa = arg;
576         struct ost_body *body;
577         ENTRY;
578
579         if (rc)
580                 GOTO(out, rc);
581
582         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
583         if (body == NULL) {
584                 CERROR ("can't unpack ost_body\n");
585                 GOTO(out, rc = -EPROTO);
586         }
587
588         *fa->fa_oi->oi_oa = body->oa;
589 out:
590         rc = fa->fa_upcall(fa->fa_cookie, rc);
591         RETURN(rc);
592 }
593
/* Send an asynchronous OST_SYNC for the byte range the caller encoded into
 * oi_oa's size/blocks fields.  @upcall/@cookie run from
 * osc_sync_interpret() when the reply arrives. */
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_fsync_args *fa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        /* Stash completion context in the request's async-args area. */
        CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
        fa = ptlrpc_req_async_args(req);
        fa->fa_oi = oinfo;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN (0);
}
638
639 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
640                     struct obd_info *oinfo, obd_size start, obd_size end,
641                     struct ptlrpc_request_set *set)
642 {
643         ENTRY;
644
645         if (!oinfo->oi_oa) {
646                 CDEBUG(D_INFO, "oa NULL\n");
647                 RETURN(-EINVAL);
648         }
649
650         oinfo->oi_oa->o_size = start;
651         oinfo->oi_oa->o_blocks = end;
652         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
653
654         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
655 }
656
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661                                    cfs_list_t *cancels,
662                                    ldlm_mode_t mode, int lock_flags)
663 {
664         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665         struct ldlm_res_id res_id;
666         struct ldlm_resource *res;
667         int count;
668         ENTRY;
669
670         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
671          * export) but disabled through procfs (flag in NS).
672          *
673          * This distinguishes from a case when ELC is not supported originally,
674          * when we still want to cancel locks in advance and just cancel them
675          * locally, without sending any RPC. */
676         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
677                 RETURN(0);
678
679         ostid_build_res_name(&oa->o_oi, &res_id);
680         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
681         if (res == NULL)
682                 RETURN(0);
683
684         LDLM_RESOURCE_ADDREF(res);
685         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
686                                            lock_flags, 0, NULL);
687         LDLM_RESOURCE_DELREF(res);
688         ldlm_resource_putref(res);
689         RETURN(count);
690 }
691
/* Completion handler for throttled destroy RPCs: drop the in-flight count
 * and wake any sender blocked in osc_can_send_destroy(). */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
702
/* Throttle destroy RPCs to cl_max_rpcs_in_flight.
 *
 * Returns 1 when the caller may send (the in-flight counter has already
 * been incremented on its behalf), 0 when it must wait on
 * cl_destroy_waitq.  Lock-free: a speculative inc is rolled back with a
 * dec when the limit is exceeded. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
720
721 int osc_create(const struct lu_env *env, struct obd_export *exp,
722                struct obdo *oa, struct lov_stripe_md **ea,
723                struct obd_trans_info *oti)
724 {
725         int rc = 0;
726         ENTRY;
727
728         LASSERT(oa);
729         LASSERT(ea);
730         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
731
732         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
733             oa->o_flags == OBD_FL_RECREATE_OBJS) {
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735         }
736
737         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
738                 RETURN(osc_real_create(exp, oa, ea, oti));
739
740         /* we should not get here anymore */
741         LBUG();
742
743         RETURN(rc);
744 }
745
746 /* Destroy requests can be async always on the client, and we don't even really
747  * care about the return code since the client cannot do anything at all about
748  * a destroy failure.
749  * When the MDS is unlinking a filename, it saves the file objects into a
750  * recovery llog, and these object records are cancelled when the OST reports
751  * they were destroyed and sync'd to disk (i.e. transaction committed).
752  * If the client dies, or the OST is down when the object should be destroyed,
753  * the records are not cancelled, and when the OST reconnects to the MDS next,
754  * it will retrieve the llog unlink logs and then sends the log cancellation
755  * cookies to the MDS after committing destroy transactions. */
756 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
757                        struct obdo *oa, struct lov_stripe_md *ea,
758                        struct obd_trans_info *oti, struct obd_export *md_export,
759                        void *capa)
760 {
761         struct client_obd     *cli = &exp->exp_obd->u.cli;
762         struct ptlrpc_request *req;
763         struct ost_body       *body;
764         CFS_LIST_HEAD(cancels);
765         int rc, count;
766         ENTRY;
767
768         if (!oa) {
769                 CDEBUG(D_INFO, "oa NULL\n");
770                 RETURN(-EINVAL);
771         }
772
773         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
774                                         LDLM_FL_DISCARD_DATA);
775
776         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
777         if (req == NULL) {
778                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779                 RETURN(-ENOMEM);
780         }
781
782         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
783         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
784                                0, &cancels, count);
785         if (rc) {
786                 ptlrpc_request_free(req);
787                 RETURN(rc);
788         }
789
790         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
791         ptlrpc_at_set_req_timeout(req);
792
793         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
794                 oa->o_lcookie = *oti->oti_logcookies;
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
798
799         osc_pack_capa(req, body, (struct obd_capa *)capa);
800         ptlrpc_request_set_replen(req);
801
802         /* If osc_destory is for destroying the unlink orphan,
803          * sent from MDT to OST, which should not be blocked here,
804          * because the process might be triggered by ptlrpcd, and
805          * it is not good to block ptlrpcd thread (b=16006)*/
806         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
807                 req->rq_interpret_reply = osc_destroy_interpret;
808                 if (!osc_can_send_destroy(cli)) {
809                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810                                                           NULL);
811
812                         /*
813                          * Wait until the number of on-going destroy RPCs drops
814                          * under max_rpc_in_flight
815                          */
816                         l_wait_event_exclusive(cli->cl_destroy_waitq,
817                                                osc_can_send_destroy(cli), &lwi);
818                 }
819         }
820
821         /* Do not wait for response */
822         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
823         RETURN(0);
824 }
825
/* Fill @oa's dirty/undirty/grant/dropped fields under the list lock so each
 * RPC keeps the OST informed of this client's cache and grant state.
 * NOTE(review): @writing_bytes is unused in this function — confirm whether
 * callers still need the parameter. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have pre-set the fields we own here. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
                     cli->cl_dirty_max)) {
                /* Accounting is inconsistent; claim no headroom. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
                            cfs_atomic_read(&obd_dirty_transit_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Advertise how much more this client might still dirty. */
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      CFS_PAGE_SHIFT)*
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
870
871 void osc_update_next_shrink(struct client_obd *cli)
872 {
873         cli->cl_next_shrink_grant =
874                 cfs_time_shift(cli->cl_grant_shrink_interval);
875         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
876                cli->cl_next_shrink_grant);
877 }
878
/* Return @grant bytes to the locally available grant pool, serialized
 * by cl_loi_list_lock.  Used both when the server hands back extra grant
 * and to undo a local grant deduction after a failed shrink RPC. */
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
885
886 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
887 {
888         if (body->oa.o_valid & OBD_MD_FLGRANT) {
889                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
890                 __osc_update_grant(cli, body->oa.o_grant);
891         }
892 }
893
894 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
895                               obd_count keylen, void *key, obd_count vallen,
896                               void *val, struct ptlrpc_request_set *set);
897
/* Reply handler for a grant-shrink set_info RPC.  On failure the grant
 * that was locally deducted (stashed in aa->aa_oa->o_grant) is returned
 * to the available pool; on success any server-returned grant in the
 * reply body is absorbed.  Owns and frees the obdo in either case. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* Shrink failed: give the deducted grant back locally. */
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        /* The obdo was allocated by the sender; freed here exactly once. */
        OBDO_FREE(oa);
        return rc;
}
918
919 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
920 {
921         client_obd_list_lock(&cli->cl_loi_list_lock);
922         oa->o_grant = cli->cl_avail_grant / 4;
923         cli->cl_avail_grant -= oa->o_grant;
924         client_obd_list_unlock(&cli->cl_loi_list_lock);
925         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
926                 oa->o_valid |= OBD_MD_FLFLAGS;
927                 oa->o_flags = 0;
928         }
929         oa->o_flags |= OBD_FL_SHRINK_GRANT;
930         osc_update_next_shrink(cli);
931 }
932
933 /* Shrink the current grant, either from some large amount to enough for a
934  * full set of in-flight RPCs, or if we have already shrunk to that limit
935  * then to enough for a single RPC.  This avoids keeping more grant than
936  * needed, and avoids shrinking the grant piecemeal. */
937 static int osc_shrink_grant(struct client_obd *cli)
938 {
939         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
940                              (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
941
942         client_obd_list_lock(&cli->cl_loi_list_lock);
943         if (cli->cl_avail_grant <= target_bytes)
944                 target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
945         client_obd_list_unlock(&cli->cl_loi_list_lock);
946
947         return osc_shrink_grant_to_target(cli, target_bytes);
948 }
949
/* Shrink the locally held grant down to @target_bytes by sending the
 * difference back to the server with a KEY_GRANT_SHRINK set_info RPC.
 * The target is clamped so we never shrink below a single RPC's worth;
 * if we already hold no more than the target, nothing is sent.
 * On RPC submission failure the locally deducted grant is restored.
 * Returns 0 on success or a negative errno. */
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int                     rc = 0;
        struct ost_body        *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        /* Re-take the lock: cl_avail_grant may have moved since the check
         * above, so the delta is computed from its current value. */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                /* RPC never went out: return the deducted grant locally. */
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
994
995 static int osc_should_shrink_grant(struct client_obd *client)
996 {
997         cfs_time_t time = cfs_time_current();
998         cfs_time_t next_shrink = client->cl_next_shrink_grant;
999
1000         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1001              OBD_CONNECT_GRANT_SHRINK) == 0)
1002                 return 0;
1003
1004         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1005                 /* Get the current RPC size directly, instead of going via:
1006                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1007                  * Keep comment here so that it can be found by searching. */
1008                 int brw_size = client->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1009
1010                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1011                     client->cl_avail_grant > brw_size)
1012                         return 1;
1013                 else
1014                         osc_update_next_shrink(client);
1015         }
1016         return 0;
1017 }
1018
1019 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1020 {
1021         struct client_obd *client;
1022
1023         cfs_list_for_each_entry(client, &item->ti_obd_list,
1024                                 cl_grant_shrink_list) {
1025                 if (osc_should_shrink_grant(client))
1026                         osc_shrink_grant(client);
1027         }
1028         return 0;
1029 }
1030
1031 static int osc_add_shrink_grant(struct client_obd *client)
1032 {
1033         int rc;
1034
1035         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1036                                        TIMEOUT_GRANT,
1037                                        osc_grant_shrink_grant_cb, NULL,
1038                                        &client->cl_grant_shrink_list);
1039         if (rc) {
1040                 CERROR("add grant client %s error %d\n",
1041                         client->cl_import->imp_obd->obd_name, rc);
1042                 return rc;
1043         }
1044         CDEBUG(D_CACHE, "add grant client %s \n",
1045                client->cl_import->imp_obd->obd_name);
1046         osc_update_next_shrink(client);
1047         return 0;
1048 }
1049
/* Unregister this client from the periodic grant-shrink timer.
 * Counterpart of osc_add_shrink_grant(). */
static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}
1055
/* Initialize this client's grant state from the server's connect reply:
 * set cl_avail_grant from ocd_grant (accounting for dirty pages unless we
 * were evicted), derive the extent chunk size from the server block size,
 * and register for periodic grant shrinking if the server supports it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty will drop
         * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
                      ocd->ocd_grant, cli->cl_dirty);
                /* workaround for servers which do not have the patch from
                 * LU-2679 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        /* determine the appropriate chunk size used by osc_extent. */
        cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
                "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

        /* Register with the shrink timer only once per client. */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}
1093
1094 /* We assume that the reason this OSC got a short read is because it read
1095  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1096  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1097  * this stripe never got written at or beyond this stripe offset yet. */
1098 static void handle_short_read(int nob_read, obd_count page_count,
1099                               struct brw_page **pga)
1100 {
1101         char *ptr;
1102         int i = 0;
1103
1104         /* skip bytes read OK */
1105         while (nob_read > 0) {
1106                 LASSERT (page_count > 0);
1107
1108                 if (pga[i]->count > nob_read) {
1109                         /* EOF inside this page */
1110                         ptr = cfs_kmap(pga[i]->pg) +
1111                                 (pga[i]->off & ~CFS_PAGE_MASK);
1112                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1113                         cfs_kunmap(pga[i]->pg);
1114                         page_count--;
1115                         i++;
1116                         break;
1117                 }
1118
1119                 nob_read -= pga[i]->count;
1120                 page_count--;
1121                 i++;
1122         }
1123
1124         /* zero remaining pages */
1125         while (page_count-- > 0) {
1126                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1127                 memset(ptr, 0, pga[i]->count);
1128                 cfs_kunmap(pga[i]->pg);
1129                 i++;
1130         }
1131 }
1132
1133 static int check_write_rcs(struct ptlrpc_request *req,
1134                            int requested_nob, int niocount,
1135                            obd_count page_count, struct brw_page **pga)
1136 {
1137         int     i;
1138         __u32   *remote_rcs;
1139
1140         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1141                                                   sizeof(*remote_rcs) *
1142                                                   niocount);
1143         if (remote_rcs == NULL) {
1144                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1145                 return(-EPROTO);
1146         }
1147
1148         /* return error if any niobuf was in error */
1149         for (i = 0; i < niocount; i++) {
1150                 if ((int)remote_rcs[i] < 0)
1151                         return(remote_rcs[i]);
1152
1153                 if (remote_rcs[i] != 0) {
1154                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1155                                 i, remote_rcs[i], req);
1156                         return(-EPROTO);
1157                 }
1158         }
1159
1160         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1161                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1162                        req->rq_bulk->bd_nob_transferred, requested_nob);
1163                 return(-EPROTO);
1164         }
1165
1166         return (0);
1167 }
1168
1169 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1170 {
1171         if (p1->flag != p2->flag) {
1172                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1173                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1174
1175                 /* warn if we try to combine flags that we don't know to be
1176                  * safe to combine */
1177                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1178                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1179                               "report this at http://bugs.whamcloud.com/\n",
1180                               p1->flag, p2->flag);
1181                 }
1182                 return 0;
1183         }
1184
1185         return (p1->off + p1->count == p2->off);
1186 }
1187
/* Compute the bulk checksum over the first @nob bytes of the page array
 * using the algorithm selected by @cksum_type.  Supports two fault
 * injections: OBD_FAIL_OSC_CHECKSUM_RECEIVE corrupts the first read page
 * before hashing (simulating OST->client corruption), and
 * OBD_FAIL_OSC_CHECKSUM_SEND perturbs the computed checksum on writes
 * without touching the data, so a resend still carries correct bytes.
 * NOTE(review): on hash-init failure this returns PTR_ERR() through an
 * unsigned obd_count return type — callers appear to rely on it never
 * failing in practice; confirm before changing. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32                           cksum;
        int                             i = 0;
        struct cfs_crypto_hash_desc     *hdesc;
        unsigned int                    bufsize;
        int                             err;
        unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(hdesc)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(hdesc);
        }

        while (nob > 0 && pg_count > 0) {
                /* Only hash up to nob bytes of the final partial page. */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = cfs_kmap(pga[i]->pg);
                        int off = pga[i]->off & ~CFS_PAGE_MASK;
                        memcpy(ptr + off, "bad1", min(4, nob));
                        cfs_kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
                                  pga[i]->off & ~CFS_PAGE_MASK,
                                  count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~CFS_PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = 4;
        err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

        /* On error, call final again with NULL just to release hdesc. */
        if (err)
                cfs_crypto_hash_final(hdesc, NULL, NULL);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1244
/* Build (but do not send) a bulk read or write RPC for the given page
 * array: allocates the request (writes come from the pre-allocated pool
 * so cached dirty data can always be flushed), merges contiguous pages
 * into remote niobufs, packs the obdo/ioobj/capa, announces cached-dirty
 * state, optionally piggybacks a grant shrink, and computes the bulk
 * checksum when enabled.  On success *reqp holds the prepared request and
 * its async-args point at @oa and @pga (caller retains ownership of both).
 * Returns 0 or a negative errno; -ENOMEM paths are recoverable. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* Writes allocate from the RPC pool so flushing dirty
                 * pages cannot fail for lack of memory. */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Count remote niobufs: a new one starts wherever two adjacent
         * pages cannot be merged. */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
                OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request.  The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so the
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~CFS_PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* SRVLOCK must be uniform across the whole request. */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* Extend the previous niobuf instead of starting
                         * a new one. */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* Sanity: we consumed exactly the niocount buffers we reserved. */
        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                /* Tell the server this is a recovery resend so it can
                 * tolerate a replayed transaction. */
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1452
/* Diagnose a write checksum mismatch reported by the server.  Recomputes
 * the checksum over the still-pinned pages with the server's algorithm
 * and, by comparing the three values (original client, server, recomputed
 * client), classifies where the corruption most likely occurred before
 * logging a console error.  Returns 0 if the checksums actually match,
 * 1 if a genuine mismatch was confirmed (caller then retries the write). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* Re-hash with the algorithm the server says it used, which may
         * differ from the one we originally requested. */
        cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                       oa->o_flags : 0);
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
                           " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1497
1498 /* Note rc enters this function as number of bytes transferred */
1499 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1500 {
1501         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1502         const lnet_process_id_t *peer =
1503                         &req->rq_import->imp_connection->c_peer;
1504         struct client_obd *cli = aa->aa_cli;
1505         struct ost_body *body;
1506         __u32 client_cksum = 0;
1507         ENTRY;
1508
1509         if (rc < 0 && rc != -EDQUOT) {
1510                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1511                 RETURN(rc);
1512         }
1513
1514         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1515         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1516         if (body == NULL) {
1517                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1518                 RETURN(-EPROTO);
1519         }
1520
1521         /* set/clear over quota flag for a uid/gid */
1522         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1523             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1524                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1525
1526                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1527                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1528                        body->oa.o_flags);
1529                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1530         }
1531
1532         osc_update_grant(cli, body);
1533
1534         if (rc < 0)
1535                 RETURN(rc);
1536
1537         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1538                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1539
1540         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1541                 if (rc > 0) {
1542                         CERROR("Unexpected +ve rc %d\n", rc);
1543                         RETURN(-EPROTO);
1544                 }
1545                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1546
1547                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1548                         RETURN(-EAGAIN);
1549
1550                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1551                     check_write_checksum(&body->oa, peer, client_cksum,
1552                                          body->oa.o_cksum, aa->aa_requested_nob,
1553                                          aa->aa_page_count, aa->aa_ppga,
1554                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1555                         RETURN(-EAGAIN);
1556
1557                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1558                                      aa->aa_page_count, aa->aa_ppga);
1559                 GOTO(out, rc);
1560         }
1561
1562         /* The rest of this function executes only for OST_READs */
1563
1564         /* if unwrap_bulk failed, return -EAGAIN to retry */
1565         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1566         if (rc < 0)
1567                 GOTO(out, rc = -EAGAIN);
1568
1569         if (rc > aa->aa_requested_nob) {
1570                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1571                        aa->aa_requested_nob);
1572                 RETURN(-EPROTO);
1573         }
1574
1575         if (rc != req->rq_bulk->bd_nob_transferred) {
1576                 CERROR ("Unexpected rc %d (%d transferred)\n",
1577                         rc, req->rq_bulk->bd_nob_transferred);
1578                 return (-EPROTO);
1579         }
1580
1581         if (rc < aa->aa_requested_nob)
1582                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1583
1584         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1585                 static int cksum_counter;
1586                 __u32      server_cksum = body->oa.o_cksum;
1587                 char      *via;
1588                 char      *router;
1589                 cksum_type_t cksum_type;
1590
1591                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1592                                                body->oa.o_flags : 0);
1593                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1594                                                  aa->aa_ppga, OST_READ,
1595                                                  cksum_type);
1596
1597                 if (peer->nid == req->rq_bulk->bd_sender) {
1598                         via = router = "";
1599                 } else {
1600                         via = " via ";
1601                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1602                 }
1603
1604                 if (server_cksum == ~0 && rc > 0) {
1605                         CERROR("Protocol error: server %s set the 'checksum' "
1606                                "bit, but didn't send a checksum.  Not fatal, "
1607                                "but please notify on http://bugs.whamcloud.com/\n",
1608                                libcfs_nid2str(peer->nid));
1609                 } else if (server_cksum != client_cksum) {
1610                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1611                                            "%s%s%s inode "DFID" object "DOSTID
1612                                            " extent ["LPU64"-"LPU64"]\n",
1613                                            req->rq_import->imp_obd->obd_name,
1614                                            libcfs_nid2str(peer->nid),
1615                                            via, router,
1616                                            body->oa.o_valid & OBD_MD_FLFID ?
1617                                                 body->oa.o_parent_seq : (__u64)0,
1618                                            body->oa.o_valid & OBD_MD_FLFID ?
1619                                                 body->oa.o_parent_oid : 0,
1620                                            body->oa.o_valid & OBD_MD_FLFID ?
1621                                                 body->oa.o_parent_ver : 0,
1622                                            POSTID(&body->oa.o_oi),
1623                                            aa->aa_ppga[0]->off,
1624                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1625                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1626                                                                         1);
1627                         CERROR("client %x, server %x, cksum_type %x\n",
1628                                client_cksum, server_cksum, cksum_type);
1629                         cksum_counter = 0;
1630                         aa->aa_oa->o_cksum = client_cksum;
1631                         rc = -EAGAIN;
1632                 } else {
1633                         cksum_counter++;
1634                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1635                         rc = 0;
1636                 }
1637         } else if (unlikely(client_cksum)) {
1638                 static int cksum_missed;
1639
1640                 cksum_missed++;
1641                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1642                         CERROR("Checksum %u requested from %s but not sent\n",
1643                                cksum_missed, libcfs_nid2str(peer->nid));
1644         } else {
1645                 rc = 0;
1646         }
1647 out:
1648         if (rc >= 0)
1649                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1650                                      aa->aa_oa, &body->oa);
1651
1652         RETURN(rc);
1653 }
1654
/* Issue one synchronous bulk read/write RPC and wait for completion.
 * Recoverable failures (bulk timeout, -EINPROGRESS, resend hints) are
 * retried here with a growing delay, until either the resend budget is
 * exhausted or the import generation changes (client was evicted and
 * reconnected in the meantime).  Returns 0 or a negative errno; retry
 * hints are mapped to -EIO for the synchronous callers. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    generation, resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);
        /* remember the import generation so that a later resend can detect
         * an eviction/reconnect that happened while we were retrying */
        generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0, resends);
        if (rc != 0)
                return (rc);

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                /* delay sending by one second per resend already performed */
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        rc = ptlrpc_queue_wait(req);

        /* a timed-out bulk with the resend flag set is rebuilt from scratch */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        /* When server return -EINPROGRESS, client should always retry
         * regardless of the number of times the bulk was resent already.*/
        if (osc_recoverable_error(rc)) {
                resends++;
                if (rc != -EINPROGRESS &&
                    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("%s: too many resend retries for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }
                if (generation !=
                    exp->exp_obd->u.cli.cl_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
                               ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
                               POSTID(&oa->o_oi), rc);
                        goto out;
                }

                /* back off for "resends" seconds before trying again;
                 * the wait condition is constant 0, so this is a pure sleep
                 * that remains interruptible */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
                                       NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
out:
        /* synchronous callers expect a hard error, not a retry hint */
        if (rc == -EAGAIN || rc == -EINPROGRESS)
                rc = -EIO;
        RETURN (rc);
}
1724
/* Rebuild and requeue a failed async bulk RPC after a recoverable error.
 * A fresh request is prepared from the async args of the old one; the
 * page array, oap list, extent list and capability are transferred to
 * the new request, which is then handed back to ptlrpcd.  Returns 0 when
 * the replacement request was queued, negative errno otherwise (-EINTR
 * if any page of the request was interrupted in the meantime). */
static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
{
        struct ptlrpc_request *new_req;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        ENTRY;

        /* -EINPROGRESS is an expected, quiet retry; anything else is loud */
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0, 1);
        if (rc)
                RETURN(rc);

        /* abandon the redo if any attached page was interrupted; the
         * LASSERTF catches oaps wrongly linked to a different request */
        cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* cap resend delay to the current request timeout, this is similar to
         * what ptlrpc does (see after_reply()) */
        if (aa->aa_resends > new_req->rq_timeout)
                new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
        else
                new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
        new_req->rq_generation_set = 1;
        new_req->rq_import_generation = request->rq_import_generation;

        new_aa = ptlrpc_req_async_args(new_req);

        /* move (not copy) the oap and extent lists onto the new args */
        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
        cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
        new_aa->aa_resends = aa->aa_resends;

        /* swap each oap's request reference from the old RPC to the new */
        cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership moves to the new request as well */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* XXX: This code will run into problem if we're going to support
         * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
         * and wait for all of them to be finished. We should inherit request
         * set from old request. */
        ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1797
1798 /*
1799  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1800  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1801  * fine for our small page arrays and doesn't require allocation.  its an
1802  * insertion sort that swaps elements that are strides apart, shrinking the
1803  * stride down until its '1' and the array is sorted.
1804  */
1805 static void sort_brw_pages(struct brw_page **array, int num)
1806 {
1807         int stride, i, j;
1808         struct brw_page *tmp;
1809
1810         if (num == 1)
1811                 return;
1812         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1813                 ;
1814
1815         do {
1816                 stride /= 3;
1817                 for (i = stride ; i < num ; i++) {
1818                         tmp = array[i];
1819                         j = i;
1820                         while (j >= stride && array[j - stride]->off > tmp->off) {
1821                                 array[j] = array[j - stride];
1822                                 j -= stride;
1823                         }
1824                         array[j] = tmp;
1825                 }
1826         } while (stride > 1);
1827 }
1828
1829 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1830 {
1831         int count = 1;
1832         int offset;
1833         int i = 0;
1834
1835         LASSERT (pages > 0);
1836         offset = pg[i]->off & ~CFS_PAGE_MASK;
1837
1838         for (;;) {
1839                 pages--;
1840                 if (pages == 0)         /* that's all */
1841                         return count;
1842
1843                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1844                         return count;   /* doesn't end on page boundary */
1845
1846                 i++;
1847                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1848                 if (offset != 0)        /* doesn't start on page boundary */
1849                         return count;
1850
1851                 count++;
1852         }
1853 }
1854
1855 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1856 {
1857         struct brw_page **ppga;
1858         int i;
1859
1860         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1861         if (ppga == NULL)
1862                 return NULL;
1863
1864         for (i = 0; i < count; i++)
1865                 ppga[i] = pga + i;
1866         return ppga;
1867 }
1868
/* Free a pointer array built by osc_build_ppga().  Only the pointer
 * array itself is released; the brw_page descriptors it points at are
 * owned by the caller. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1874
/* Synchronous bulk I/O entry point (OBD_BRW_READ/WRITE).  The page
 * array is sorted by offset and sent as a series of RPCs, each at most
 * cl_max_pages_per_rpc pages and never spanning a fragmented region.
 * OBD_BRW_CHECK only probes whether the import is usable, without
 * sending anything.  Returns 0 or negative errno. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        /* remember the original count: ppga/page_count are advanced below */
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* never let a single RPC span a fragmented region */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance to the pages of the next chunk */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* free via the original pointer and count; ppga was advanced above */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1948
1949 static int brw_interpret(const struct lu_env *env,
1950                          struct ptlrpc_request *req, void *data, int rc)
1951 {
1952         struct osc_brw_async_args *aa = data;
1953         struct osc_extent *ext;
1954         struct osc_extent *tmp;
1955         struct cl_object  *obj = NULL;
1956         struct client_obd *cli = aa->aa_cli;
1957         ENTRY;
1958
1959         rc = osc_brw_fini_request(req, rc);
1960         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1961         /* When server return -EINPROGRESS, client should always retry
1962          * regardless of the number of times the bulk was resent already. */
1963         if (osc_recoverable_error(rc)) {
1964                 if (req->rq_import_generation !=
1965                     req->rq_import->imp_generation) {
1966                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1967                                ""DOSTID", rc = %d.\n",
1968                                req->rq_import->imp_obd->obd_name,
1969                                POSTID(&aa->aa_oa->o_oi), rc);
1970                 } else if (rc == -EINPROGRESS ||
1971                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1972                         rc = osc_brw_redo_request(req, aa, rc);
1973                 } else {
1974                         CERROR("%s: too many resent retries for object: "
1975                                ""LPU64":"LPU64", rc = %d.\n",
1976                                req->rq_import->imp_obd->obd_name,
1977                                POSTID(&aa->aa_oa->o_oi), rc);
1978                 }
1979
1980                 if (rc == 0)
1981                         RETURN(0);
1982                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1983                         rc = -EIO;
1984         }
1985
1986         if (aa->aa_ocapa) {
1987                 capa_put(aa->aa_ocapa);
1988                 aa->aa_ocapa = NULL;
1989         }
1990
1991         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1992                 if (obj == NULL && rc == 0) {
1993                         obj = osc2cl(ext->oe_obj);
1994                         cl_object_get(obj);
1995                 }
1996
1997                 cfs_list_del_init(&ext->oe_link);
1998                 osc_extent_finish(env, ext, 1, rc);
1999         }
2000         LASSERT(cfs_list_empty(&aa->aa_exts));
2001         LASSERT(cfs_list_empty(&aa->aa_oaps));
2002
2003         if (obj != NULL) {
2004                 struct obdo *oa = aa->aa_oa;
2005                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2006                 unsigned long valid = 0;
2007
2008                 LASSERT(rc == 0);
2009                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2010                         attr->cat_blocks = oa->o_blocks;
2011                         valid |= CAT_BLOCKS;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLMTIME) {
2014                         attr->cat_mtime = oa->o_mtime;
2015                         valid |= CAT_MTIME;
2016                 }
2017                 if (oa->o_valid & OBD_MD_FLATIME) {
2018                         attr->cat_atime = oa->o_atime;
2019                         valid |= CAT_ATIME;
2020                 }
2021                 if (oa->o_valid & OBD_MD_FLCTIME) {
2022                         attr->cat_ctime = oa->o_ctime;
2023                         valid |= CAT_CTIME;
2024                 }
2025                 if (valid != 0) {
2026                         cl_object_attr_lock(obj);
2027                         cl_object_attr_set(env, obj, attr, valid);
2028                         cl_object_attr_unlock(obj);
2029                 }
2030                 cl_object_put(env, obj);
2031         }
2032         OBDO_FREE(aa->aa_oa);
2033
2034         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2035                           req->rq_bulk->bd_nob_transferred);
2036         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2037         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2038
2039         client_obd_list_lock(&cli->cl_loi_list_lock);
2040         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2041          * is called so we know whether to go to sync BRWs or wait for more
2042          * RPCs to complete */
2043         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2044                 cli->cl_w_in_flight--;
2045         else
2046                 cli->cl_r_in_flight--;
2047         osc_wake_cache_waiters(cli);
2048         client_obd_list_unlock(&cli->cl_loi_list_lock);
2049
2050         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2051         RETURN(rc);
2052 }
2053
2054 /**
2055  * Build an RPC by the list of extent @ext_list. The caller must ensure
2056  * that the total pages in this list are NOT over max pages per RPC.
2057  * Extents in the list must be in OES_RPC state.
2058  */
2059 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2060                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2061 {
2062         struct ptlrpc_request           *req = NULL;
2063         struct osc_extent               *ext;
2064         struct brw_page                 **pga = NULL;
2065         struct osc_brw_async_args       *aa = NULL;
2066         struct obdo                     *oa = NULL;
2067         struct osc_async_page           *oap;
2068         struct osc_async_page           *tmp;
2069         struct cl_req                   *clerq = NULL;
2070         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2071                                                                       CRT_READ;
2072         struct ldlm_lock                *lock = NULL;
2073         struct cl_req_attr              *crattr = NULL;
2074         obd_off                         starting_offset = OBD_OBJECT_EOF;
2075         obd_off                         ending_offset = 0;
2076         int                             mpflag = 0;
2077         int                             mem_tight = 0;
2078         int                             page_count = 0;
2079         int                             i;
2080         int                             rc;
2081         CFS_LIST_HEAD(rpc_list);
2082
2083         ENTRY;
2084         LASSERT(!cfs_list_empty(ext_list));
2085
2086         /* add pages into rpc_list to build BRW rpc */
2087         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2088                 LASSERT(ext->oe_state == OES_RPC);
2089                 mem_tight |= ext->oe_memalloc;
2090                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2091                         ++page_count;
2092                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2093                         if (starting_offset > oap->oap_obj_off)
2094                                 starting_offset = oap->oap_obj_off;
2095                         else
2096                                 LASSERT(oap->oap_page_off == 0);
2097                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2098                                 ending_offset = oap->oap_obj_off +
2099                                                 oap->oap_count;
2100                         else
2101                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2102                                         CFS_PAGE_SIZE);
2103                 }
2104         }
2105
2106         if (mem_tight)
2107                 mpflag = cfs_memory_pressure_get_and_set();
2108
2109         OBD_ALLOC(crattr, sizeof(*crattr));
2110         if (crattr == NULL)
2111                 GOTO(out, rc = -ENOMEM);
2112
2113         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2114         if (pga == NULL)
2115                 GOTO(out, rc = -ENOMEM);
2116
2117         OBDO_ALLOC(oa);
2118         if (oa == NULL)
2119                 GOTO(out, rc = -ENOMEM);
2120
2121         i = 0;
2122         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2123                 struct cl_page *page = oap2cl_page(oap);
2124                 if (clerq == NULL) {
2125                         clerq = cl_req_alloc(env, page, crt,
2126                                              1 /* only 1-object rpcs for now */);
2127                         if (IS_ERR(clerq))
2128                                 GOTO(out, rc = PTR_ERR(clerq));
2129                         lock = oap->oap_ldlm_lock;
2130                 }
2131                 if (mem_tight)
2132                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2133                 pga[i] = &oap->oap_brw_page;
2134                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2135                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2136                        pga[i]->pg, cfs_page_index(oap->oap_page), oap,
2137                        pga[i]->flag);
2138                 i++;
2139                 cl_req_page_add(env, clerq, page);
2140         }
2141
2142         /* always get the data for the obdo for the rpc */
2143         LASSERT(clerq != NULL);
2144         crattr->cra_oa = oa;
2145         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2146         if (lock) {
2147                 oa->o_handle = lock->l_remote_handle;
2148                 oa->o_valid |= OBD_MD_FLHANDLE;
2149         }
2150
2151         rc = cl_req_prep(env, clerq);
2152         if (rc != 0) {
2153                 CERROR("cl_req_prep failed: %d\n", rc);
2154                 GOTO(out, rc);
2155         }
2156
2157         sort_brw_pages(pga, page_count);
2158         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2159                         pga, &req, crattr->cra_capa, 1, 0);
2160         if (rc != 0) {
2161                 CERROR("prep_req failed: %d\n", rc);
2162                 GOTO(out, rc);
2163         }
2164
2165         req->rq_interpret_reply = brw_interpret;
2166         if (mem_tight != 0)
2167                 req->rq_memalloc = 1;
2168
2169         /* Need to update the timestamps after the request is built in case
2170          * we race with setattr (locally or in queue at OST).  If OST gets
2171          * later setattr before earlier BRW (as determined by the request xid),
2172          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2173          * way to do this in a single call.  bug 10150 */
2174         cl_req_attr_set(env, clerq, crattr,
2175                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2176
2177         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2178
2179         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2180         aa = ptlrpc_req_async_args(req);
2181         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2182         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2183         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2184         cfs_list_splice_init(ext_list, &aa->aa_exts);
2185         aa->aa_clerq = clerq;
2186
2187         /* queued sync pages can be torn down while the pages
2188          * were between the pending list and the rpc */
2189         tmp = NULL;
2190         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2191                 /* only one oap gets a request reference */
2192                 if (tmp == NULL)
2193                         tmp = oap;
2194                 if (oap->oap_interrupted && !req->rq_intr) {
2195                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2196                                         oap, req);
2197                         ptlrpc_mark_interrupted(req);
2198                 }
2199         }
2200         if (tmp != NULL)
2201                 tmp->oap_request = ptlrpc_request_addref(req);
2202
2203         client_obd_list_lock(&cli->cl_loi_list_lock);
2204         starting_offset >>= CFS_PAGE_SHIFT;
2205         if (cmd == OBD_BRW_READ) {
2206                 cli->cl_r_in_flight++;
2207                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2208                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2209                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2210                                       starting_offset + 1);
2211         } else {
2212                 cli->cl_w_in_flight++;
2213                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2214                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2215                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2216                                       starting_offset + 1);
2217         }
2218         client_obd_list_unlock(&cli->cl_loi_list_lock);
2219
2220         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2221                   page_count, aa, cli->cl_r_in_flight,
2222                   cli->cl_w_in_flight);
2223
2224         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2225          * see which CPU/NUMA node the majority of pages were allocated
2226          * on, and try to assign the async RPC to the CPU core
2227          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2228          *
2229          * But on the other hand, we expect that multiple ptlrpcd
2230          * threads and the initial write sponsor can run in parallel,
2231          * especially when data checksum is enabled, which is CPU-bound
2232          * operation and single ptlrpcd thread cannot process in time.
2233          * So more ptlrpcd threads sharing BRW load
2234          * (with PDL_POLICY_ROUND) seems better.
2235          */
2236         ptlrpcd_add_req(req, pol, -1);
2237         rc = 0;
2238         EXIT;
2239
2240 out:
2241         if (mem_tight != 0)
2242                 cfs_memory_pressure_restore(mpflag);
2243
2244         if (crattr != NULL) {
2245                 capa_put(crattr->cra_capa);
2246                 OBD_FREE(crattr, sizeof(*crattr));
2247         }
2248
2249         if (rc != 0) {
2250                 LASSERT(req == NULL);
2251
2252                 if (oa)
2253                         OBDO_FREE(oa);
2254                 if (pga)
2255                         OBD_FREE(pga, sizeof(*pga) * page_count);
2256                 /* this should happen rarely and is pretty bad, it makes the
2257                  * pending list not follow the dirty order */
2258                 while (!cfs_list_empty(ext_list)) {
2259                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2260                                              oe_link);
2261                         cfs_list_del_init(&ext->oe_link);
2262                         osc_extent_finish(env, ext, 0, rc);
2263                 }
2264                 if (clerq && !IS_ERR(clerq))
2265                         cl_req_completion(env, clerq, rc);
2266         }
2267         RETURN(rc);
2268 }
2269
2270 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2271                                         struct ldlm_enqueue_info *einfo)
2272 {
2273         void *data = einfo->ei_cbdata;
2274         int set = 0;
2275
2276         LASSERT(lock != NULL);
2277         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2278         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2279         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2280         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2281
2282         lock_res_and_lock(lock);
2283         spin_lock(&osc_ast_guard);
2284
2285         if (lock->l_ast_data == NULL)
2286                 lock->l_ast_data = data;
2287         if (lock->l_ast_data == data)
2288                 set = 1;
2289
2290         spin_unlock(&osc_ast_guard);
2291         unlock_res_and_lock(lock);
2292
2293         return set;
2294 }
2295
2296 static int osc_set_data_with_check(struct lustre_handle *lockh,
2297                                    struct ldlm_enqueue_info *einfo)
2298 {
2299         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2300         int set = 0;
2301
2302         if (lock != NULL) {
2303                 set = osc_set_lock_data_with_check(lock, einfo);
2304                 LDLM_LOCK_PUT(lock);
2305         } else
2306                 CERROR("lockh %p, data %p - client evicted?\n",
2307                        lockh, einfo->ei_cbdata);
2308         return set;
2309 }
2310
2311 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2312                              ldlm_iterator_t replace, void *data)
2313 {
2314         struct ldlm_res_id res_id;
2315         struct obd_device *obd = class_exp2obd(exp);
2316
2317         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2318         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2319         return 0;
2320 }
2321
2322 /* find any ldlm lock of the inode in osc
2323  * return 0    not find
2324  *        1    find one
2325  *      < 0    error */
2326 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2327                            ldlm_iterator_t replace, void *data)
2328 {
2329         struct ldlm_res_id res_id;
2330         struct obd_device *obd = class_exp2obd(exp);
2331         int rc = 0;
2332
2333         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2334         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2335         if (rc == LDLM_ITER_STOP)
2336                 return(1);
2337         if (rc == LDLM_ITER_CONTINUE)
2338                 return(0);
2339         return(rc);
2340 }
2341
2342 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2343                             obd_enqueue_update_f upcall, void *cookie,
2344                             __u64 *flags, int agl, int rc)
2345 {
2346         int intent = *flags & LDLM_FL_HAS_INTENT;
2347         ENTRY;
2348
2349         if (intent) {
2350                 /* The request was created before ldlm_cli_enqueue call. */
2351                 if (rc == ELDLM_LOCK_ABORTED) {
2352                         struct ldlm_reply *rep;
2353                         rep = req_capsule_server_get(&req->rq_pill,
2354                                                      &RMF_DLM_REP);
2355
2356                         LASSERT(rep != NULL);
2357                         if (rep->lock_policy_res1)
2358                                 rc = rep->lock_policy_res1;
2359                 }
2360         }
2361
2362         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2363             (rc == 0)) {
2364                 *flags |= LDLM_FL_LVB_READY;
2365                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2366                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2367         }
2368
2369         /* Call the update callback. */
2370         rc = (*upcall)(cookie, rc);
2371         RETURN(rc);
2372 }
2373
/* Interpret callback for an async OSC lock enqueue.  Finishes the DLM side
 * of the enqueue, runs the osc upcall, and balances every lock reference
 * taken along the way.  Reference/AST ordering here is deliberate - do not
 * reorder statements. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;
        struct ost_lvb *lvb;
        __u32 lvb_len;
        __u64 *flags = aa->oa_flags;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Let CP AST to grant the lock first. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

        /* An aborted AGL (async glimpse) enqueue carries no LVB reply; pass
         * a NULL buffer so ldlm_cli_enqueue_fini() does not try to unpack
         * one. */
        if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
                lvb = NULL;
                lvb_len = 0;
        } else {
                lvb = aa->oa_lvb;
                lvb_len = sizeof(*aa->oa_lvb);
        }

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, flags, lvb, lvb_len, &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
                              flags, aa->oa_agl, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, then the ldlm_handle2lock()
         * reference. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2435
/* Update per-stripe state after an enqueue completed with status @rc:
 * on success, refresh the cached LVB and extend KMS up to the granted
 * extent; on an aborted intent (glimpse) refresh only the LVB.  Any other
 * failure fails waiting matchers on the lock. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);

        if (rc == ELDLM_OK) {
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        /* KMS already covers this lock's extent - leave it. */
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                /* Glimpse: no lock was granted, but the returned LVB is
                 * valid, so remember it and report success. */
                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                ldlm_lock_allow_match(lock);
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                rc = ELDLM_OK;
        }

        if (lock != NULL) {
                if (rc != ELDLM_OK)
                        ldlm_lock_fail_match(lock);

                LDLM_LOCK_PUT(lock);
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
2480
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarious make the life difficult, so
 * release locks just after they are obtained. */
/* Core OSC extent-lock enqueue.  First tries to reuse a cached compatible
 * lock via ldlm_lock_match(); otherwise sends an (optionally intent-based,
 * optionally async) enqueue RPC.  @agl marks an async glimpse lock request,
 * which must not wait on an ungranted lock. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip to process this strpe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock belongs to someone else's object;
                         * drop it and fall through to a fresh enqueue. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueue: pre-build the RPC so the server can return
                 * an LVB along with (or instead of) the lock. */
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Async path: stash completion state in the request
                         * and let osc_enqueue_interpret() finish the job. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous path: complete the enqueue and run the upcall now. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2634
2635 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2636                        struct ldlm_enqueue_info *einfo,
2637                        struct ptlrpc_request_set *rqset)
2638 {
2639         struct ldlm_res_id res_id;
2640         int rc;
2641         ENTRY;
2642
2643         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2644         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2645                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2646                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2647                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2648                               rqset, rqset != NULL, 0);
2649         RETURN(rc);
2650 }
2651
2652 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2653                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2654                    int *flags, void *data, struct lustre_handle *lockh,
2655                    int unref)
2656 {
2657         struct obd_device *obd = exp->exp_obd;
2658         int lflags = *flags;
2659         ldlm_mode_t rc;
2660         ENTRY;
2661
2662         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2663                 RETURN(-EIO);
2664
2665         /* Filesystem lock extents are extended to page boundaries so that
2666          * dealing with the page cache is a little smoother */
2667         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2668         policy->l_extent.end |= ~CFS_PAGE_MASK;
2669
2670         /* Next, search for already existing extent locks that will cover us */
2671         /* If we're trying to read, we also search for an existing PW lock.  The
2672          * VFS and page cache already protect us locally, so lots of readers/
2673          * writers can share a single PW lock. */
2674         rc = mode;
2675         if (mode == LCK_PR)
2676                 rc |= LCK_PW;
2677         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2678                              res_id, type, policy, rc, lockh, unref);
2679         if (rc) {
2680                 if (data != NULL) {
2681                         if (!osc_set_data_with_check(lockh, data)) {
2682                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2683                                         ldlm_lock_decref(lockh, rc);
2684                                 RETURN(0);
2685                         }
2686                 }
2687                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2688                         ldlm_lock_addref(lockh, LCK_PR);
2689                         ldlm_lock_decref(lockh, LCK_PW);
2690                 }
2691                 RETURN(rc);
2692         }
2693         RETURN(rc);
2694 }
2695
2696 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2697 {
2698         ENTRY;
2699
2700         if (unlikely(mode == LCK_GROUP))
2701                 ldlm_lock_decref_and_cancel(lockh, mode);
2702         else
2703                 ldlm_lock_decref(lockh, mode);
2704
2705         RETURN(0);
2706 }
2707
2708 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2709                       __u32 mode, struct lustre_handle *lockh)
2710 {
2711         ENTRY;
2712         RETURN(osc_cancel_base(lockh, mode));
2713 }
2714
2715 static int osc_cancel_unused(struct obd_export *exp,
2716                              struct lov_stripe_md *lsm,
2717                              ldlm_cancel_flags_t flags,
2718                              void *opaque)
2719 {
2720         struct obd_device *obd = class_exp2obd(exp);
2721         struct ldlm_res_id res_id, *resp = NULL;
2722
2723         if (lsm != NULL) {
2724                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2725                 resp = &res_id;
2726         }
2727
2728         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2729 }
2730
2731 static int osc_statfs_interpret(const struct lu_env *env,
2732                                 struct ptlrpc_request *req,
2733                                 struct osc_async_args *aa, int rc)
2734 {
2735         struct obd_statfs *msfs;
2736         ENTRY;
2737
2738         if (rc == -EBADR)
2739                 /* The request has in fact never been sent
2740                  * due to issues at a higher level (LOV).
2741                  * Exit immediately since the caller is
2742                  * aware of the problem and takes care
2743                  * of the clean up */
2744                  RETURN(rc);
2745
2746         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2747             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2748                 GOTO(out, rc = 0);
2749
2750         if (rc != 0)
2751                 GOTO(out, rc);
2752
2753         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2754         if (msfs == NULL) {
2755                 GOTO(out, rc = -EPROTO);
2756         }
2757
2758         *aa->aa_oi->oi_osfs = *msfs;
2759 out:
2760         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2761         RETURN(rc);
2762 }
2763
2764 static int osc_statfs_async(struct obd_export *exp,
2765                             struct obd_info *oinfo, __u64 max_age,
2766                             struct ptlrpc_request_set *rqset)
2767 {
2768         struct obd_device     *obd = class_exp2obd(exp);
2769         struct ptlrpc_request *req;
2770         struct osc_async_args *aa;
2771         int                    rc;
2772         ENTRY;
2773
2774         /* We could possibly pass max_age in the request (as an absolute
2775          * timestamp or a "seconds.usec ago") so the target can avoid doing
2776          * extra calls into the filesystem if that isn't necessary (e.g.
2777          * during mount that would help a bit).  Having relative timestamps
2778          * is not so great if request processing is slow, while absolute
2779          * timestamps are not ideal because they need time synchronization. */
2780         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2781         if (req == NULL)
2782                 RETURN(-ENOMEM);
2783
2784         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2785         if (rc) {
2786                 ptlrpc_request_free(req);
2787                 RETURN(rc);
2788         }
2789         ptlrpc_request_set_replen(req);
2790         req->rq_request_portal = OST_CREATE_PORTAL;
2791         ptlrpc_at_set_req_timeout(req);
2792
2793         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2794                 /* procfs requests not want stat in wait for avoid deadlock */
2795                 req->rq_no_resend = 1;
2796                 req->rq_no_delay = 1;
2797         }
2798
2799         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2800         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2801         aa = ptlrpc_req_async_args(req);
2802         aa->aa_oi = oinfo;
2803
2804         ptlrpc_set_add_req(rqset, req);
2805         RETURN(0);
2806 }
2807
2808 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2809                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2810 {
2811         struct obd_device     *obd = class_exp2obd(exp);
2812         struct obd_statfs     *msfs;
2813         struct ptlrpc_request *req;
2814         struct obd_import     *imp = NULL;
2815         int rc;
2816         ENTRY;
2817
2818         /*Since the request might also come from lprocfs, so we need
2819          *sync this with client_disconnect_export Bug15684*/
2820         down_read(&obd->u.cli.cl_sem);
2821         if (obd->u.cli.cl_import)
2822                 imp = class_import_get(obd->u.cli.cl_import);
2823         up_read(&obd->u.cli.cl_sem);
2824         if (!imp)
2825                 RETURN(-ENODEV);
2826
2827         /* We could possibly pass max_age in the request (as an absolute
2828          * timestamp or a "seconds.usec ago") so the target can avoid doing
2829          * extra calls into the filesystem if that isn't necessary (e.g.
2830          * during mount that would help a bit).  Having relative timestamps
2831          * is not so great if request processing is slow, while absolute
2832          * timestamps are not ideal because they need time synchronization. */
2833         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2834
2835         class_import_put(imp);
2836
2837         if (req == NULL)
2838                 RETURN(-ENOMEM);
2839
2840         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2841         if (rc) {
2842                 ptlrpc_request_free(req);
2843                 RETURN(rc);
2844         }
2845         ptlrpc_request_set_replen(req);
2846         req->rq_request_portal = OST_CREATE_PORTAL;
2847         ptlrpc_at_set_req_timeout(req);
2848
2849         if (flags & OBD_STATFS_NODELAY) {
2850                 /* procfs requests not want stat in wait for avoid deadlock */
2851                 req->rq_no_resend = 1;
2852                 req->rq_no_delay = 1;
2853         }
2854
2855         rc = ptlrpc_queue_wait(req);
2856         if (rc)
2857                 GOTO(out, rc);
2858
2859         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2860         if (msfs == NULL) {
2861                 GOTO(out, rc = -EPROTO);
2862         }
2863
2864         *osfs = *msfs;
2865
2866         EXIT;
2867  out:
2868         ptlrpc_req_finished(req);
2869         return rc;
2870 }
2871
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (cfs_copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* Allocate a full-sized reply and fill the single object
                 * slot; the v1/v3 cast picks the right objects offset. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects =
                            &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_ost_oi = lsm->lsm_oi;
        } else {
                /* Caller wants only the header - reuse the stack copy. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_oi = lsm->lsm_oi;
        lumk->lmm_stripe_count = 1;

        if (cfs_copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
2934
2935
2936 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2937                          void *karg, void *uarg)
2938 {
2939         struct obd_device *obd = exp->exp_obd;
2940         struct obd_ioctl_data *data = karg;
2941         int err = 0;
2942         ENTRY;
2943
2944         if (!cfs_try_module_get(THIS_MODULE)) {
2945                 CERROR("Can't get module. Is it alive?");
2946                 return -EINVAL;
2947         }
2948         switch (cmd) {
2949         case OBD_IOC_LOV_GET_CONFIG: {
2950                 char *buf;
2951                 struct lov_desc *desc;
2952                 struct obd_uuid uuid;
2953
2954                 buf = NULL;
2955                 len = 0;
2956                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2957                         GOTO(out, err = -EINVAL);
2958
2959                 data = (struct obd_ioctl_data *)buf;
2960
2961                 if (sizeof(*desc) > data->ioc_inllen1) {
2962                         obd_ioctl_freedata(buf, len);
2963                         GOTO(out, err = -EINVAL);
2964                 }
2965
2966                 if (data->ioc_inllen2 < sizeof(uuid)) {
2967                         obd_ioctl_freedata(buf, len);
2968                         GOTO(out, err = -EINVAL);
2969                 }
2970
2971                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2972                 desc->ld_tgt_count = 1;
2973                 desc->ld_active_tgt_count = 1;
2974                 desc->ld_default_stripe_count = 1;
2975                 desc->ld_default_stripe_size = 0;
2976                 desc->ld_default_stripe_offset = 0;
2977                 desc->ld_pattern = 0;
2978                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2979
2980                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2981
2982                 err = cfs_copy_to_user((void *)uarg, buf, len);
2983                 if (err)
2984                         err = -EFAULT;
2985                 obd_ioctl_freedata(buf, len);
2986                 GOTO(out, err);
2987         }
2988         case LL_IOC_LOV_SETSTRIPE:
2989                 err = obd_alloc_memmd(exp, karg);
2990                 if (err > 0)
2991                         err = 0;
2992                 GOTO(out, err);
2993         case LL_IOC_LOV_GETSTRIPE:
2994                 err = osc_getstripe(karg, uarg);
2995                 GOTO(out, err);
2996         case OBD_IOC_CLIENT_RECOVER:
2997                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2998                                             data->ioc_inlbuf1, 0);
2999                 if (err > 0)
3000                         err = 0;
3001                 GOTO(out, err);
3002         case IOC_OSC_SET_ACTIVE:
3003                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3004                                                data->ioc_offset);
3005                 GOTO(out, err);
3006         case OBD_IOC_POLL_QUOTACHECK:
3007                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3008                 GOTO(out, err);
3009         case OBD_IOC_PING_TARGET:
3010                 err = ptlrpc_obd_ping(obd);
3011                 GOTO(out, err);
3012         default:
3013                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3014                        cmd, cfs_curproc_comm());
3015                 GOTO(out, err = -ENOTTY);
3016         }
3017 out:
3018         cfs_module_put(THIS_MODULE);
3019         return err;
3020 }
3021
3022 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3023                         obd_count keylen, void *key, __u32 *vallen, void *val,
3024                         struct lov_stripe_md *lsm)
3025 {
3026         ENTRY;
3027         if (!vallen || !val)
3028                 RETURN(-EFAULT);
3029
3030         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3031                 __u32 *stripe = val;
3032                 *vallen = sizeof(*stripe);
3033                 *stripe = 0;
3034                 RETURN(0);
3035         } else if (KEY_IS(KEY_LAST_ID)) {
3036                 struct ptlrpc_request *req;
3037                 obd_id                *reply;
3038                 char                  *tmp;
3039                 int                    rc;
3040
3041                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3042                                            &RQF_OST_GET_INFO_LAST_ID);
3043                 if (req == NULL)
3044                         RETURN(-ENOMEM);
3045
3046                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3047                                      RCL_CLIENT, keylen);
3048                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3049                 if (rc) {
3050                         ptlrpc_request_free(req);
3051                         RETURN(rc);
3052                 }
3053
3054                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3055                 memcpy(tmp, key, keylen);
3056
3057                 req->rq_no_delay = req->rq_no_resend = 1;
3058                 ptlrpc_request_set_replen(req);
3059                 rc = ptlrpc_queue_wait(req);
3060                 if (rc)
3061                         GOTO(out, rc);
3062
3063                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3064                 if (reply == NULL)
3065                         GOTO(out, rc = -EPROTO);
3066
3067                 *((obd_id *)val) = *reply;
3068         out:
3069                 ptlrpc_req_finished(req);
3070                 RETURN(rc);
3071         } else if (KEY_IS(KEY_FIEMAP)) {
3072                 struct ptlrpc_request *req;
3073                 struct ll_user_fiemap *reply;
3074                 char *tmp;
3075                 int rc;
3076
3077                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3078                                            &RQF_OST_GET_INFO_FIEMAP);
3079                 if (req == NULL)
3080                         RETURN(-ENOMEM);
3081
3082                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3083                                      RCL_CLIENT, keylen);
3084                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3085                                      RCL_CLIENT, *vallen);
3086                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3087                                      RCL_SERVER, *vallen);
3088
3089                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3090                 if (rc) {
3091                         ptlrpc_request_free(req);
3092                         RETURN(rc);
3093                 }
3094
3095                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3096                 memcpy(tmp, key, keylen);
3097                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3098                 memcpy(tmp, val, *vallen);
3099
3100                 ptlrpc_request_set_replen(req);
3101                 rc = ptlrpc_queue_wait(req);
3102                 if (rc)
3103                         GOTO(out1, rc);
3104
3105                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3106                 if (reply == NULL)
3107                         GOTO(out1, rc = -EPROTO);
3108
3109                 memcpy(val, reply, *vallen);
3110         out1:
3111                 ptlrpc_req_finished(req);
3112
3113                 RETURN(rc);
3114         }
3115
3116         RETURN(-EINVAL);
3117 }
3118
3119 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3120                               obd_count keylen, void *key, obd_count vallen,
3121                               void *val, struct ptlrpc_request_set *set)
3122 {
3123         struct ptlrpc_request *req;
3124         struct obd_device     *obd = exp->exp_obd;
3125         struct obd_import     *imp = class_exp2cliimp(exp);
3126         char                  *tmp;
3127         int                    rc;
3128         ENTRY;
3129
3130         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3131
3132         if (KEY_IS(KEY_CHECKSUM)) {
3133                 if (vallen != sizeof(int))
3134                         RETURN(-EINVAL);
3135                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3136                 RETURN(0);
3137         }
3138
3139         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3140                 sptlrpc_conf_client_adapt(obd);
3141                 RETURN(0);
3142         }
3143
3144         if (KEY_IS(KEY_FLUSH_CTX)) {
3145                 sptlrpc_import_flush_my_ctx(imp);
3146                 RETURN(0);
3147         }
3148
3149         if (KEY_IS(KEY_CACHE_SET)) {
3150                 struct client_obd *cli = &obd->u.cli;
3151
3152                 LASSERT(cli->cl_cache == NULL); /* only once */
3153                 cli->cl_cache = (struct cl_client_cache *)val;
3154                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3155                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3156
3157                 /* add this osc into entity list */
3158                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3159                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3160                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3161                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3162
3163                 RETURN(0);
3164         }
3165
3166         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3167                 struct client_obd *cli = &obd->u.cli;
3168                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3169                 int target = *(int *)val;
3170
3171