Whamcloud - gitweb
LU-2139 osc: Track and limit "unstable" pages
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (lmmp == NULL)
79                 RETURN(lmm_size);
80
81         if (*lmmp != NULL && lsm == NULL) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
86                 RETURN(-EBADF);
87         }
88
89         if (*lmmp == NULL) {
90                 OBD_ALLOC(*lmmp, lmm_size);
91                 if (*lmmp == NULL)
92                         RETURN(-ENOMEM);
93         }
94
95         if (lsm)
96                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
97
98         RETURN(lmm_size);
99 }
100
101 /* Unpack OSC object metadata from disk storage (LE byte order). */
102 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
103                         struct lov_mds_md *lmm, int lmm_bytes)
104 {
105         int lsm_size;
106         struct obd_import *imp = class_exp2cliimp(exp);
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof(*lmm)) {
111                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
112                                exp->exp_obd->obd_name, lmm_bytes,
113                                (int)sizeof(*lmm));
114                         RETURN(-EINVAL);
115                 }
116                 /* XXX LOV_MAGIC etc check? */
117
118                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
119                         CERROR("%s: zero lmm_object_id: rc = %d\n",
120                                exp->exp_obd->obd_name, -EINVAL);
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (unlikely(*lsmp == NULL))
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
147                 RETURN(-EBADF);
148         }
149
150         if (lmm != NULL)
151                 /* XXX zero *lsmp? */
152                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
153
154         if (imp != NULL &&
155             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
156                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
157         else
158                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
159
160         RETURN(lsm_size);
161 }
162
163 static inline void osc_pack_capa(struct ptlrpc_request *req,
164                                  struct ost_body *body, void *capa)
165 {
166         struct obd_capa *oc = (struct obd_capa *)capa;
167         struct lustre_capa *c;
168
169         if (!capa)
170                 return;
171
172         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
173         LASSERT(c);
174         capa_cpy(c, oc);
175         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
176         DEBUG_CAPA(D_SEC, c, "pack");
177 }
178
179 static inline void osc_pack_req_body(struct ptlrpc_request *req,
180                                      struct obd_info *oinfo)
181 {
182         struct ost_body *body;
183
184         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
185         LASSERT(body);
186
187         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
188                              oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
217                                      aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265                        struct obd_info *oinfo)
266 {
267         struct ptlrpc_request *req;
268         struct ost_body       *body;
269         int                    rc;
270         ENTRY;
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273         if (req == NULL)
274                 RETURN(-ENOMEM);
275
276         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 RETURN(rc);
281         }
282
283         osc_pack_req_body(req, oinfo);
284
285         ptlrpc_request_set_replen(req);
286
287         rc = ptlrpc_queue_wait(req);
288         if (rc)
289                 GOTO(out, rc);
290
291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
297                              &body->oa);
298
299         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
300         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301
302         EXIT;
303  out:
304         ptlrpc_req_finished(req);
305         return rc;
306 }
307
308 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
309                        struct obd_info *oinfo, struct obd_trans_info *oti)
310 {
311         struct ptlrpc_request *req;
312         struct ost_body       *body;
313         int                    rc;
314         ENTRY;
315
316         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324         if (rc) {
325                 ptlrpc_request_free(req);
326                 RETURN(rc);
327         }
328
329         osc_pack_req_body(req, oinfo);
330
331         ptlrpc_request_set_replen(req);
332
333         rc = ptlrpc_queue_wait(req);
334         if (rc)
335                 GOTO(out, rc);
336
337         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338         if (body == NULL)
339                 GOTO(out, rc = -EPROTO);
340
341         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
342                              &body->oa);
343
344         EXIT;
345 out:
346         ptlrpc_req_finished(req);
347         RETURN(rc);
348 }
349
350 static int osc_setattr_interpret(const struct lu_env *env,
351                                  struct ptlrpc_request *req,
352                                  struct osc_setattr_args *sa, int rc)
353 {
354         struct ost_body *body;
355         ENTRY;
356
357         if (rc != 0)
358                 GOTO(out, rc);
359
360         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
361         if (body == NULL)
362                 GOTO(out, rc = -EPROTO);
363
364         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
365                              &body->oa);
366 out:
367         rc = sa->sa_upcall(sa->sa_cookie, rc);
368         RETURN(rc);
369 }
370
371 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
372                            struct obd_trans_info *oti,
373                            obd_enqueue_update_f upcall, void *cookie,
374                            struct ptlrpc_request_set *rqset)
375 {
376         struct ptlrpc_request   *req;
377         struct osc_setattr_args *sa;
378         int                      rc;
379         ENTRY;
380
381         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382         if (req == NULL)
383                 RETURN(-ENOMEM);
384
385         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 RETURN(rc);
390         }
391
392         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
393                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
394
395         osc_pack_req_body(req, oinfo);
396
397         ptlrpc_request_set_replen(req);
398
399         /* do mds to ost setattr asynchronously */
400         if (!rqset) {
401                 /* Do not wait for response. */
402                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
403         } else {
404                 req->rq_interpret_reply =
405                         (ptlrpc_interpterer_t)osc_setattr_interpret;
406
407                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
408                 sa = ptlrpc_req_async_args(req);
409                 sa->sa_oa = oinfo->oi_oa;
410                 sa->sa_upcall = upcall;
411                 sa->sa_cookie = cookie;
412
413                 if (rqset == PTLRPCD_SET)
414                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
415                 else
416                         ptlrpc_set_add_req(rqset, req);
417         }
418
419         RETURN(0);
420 }
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
430 int osc_real_create(struct obd_export *exp, struct obdo *oa,
431                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
432 {
433         struct ptlrpc_request *req;
434         struct ost_body       *body;
435         struct lov_stripe_md  *lsm;
436         int                    rc;
437         ENTRY;
438
439         LASSERT(oa);
440         LASSERT(ea);
441
442         lsm = *ea;
443         if (!lsm) {
444                 rc = obd_alloc_memmd(exp, &lsm);
445                 if (rc < 0)
446                         RETURN(rc);
447         }
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
450         if (req == NULL)
451                 GOTO(out, rc = -ENOMEM);
452
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 GOTO(out, rc);
457         }
458
459         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
460         LASSERT(body);
461
462         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
463
464         ptlrpc_request_set_replen(req);
465
466         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
467             oa->o_flags == OBD_FL_DELORPHAN) {
468                 DEBUG_REQ(D_HA, req,
469                           "delorphan from OST integration");
470                 /* Don't resend the delorphan req */
471                 req->rq_no_resend = req->rq_no_delay = 1;
472         }
473
474         rc = ptlrpc_queue_wait(req);
475         if (rc)
476                 GOTO(out_req, rc);
477
478         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
479         if (body == NULL)
480                 GOTO(out_req, rc = -EPROTO);
481
482         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
483         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
484
485         oa->o_blksize = cli_brw_size(exp->exp_obd);
486         oa->o_valid |= OBD_MD_FLBLKSZ;
487
488         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
489          * have valid lsm_oinfo data structs, so don't go touching that.
490          * This needs to be fixed in a big way.
491          */
492         lsm->lsm_oi = oa->o_oi;
493         *ea = lsm;
494
495         if (oti != NULL) {
496                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
497
498                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499                         if (!oti->oti_logcookies)
500                                 oti_alloc_cookies(oti, 1);
501                         *oti->oti_logcookies = oa->o_lcookie;
502                 }
503         }
504
505         CDEBUG(D_HA, "transno: "LPD64"\n",
506                lustre_msg_get_transno(req->rq_repmsg));
507 out_req:
508         ptlrpc_req_finished(req);
509 out:
510         if (rc && !*ea)
511                 obd_free_memmd(exp, &lsm);
512         RETURN(rc);
513 }
514
515 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
516                    obd_enqueue_update_f upcall, void *cookie,
517                    struct ptlrpc_request_set *rqset)
518 {
519         struct ptlrpc_request   *req;
520         struct osc_setattr_args *sa;
521         struct ost_body         *body;
522         int                      rc;
523         ENTRY;
524
525         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526         if (req == NULL)
527                 RETURN(-ENOMEM);
528
529         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
530         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(rc);
534         }
535         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
536         ptlrpc_at_set_req_timeout(req);
537
538         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
539         LASSERT(body);
540         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
541                              oinfo->oi_oa);
542         osc_pack_capa(req, body, oinfo->oi_capa);
543
544         ptlrpc_request_set_replen(req);
545
546         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
547         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548         sa = ptlrpc_req_async_args(req);
549         sa->sa_oa     = oinfo->oi_oa;
550         sa->sa_upcall = upcall;
551         sa->sa_cookie = cookie;
552         if (rqset == PTLRPCD_SET)
553                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
554         else
555                 ptlrpc_set_add_req(rqset, req);
556
557         RETURN(0);
558 }
559
560 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
561                      struct obd_info *oinfo, struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
571 static int osc_sync_interpret(const struct lu_env *env,
572                               struct ptlrpc_request *req,
573                               void *arg, int rc)
574 {
575         struct osc_fsync_args *fa = arg;
576         struct ost_body *body;
577         ENTRY;
578
579         if (rc)
580                 GOTO(out, rc);
581
582         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
583         if (body == NULL) {
584                 CERROR ("can't unpack ost_body\n");
585                 GOTO(out, rc = -EPROTO);
586         }
587
588         *fa->fa_oi->oi_oa = body->oa;
589 out:
590         rc = fa->fa_upcall(fa->fa_cookie, rc);
591         RETURN(rc);
592 }
593
594 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
595                   obd_enqueue_update_f upcall, void *cookie,
596                   struct ptlrpc_request_set *rqset)
597 {
598         struct ptlrpc_request *req;
599         struct ost_body       *body;
600         struct osc_fsync_args *fa;
601         int                    rc;
602         ENTRY;
603
604         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
605         if (req == NULL)
606                 RETURN(-ENOMEM);
607
608         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
609         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
610         if (rc) {
611                 ptlrpc_request_free(req);
612                 RETURN(rc);
613         }
614
615         /* overload the size and blocks fields in the oa with start/end */
616         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
617         LASSERT(body);
618         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
619                              oinfo->oi_oa);
620         osc_pack_capa(req, body, oinfo->oi_capa);
621
622         ptlrpc_request_set_replen(req);
623         req->rq_interpret_reply = osc_sync_interpret;
624
625         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
626         fa = ptlrpc_req_async_args(req);
627         fa->fa_oi = oinfo;
628         fa->fa_upcall = upcall;
629         fa->fa_cookie = cookie;
630
631         if (rqset == PTLRPCD_SET)
632                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
633         else
634                 ptlrpc_set_add_req(rqset, req);
635
636         RETURN (0);
637 }
638
639 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
640                     struct obd_info *oinfo, obd_size start, obd_size end,
641                     struct ptlrpc_request_set *set)
642 {
643         ENTRY;
644
645         if (!oinfo->oi_oa) {
646                 CDEBUG(D_INFO, "oa NULL\n");
647                 RETURN(-EINVAL);
648         }
649
650         oinfo->oi_oa->o_size = start;
651         oinfo->oi_oa->o_blocks = end;
652         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
653
654         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
655 }
656
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661                                    cfs_list_t *cancels,
662                                    ldlm_mode_t mode, __u64 lock_flags)
663 {
664         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665         struct ldlm_res_id res_id;
666         struct ldlm_resource *res;
667         int count;
668         ENTRY;
669
670         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
671          * export) but disabled through procfs (flag in NS).
672          *
673          * This distinguishes from a case when ELC is not supported originally,
674          * when we still want to cancel locks in advance and just cancel them
675          * locally, without sending any RPC. */
676         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
677                 RETURN(0);
678
679         ostid_build_res_name(&oa->o_oi, &res_id);
680         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
681         if (res == NULL)
682                 RETURN(0);
683
684         LDLM_RESOURCE_ADDREF(res);
685         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
686                                            lock_flags, 0, NULL);
687         LDLM_RESOURCE_DELREF(res);
688         ldlm_resource_putref(res);
689         RETURN(count);
690 }
691
692 static int osc_destroy_interpret(const struct lu_env *env,
693                                  struct ptlrpc_request *req, void *data,
694                                  int rc)
695 {
696         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
697
698         cfs_atomic_dec(&cli->cl_destroy_in_flight);
699         wake_up(&cli->cl_destroy_waitq);
700         return 0;
701 }
702
703 static int osc_can_send_destroy(struct client_obd *cli)
704 {
705         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
706             cli->cl_max_rpcs_in_flight) {
707                 /* The destroy request can be sent */
708                 return 1;
709         }
710         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
711             cli->cl_max_rpcs_in_flight) {
712                 /*
713                  * The counter has been modified between the two atomic
714                  * operations.
715                  */
716                 wake_up(&cli->cl_destroy_waitq);
717         }
718         return 0;
719 }
720
721 int osc_create(const struct lu_env *env, struct obd_export *exp,
722                struct obdo *oa, struct lov_stripe_md **ea,
723                struct obd_trans_info *oti)
724 {
725         int rc = 0;
726         ENTRY;
727
728         LASSERT(oa);
729         LASSERT(ea);
730         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
731
732         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
733             oa->o_flags == OBD_FL_RECREATE_OBJS) {
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735         }
736
737         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
738                 RETURN(osc_real_create(exp, oa, ea, oti));
739
740         /* we should not get here anymore */
741         LBUG();
742
743         RETURN(rc);
744 }
745
746 /* Destroy requests can be async always on the client, and we don't even really
747  * care about the return code since the client cannot do anything at all about
748  * a destroy failure.
749  * When the MDS is unlinking a filename, it saves the file objects into a
750  * recovery llog, and these object records are cancelled when the OST reports
751  * they were destroyed and sync'd to disk (i.e. transaction committed).
752  * If the client dies, or the OST is down when the object should be destroyed,
753  * the records are not cancelled, and when the OST reconnects to the MDS next,
754  * it will retrieve the llog unlink logs and then sends the log cancellation
755  * cookies to the MDS after committing destroy transactions. */
756 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
757                        struct obdo *oa, struct lov_stripe_md *ea,
758                        struct obd_trans_info *oti, struct obd_export *md_export,
759                        void *capa)
760 {
761         struct client_obd     *cli = &exp->exp_obd->u.cli;
762         struct ptlrpc_request *req;
763         struct ost_body       *body;
764         CFS_LIST_HEAD(cancels);
765         int rc, count;
766         ENTRY;
767
768         if (!oa) {
769                 CDEBUG(D_INFO, "oa NULL\n");
770                 RETURN(-EINVAL);
771         }
772
773         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
774                                         LDLM_FL_DISCARD_DATA);
775
776         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
777         if (req == NULL) {
778                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779                 RETURN(-ENOMEM);
780         }
781
782         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
783         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
784                                0, &cancels, count);
785         if (rc) {
786                 ptlrpc_request_free(req);
787                 RETURN(rc);
788         }
789
790         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
791         ptlrpc_at_set_req_timeout(req);
792
793         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
794                 oa->o_lcookie = *oti->oti_logcookies;
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
798
799         osc_pack_capa(req, body, (struct obd_capa *)capa);
800         ptlrpc_request_set_replen(req);
801
802         /* If osc_destory is for destroying the unlink orphan,
803          * sent from MDT to OST, which should not be blocked here,
804          * because the process might be triggered by ptlrpcd, and
805          * it is not good to block ptlrpcd thread (b=16006)*/
806         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
807                 req->rq_interpret_reply = osc_destroy_interpret;
808                 if (!osc_can_send_destroy(cli)) {
809                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810                                                           NULL);
811
812                         /*
813                          * Wait until the number of on-going destroy RPCs drops
814                          * under max_rpc_in_flight
815                          */
816                         l_wait_event_exclusive(cli->cl_destroy_waitq,
817                                                osc_can_send_destroy(cli), &lwi);
818                 }
819         }
820
821         /* Do not wait for response */
822         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
823         RETURN(0);
824 }
825
826 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
827                                 long writing_bytes)
828 {
829         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
830
831         LASSERT(!(oa->o_valid & bits));
832
833         oa->o_valid |= bits;
834         client_obd_list_lock(&cli->cl_loi_list_lock);
835         oa->o_dirty = cli->cl_dirty;
836         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
837                      cli->cl_dirty_max)) {
838                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
839                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
840                 oa->o_undirty = 0;
841         } else if (unlikely(cfs_atomic_read(&obd_unstable_pages) +
842                             cfs_atomic_read(&obd_dirty_pages) -
843                             cfs_atomic_read(&obd_dirty_transit_pages) >
844                             (long)(obd_max_dirty_pages + 1))) {
845                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
846                  * not covered by a lock thus they may safely race and trip
847                  * this CERROR() unless we add in a small fudge factor (+1). */
848                 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
849                        cli->cl_import->imp_obd->obd_name,
850                        cfs_atomic_read(&obd_unstable_pages),
851                        cfs_atomic_read(&obd_dirty_pages),
852                        cfs_atomic_read(&obd_dirty_transit_pages),
853                        obd_max_dirty_pages);
854                 oa->o_undirty = 0;
855         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
856                 CERROR("dirty %lu - dirty_max %lu too big???\n",
857                        cli->cl_dirty, cli->cl_dirty_max);
858                 oa->o_undirty = 0;
859         } else {
860                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
861                                       PAGE_CACHE_SHIFT) *
862                                      (cli->cl_max_rpcs_in_flight + 1);
863                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
864         }
865         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
866         oa->o_dropped = cli->cl_lost_grant;
867         cli->cl_lost_grant = 0;
868         client_obd_list_unlock(&cli->cl_loi_list_lock);
869         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
870                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
871
872 }
873
874 void osc_update_next_shrink(struct client_obd *cli)
875 {
876         cli->cl_next_shrink_grant =
877                 cfs_time_shift(cli->cl_grant_shrink_interval);
878         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
879                cli->cl_next_shrink_grant);
880 }
881
882 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
883 {
884         client_obd_list_lock(&cli->cl_loi_list_lock);
885         cli->cl_avail_grant += grant;
886         client_obd_list_unlock(&cli->cl_loi_list_lock);
887 }
888
889 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
890 {
891         if (body->oa.o_valid & OBD_MD_FLGRANT) {
892                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
893                 __osc_update_grant(cli, body->oa.o_grant);
894         }
895 }
896
897 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
898                               obd_count keylen, void *key, obd_count vallen,
899                               void *val, struct ptlrpc_request_set *set);
900
901 static int osc_shrink_grant_interpret(const struct lu_env *env,
902                                       struct ptlrpc_request *req,
903                                       void *aa, int rc)
904 {
905         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
906         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
907         struct ost_body *body;
908
909         if (rc != 0) {
910                 __osc_update_grant(cli, oa->o_grant);
911                 GOTO(out, rc);
912         }
913
914         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
915         LASSERT(body);
916         osc_update_grant(cli, body);
917 out:
918         OBDO_FREE(oa);
919         return rc;
920 }
921
922 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
923 {
924         client_obd_list_lock(&cli->cl_loi_list_lock);
925         oa->o_grant = cli->cl_avail_grant / 4;
926         cli->cl_avail_grant -= oa->o_grant;
927         client_obd_list_unlock(&cli->cl_loi_list_lock);
928         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
929                 oa->o_valid |= OBD_MD_FLFLAGS;
930                 oa->o_flags = 0;
931         }
932         oa->o_flags |= OBD_FL_SHRINK_GRANT;
933         osc_update_next_shrink(cli);
934 }
935
936 /* Shrink the current grant, either from some large amount to enough for a
937  * full set of in-flight RPCs, or if we have already shrunk to that limit
938  * then to enough for a single RPC.  This avoids keeping more grant than
939  * needed, and avoids shrinking the grant piecemeal. */
940 static int osc_shrink_grant(struct client_obd *cli)
941 {
942         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
943                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
944
945         client_obd_list_lock(&cli->cl_loi_list_lock);
946         if (cli->cl_avail_grant <= target_bytes)
947                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
948         client_obd_list_unlock(&cli->cl_loi_list_lock);
949
950         return osc_shrink_grant_to_target(cli, target_bytes);
951 }
952
953 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
954 {
955         int                     rc = 0;
956         struct ost_body        *body;
957         ENTRY;
958
959         client_obd_list_lock(&cli->cl_loi_list_lock);
960         /* Don't shrink if we are already above or below the desired limit
961          * We don't want to shrink below a single RPC, as that will negatively
962          * impact block allocation and long-term performance. */
963         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
964                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
965
966         if (target_bytes >= cli->cl_avail_grant) {
967                 client_obd_list_unlock(&cli->cl_loi_list_lock);
968                 RETURN(0);
969         }
970         client_obd_list_unlock(&cli->cl_loi_list_lock);
971
972         OBD_ALLOC_PTR(body);
973         if (!body)
974                 RETURN(-ENOMEM);
975
976         osc_announce_cached(cli, &body->oa, 0);
977
978         client_obd_list_lock(&cli->cl_loi_list_lock);
979         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
980         cli->cl_avail_grant = target_bytes;
981         client_obd_list_unlock(&cli->cl_loi_list_lock);
982         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
983                 body->oa.o_valid |= OBD_MD_FLFLAGS;
984                 body->oa.o_flags = 0;
985         }
986         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
987         osc_update_next_shrink(cli);
988
989         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
990                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
991                                 sizeof(*body), body, NULL);
992         if (rc != 0)
993                 __osc_update_grant(cli, body->oa.o_grant);
994         OBD_FREE_PTR(body);
995         RETURN(rc);
996 }
997
998 static int osc_should_shrink_grant(struct client_obd *client)
999 {
1000         cfs_time_t time = cfs_time_current();
1001         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1002
1003         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1004              OBD_CONNECT_GRANT_SHRINK) == 0)
1005                 return 0;
1006
1007         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1008                 /* Get the current RPC size directly, instead of going via:
1009                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1010                  * Keep comment here so that it can be found by searching. */
1011                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1012
1013                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1014                     client->cl_avail_grant > brw_size)
1015                         return 1;
1016                 else
1017                         osc_update_next_shrink(client);
1018         }
1019         return 0;
1020 }
1021
1022 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1023 {
1024         struct client_obd *client;
1025
1026         cfs_list_for_each_entry(client, &item->ti_obd_list,
1027                                 cl_grant_shrink_list) {
1028                 if (osc_should_shrink_grant(client))
1029                         osc_shrink_grant(client);
1030         }
1031         return 0;
1032 }
1033
1034 static int osc_add_shrink_grant(struct client_obd *client)
1035 {
1036         int rc;
1037
1038         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1039                                        TIMEOUT_GRANT,
1040                                        osc_grant_shrink_grant_cb, NULL,
1041                                        &client->cl_grant_shrink_list);
1042         if (rc) {
1043                 CERROR("add grant client %s error %d\n",
1044                         client->cl_import->imp_obd->obd_name, rc);
1045                 return rc;
1046         }
1047         CDEBUG(D_CACHE, "add grant client %s \n",
1048                client->cl_import->imp_obd->obd_name);
1049         osc_update_next_shrink(client);
1050         return 0;
1051 }
1052
1053 static int osc_del_shrink_grant(struct client_obd *client)
1054 {
1055         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1056                                          TIMEOUT_GRANT);
1057 }
1058
1059 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1060 {
1061         /*
1062          * ocd_grant is the total grant amount we're expect to hold: if we've
1063          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1064          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1065          *
1066          * race is tolerable here: if we're evicted, but imp_state already
1067          * left EVICTED state, then cl_dirty must be 0 already.
1068          */
1069         client_obd_list_lock(&cli->cl_loi_list_lock);
1070         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1071                 cli->cl_avail_grant = ocd->ocd_grant;
1072         else
1073                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1074
1075         if (cli->cl_avail_grant < 0) {
1076                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1077                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1078                       ocd->ocd_grant, cli->cl_dirty);
1079                 /* workaround for servers which do not have the patch from
1080                  * LU-2679 */
1081                 cli->cl_avail_grant = ocd->ocd_grant;
1082         }
1083
1084         /* determine the appropriate chunk size used by osc_extent. */
1085         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1086         client_obd_list_unlock(&cli->cl_loi_list_lock);
1087
1088         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1089                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1090                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1091
1092         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1093             cfs_list_empty(&cli->cl_grant_shrink_list))
1094                 osc_add_shrink_grant(cli);
1095 }
1096
1097 /* We assume that the reason this OSC got a short read is because it read
1098  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1099  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1100  * this stripe never got written at or beyond this stripe offset yet. */
1101 static void handle_short_read(int nob_read, obd_count page_count,
1102                               struct brw_page **pga)
1103 {
1104         char *ptr;
1105         int i = 0;
1106
1107         /* skip bytes read OK */
1108         while (nob_read > 0) {
1109                 LASSERT (page_count > 0);
1110
1111                 if (pga[i]->count > nob_read) {
1112                         /* EOF inside this page */
1113                         ptr = kmap(pga[i]->pg) +
1114                                 (pga[i]->off & ~CFS_PAGE_MASK);
1115                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1116                         kunmap(pga[i]->pg);
1117                         page_count--;
1118                         i++;
1119                         break;
1120                 }
1121
1122                 nob_read -= pga[i]->count;
1123                 page_count--;
1124                 i++;
1125         }
1126
1127         /* zero remaining pages */
1128         while (page_count-- > 0) {
1129                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1130                 memset(ptr, 0, pga[i]->count);
1131                 kunmap(pga[i]->pg);
1132                 i++;
1133         }
1134 }
1135
1136 static int check_write_rcs(struct ptlrpc_request *req,
1137                            int requested_nob, int niocount,
1138                            obd_count page_count, struct brw_page **pga)
1139 {
1140         int     i;
1141         __u32   *remote_rcs;
1142
1143         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1144                                                   sizeof(*remote_rcs) *
1145                                                   niocount);
1146         if (remote_rcs == NULL) {
1147                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1148                 return(-EPROTO);
1149         }
1150
1151         /* return error if any niobuf was in error */
1152         for (i = 0; i < niocount; i++) {
1153                 if ((int)remote_rcs[i] < 0)
1154                         return(remote_rcs[i]);
1155
1156                 if (remote_rcs[i] != 0) {
1157                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1158                                 i, remote_rcs[i], req);
1159                         return(-EPROTO);
1160                 }
1161         }
1162
1163         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1164                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1165                        req->rq_bulk->bd_nob_transferred, requested_nob);
1166                 return(-EPROTO);
1167         }
1168
1169         return (0);
1170 }
1171
1172 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1173 {
1174         if (p1->flag != p2->flag) {
1175                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1176                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1177
1178                 /* warn if we try to combine flags that we don't know to be
1179                  * safe to combine */
1180                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1181                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1182                               "report this at http://bugs.whamcloud.com/\n",
1183                               p1->flag, p2->flag);
1184                 }
1185                 return 0;
1186         }
1187
1188         return (p1->off + p1->count == p2->off);
1189 }
1190
1191 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1192                                    struct brw_page **pga, int opc,
1193                                    cksum_type_t cksum_type)
1194 {
1195         __u32                           cksum;
1196         int                             i = 0;
1197         struct cfs_crypto_hash_desc     *hdesc;
1198         unsigned int                    bufsize;
1199         int                             err;
1200         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1201
1202         LASSERT(pg_count > 0);
1203
1204         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1205         if (IS_ERR(hdesc)) {
1206                 CERROR("Unable to initialize checksum hash %s\n",
1207                        cfs_crypto_hash_name(cfs_alg));
1208                 return PTR_ERR(hdesc);
1209         }
1210
1211         while (nob > 0 && pg_count > 0) {
1212                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1213
1214                 /* corrupt the data before we compute the checksum, to
1215                  * simulate an OST->client data error */
1216                 if (i == 0 && opc == OST_READ &&
1217                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1218                         unsigned char *ptr = kmap(pga[i]->pg);
1219                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1220                         memcpy(ptr + off, "bad1", min(4, nob));
1221                         kunmap(pga[i]->pg);
1222                 }
1223                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1224                                   pga[i]->off & ~CFS_PAGE_MASK,
1225                                   count);
1226                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1227                                (int)(pga[i]->off & ~CFS_PAGE_MASK));
1228
1229                 nob -= pga[i]->count;
1230                 pg_count--;
1231                 i++;
1232         }
1233
1234         bufsize = 4;
1235         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1236
1237         if (err)
1238                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1239
1240         /* For sending we only compute the wrong checksum instead
1241          * of corrupting the data so it is still correct on a redo */
1242         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1243                 cksum++;
1244
1245         return cksum;
1246 }
1247
1248 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1249                                 struct lov_stripe_md *lsm, obd_count page_count,
1250                                 struct brw_page **pga,
1251                                 struct ptlrpc_request **reqp,
1252                                 struct obd_capa *ocapa, int reserve,
1253                                 int resend)
1254 {
1255         struct ptlrpc_request   *req;
1256         struct ptlrpc_bulk_desc *desc;
1257         struct ost_body         *body;
1258         struct obd_ioobj        *ioobj;
1259         struct niobuf_remote    *niobuf;
1260         int niocount, i, requested_nob, opc, rc;
1261         struct osc_brw_async_args *aa;
1262         struct req_capsule      *pill;
1263         struct brw_page *pg_prev;
1264
1265         ENTRY;
1266         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1267                 RETURN(-ENOMEM); /* Recoverable */
1268         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1269                 RETURN(-EINVAL); /* Fatal */
1270
1271         if ((cmd & OBD_BRW_WRITE) != 0) {
1272                 opc = OST_WRITE;
1273                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1274                                                 cli->cl_import->imp_rq_pool,
1275                                                 &RQF_OST_BRW_WRITE);
1276         } else {
1277                 opc = OST_READ;
1278                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1279         }
1280         if (req == NULL)
1281                 RETURN(-ENOMEM);
1282
1283         for (niocount = i = 1; i < page_count; i++) {
1284                 if (!can_merge_pages(pga[i - 1], pga[i]))
1285                         niocount++;
1286         }
1287
1288         pill = &req->rq_pill;
1289         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1290                              sizeof(*ioobj));
1291         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1292                              niocount * sizeof(*niobuf));
1293         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1294
1295         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1296         if (rc) {
1297                 ptlrpc_request_free(req);
1298                 RETURN(rc);
1299         }
1300         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1301         ptlrpc_at_set_req_timeout(req);
1302         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1303          * retry logic */
1304         req->rq_no_retry_einprogress = 1;
1305
1306         desc = ptlrpc_prep_bulk_imp(req, page_count,
1307                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1308                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1309                 OST_BULK_PORTAL);
1310
1311         if (desc == NULL)
1312                 GOTO(out, rc = -ENOMEM);
1313         /* NB request now owns desc and will free it when it gets freed */
1314
1315         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1316         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1317         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1318         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1319
1320         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1321
1322         obdo_to_ioobj(oa, ioobj);
1323         ioobj->ioo_bufcnt = niocount;
1324         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1325          * that might be send for this request.  The actual number is decided
1326          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1327          * "max - 1" for old client compatibility sending "0", and also so the
1328          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1329         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1330         osc_pack_capa(req, body, ocapa);
1331         LASSERT(page_count > 0);
1332         pg_prev = pga[0];
1333         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1334                 struct brw_page *pg = pga[i];
1335                 int poff = pg->off & ~CFS_PAGE_MASK;
1336
1337                 LASSERT(pg->count > 0);
1338                 /* make sure there is no gap in the middle of page array */
1339                 LASSERTF(page_count == 1 ||
1340                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1341                           ergo(i > 0 && i < page_count - 1,
1342                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1343                           ergo(i == page_count - 1, poff == 0)),
1344                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1345                          i, page_count, pg, pg->off, pg->count);
1346 #ifdef __linux__
1347                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1348                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1349                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1350                          i, page_count,
1351                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1352                          pg_prev->pg, page_private(pg_prev->pg),
1353                          pg_prev->pg->index, pg_prev->off);
1354 #else
1355                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1356                          "i %d p_c %u\n", i, page_count);
1357 #endif
1358                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1359                         (pg->flag & OBD_BRW_SRVLOCK));
1360
1361                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1362                 requested_nob += pg->count;
1363
1364                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1365                         niobuf--;
1366                         niobuf->len += pg->count;
1367                 } else {
1368                         niobuf->offset = pg->off;
1369                         niobuf->len    = pg->count;
1370                         niobuf->flags  = pg->flag;
1371                 }
1372                 pg_prev = pg;
1373         }
1374
1375         LASSERTF((void *)(niobuf - niocount) ==
1376                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1377                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1378                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1379
1380         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1381         if (resend) {
1382                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1383                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1384                         body->oa.o_flags = 0;
1385                 }
1386                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1387         }
1388
1389         if (osc_should_shrink_grant(cli))
1390                 osc_shrink_grant_local(cli, &body->oa);
1391
1392         /* size[REQ_REC_OFF] still sizeof (*body) */
1393         if (opc == OST_WRITE) {
1394                 if (cli->cl_checksum &&
1395                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1396                         /* store cl_cksum_type in a local variable since
1397                          * it can be changed via lprocfs */
1398                         cksum_type_t cksum_type = cli->cl_cksum_type;
1399
1400                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1402                                 body->oa.o_flags = 0;
1403                         }
1404                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1405                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1406                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1407                                                              page_count, pga,
1408                                                              OST_WRITE,
1409                                                              cksum_type);
1410                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1411                                body->oa.o_cksum);
1412                         /* save this in 'oa', too, for later checking */
1413                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1414                         oa->o_flags |= cksum_type_pack(cksum_type);
1415                 } else {
1416                         /* clear out the checksum flag, in case this is a
1417                          * resend but cl_checksum is no longer set. b=11238 */
1418                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1419                 }
1420                 oa->o_cksum = body->oa.o_cksum;
1421                 /* 1 RC per niobuf */
1422                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1423                                      sizeof(__u32) * niocount);
1424         } else {
1425                 if (cli->cl_checksum &&
1426                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1427                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1428                                 body->oa.o_flags = 0;
1429                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1430                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431                 }
1432         }
1433         ptlrpc_request_set_replen(req);
1434
1435         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1436         aa = ptlrpc_req_async_args(req);
1437         aa->aa_oa = oa;
1438         aa->aa_requested_nob = requested_nob;
1439         aa->aa_nio_count = niocount;
1440         aa->aa_page_count = page_count;
1441         aa->aa_resends = 0;
1442         aa->aa_ppga = pga;
1443         aa->aa_cli = cli;
1444         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1445         if (ocapa && reserve)
1446                 aa->aa_ocapa = capa_get(ocapa);
1447
1448         *reqp = req;
1449         RETURN(0);
1450
1451  out:
1452         ptlrpc_req_finished(req);
1453         RETURN(rc);
1454 }
1455
1456 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1457                                 __u32 client_cksum, __u32 server_cksum, int nob,
1458                                 obd_count page_count, struct brw_page **pga,
1459                                 cksum_type_t client_cksum_type)
1460 {
1461         __u32 new_cksum;
1462         char *msg;
1463         cksum_type_t cksum_type;
1464
1465         if (server_cksum == client_cksum) {
1466                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1467                 return 0;
1468         }
1469
1470         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1471                                        oa->o_flags : 0);
1472         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1473                                       cksum_type);
1474
1475         if (cksum_type != client_cksum_type)
1476                 msg = "the server did not use the checksum type specified in "
1477                       "the original request - likely a protocol problem";
1478         else if (new_cksum == server_cksum)
1479                 msg = "changed on the client after we checksummed it - "
1480                       "likely false positive due to mmap IO (bug 11742)";
1481         else if (new_cksum == client_cksum)
1482                 msg = "changed in transit before arrival at OST";
1483         else
1484                 msg = "changed in transit AND doesn't match the original - "
1485                       "likely false positive due to mmap IO (bug 11742)";
1486
1487         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1488                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1489                            msg, libcfs_nid2str(peer->nid),
1490                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1491                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1492                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1493                            POSTID(&oa->o_oi), pga[0]->off,
1494                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1495         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1496                "client csum now %x\n", client_cksum, client_cksum_type,
1497                server_cksum, cksum_type, new_cksum);
1498         return 1;
1499 }
1500
1501 /* Note rc enters this function as number of bytes transferred */
1502 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1503 {
1504         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1505         const lnet_process_id_t *peer =
1506                         &req->rq_import->imp_connection->c_peer;
1507         struct client_obd *cli = aa->aa_cli;
1508         struct ost_body *body;
1509         __u32 client_cksum = 0;
1510         ENTRY;
1511
1512         if (rc < 0 && rc != -EDQUOT) {
1513                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1514                 RETURN(rc);
1515         }
1516
1517         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1518         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1519         if (body == NULL) {
1520                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1521                 RETURN(-EPROTO);
1522         }
1523
1524         /* set/clear over quota flag for a uid/gid */
1525         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1526             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1527                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1528
1529                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1530                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1531                        body->oa.o_flags);
1532                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1533         }
1534
1535         osc_update_grant(cli, body);
1536
1537         if (rc < 0)
1538                 RETURN(rc);
1539
1540         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1541                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1542
1543         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1544                 if (rc > 0) {
1545                         CERROR("Unexpected +ve rc %d\n", rc);
1546                         RETURN(-EPROTO);
1547                 }
1548                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1549
1550                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1551                         RETURN(-EAGAIN);
1552
1553                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1554                     check_write_checksum(&body->oa, peer, client_cksum,
1555                                          body->oa.o_cksum, aa->aa_requested_nob,
1556                                          aa->aa_page_count, aa->aa_ppga,
1557                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1558                         RETURN(-EAGAIN);
1559
1560                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1561                                      aa->aa_page_count, aa->aa_ppga);
1562                 GOTO(out, rc);
1563         }
1564
1565         /* The rest of this function executes only for OST_READs */
1566
1567         /* if unwrap_bulk failed, return -EAGAIN to retry */
1568         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1569         if (rc < 0)
1570                 GOTO(out, rc = -EAGAIN);
1571
1572         if (rc > aa->aa_requested_nob) {
1573                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1574                        aa->aa_requested_nob);
1575                 RETURN(-EPROTO);
1576         }
1577
1578         if (rc != req->rq_bulk->bd_nob_transferred) {
1579                 CERROR ("Unexpected rc %d (%d transferred)\n",
1580                         rc, req->rq_bulk->bd_nob_transferred);
1581                 return (-EPROTO);
1582         }
1583
1584         if (rc < aa->aa_requested_nob)
1585                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1586
1587         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1588                 static int cksum_counter;
1589                 __u32      server_cksum = body->oa.o_cksum;
1590                 char      *via;
1591                 char      *router;
1592                 cksum_type_t cksum_type;
1593
1594                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1595                                                body->oa.o_flags : 0);
1596                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1597                                                  aa->aa_ppga, OST_READ,
1598                                                  cksum_type);
1599
1600                 if (peer->nid == req->rq_bulk->bd_sender) {
1601                         via = router = "";
1602                 } else {
1603                         via = " via ";
1604                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1605                 }
1606
1607                 if (server_cksum == ~0 && rc > 0) {
1608                         CERROR("Protocol error: server %s set the 'checksum' "
1609                                "bit, but didn't send a checksum.  Not fatal, "
1610                                "but please notify on http://bugs.whamcloud.com/\n",
1611                                libcfs_nid2str(peer->nid));
1612                 } else if (server_cksum != client_cksum) {
1613                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1614                                            "%s%s%s inode "DFID" object "DOSTID
1615                                            " extent ["LPU64"-"LPU64"]\n",
1616                                            req->rq_import->imp_obd->obd_name,
1617                                            libcfs_nid2str(peer->nid),
1618                                            via, router,
1619                                            body->oa.o_valid & OBD_MD_FLFID ?
1620                                                 body->oa.o_parent_seq : (__u64)0,
1621                                            body->oa.o_valid & OBD_MD_FLFID ?
1622                                                 body->oa.o_parent_oid : 0,
1623                                            body->oa.o_valid & OBD_MD_FLFID ?
1624                                                 body->oa.o_parent_ver : 0,
1625                                            POSTID(&body->oa.o_oi),
1626                                            aa->aa_ppga[0]->off,
1627                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1628                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1629                                                                         1);
1630                         CERROR("client %x, server %x, cksum_type %x\n",
1631                                client_cksum, server_cksum, cksum_type);
1632                         cksum_counter = 0;
1633                         aa->aa_oa->o_cksum = client_cksum;
1634                         rc = -EAGAIN;
1635                 } else {
1636                         cksum_counter++;
1637                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1638                         rc = 0;
1639                 }
1640         } else if (unlikely(client_cksum)) {
1641                 static int cksum_missed;
1642
1643                 cksum_missed++;
1644                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1645                         CERROR("Checksum %u requested from %s but not sent\n",
1646                                cksum_missed, libcfs_nid2str(peer->nid));
1647         } else {
1648                 rc = 0;
1649         }
1650 out:
1651         if (rc >= 0)
1652                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1653                                      aa->aa_oa, &body->oa);
1654
1655         RETURN(rc);
1656 }
1657
1658 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1659                             struct lov_stripe_md *lsm,
1660                             obd_count page_count, struct brw_page **pga,
1661                             struct obd_capa *ocapa)
1662 {
1663         struct ptlrpc_request *req;
1664         int                    rc;
1665         wait_queue_head_t            waitq;
1666         int                    generation, resends = 0;
1667         struct l_wait_info     lwi;
1668
1669         ENTRY;
1670
1671         init_waitqueue_head(&waitq);
1672         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1673
1674 restart_bulk:
1675         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1676                                   page_count, pga, &req, ocapa, 0, resends);
1677         if (rc != 0)
1678                 return (rc);
1679
1680         if (resends) {
1681                 req->rq_generation_set = 1;
1682                 req->rq_import_generation = generation;
1683                 req->rq_sent = cfs_time_current_sec() + resends;
1684         }
1685
1686         rc = ptlrpc_queue_wait(req);
1687
1688         if (rc == -ETIMEDOUT && req->rq_resend) {
1689                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1690                 ptlrpc_req_finished(req);
1691                 goto restart_bulk;
1692         }
1693
1694         rc = osc_brw_fini_request(req, rc);
1695
1696         ptlrpc_req_finished(req);
1697         /* When server return -EINPROGRESS, client should always retry
1698          * regardless of the number of times the bulk was resent already.*/
1699         if (osc_recoverable_error(rc)) {
1700                 resends++;
1701                 if (rc != -EINPROGRESS &&
1702                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1703                         CERROR("%s: too many resend retries for object: "
1704                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1705                                POSTID(&oa->o_oi), rc);
1706                         goto out;
1707                 }
1708                 if (generation !=
1709                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1710                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1711                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1712                                POSTID(&oa->o_oi), rc);
1713                         goto out;
1714                 }
1715
1716                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1717                                        NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722 out:
1723         if (rc == -EAGAIN || rc == -EINPROGRESS)
1724                 rc = -EIO;
1725         RETURN (rc);
1726 }
1727
1728 static int osc_brw_redo_request(struct ptlrpc_request *request,
1729                                 struct osc_brw_async_args *aa, int rc)
1730 {
1731         struct ptlrpc_request *new_req;
1732         struct osc_brw_async_args *new_aa;
1733         struct osc_async_page *oap;
1734         ENTRY;
1735
1736         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1737                   "redo for recoverable error %d", rc);
1738
1739         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1740                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1741                                   aa->aa_cli, aa->aa_oa,
1742                                   NULL /* lsm unused by osc currently */,
1743                                   aa->aa_page_count, aa->aa_ppga,
1744                                   &new_req, aa->aa_ocapa, 0, 1);
1745         if (rc)
1746                 RETURN(rc);
1747
1748         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1749                 if (oap->oap_request != NULL) {
1750                         LASSERTF(request == oap->oap_request,
1751                                  "request %p != oap_request %p\n",
1752                                  request, oap->oap_request);
1753                         if (oap->oap_interrupted) {
1754                                 ptlrpc_req_finished(new_req);
1755                                 RETURN(-EINTR);
1756                         }
1757                 }
1758         }
1759         /* New request takes over pga and oaps from old request.
1760          * Note that copying a list_head doesn't work, need to move it... */
1761         aa->aa_resends++;
1762         new_req->rq_interpret_reply = request->rq_interpret_reply;
1763         new_req->rq_async_args = request->rq_async_args;
1764         new_req->rq_commit_cb = request->rq_commit_cb;
1765         /* cap resend delay to the current request timeout, this is similar to
1766          * what ptlrpc does (see after_reply()) */
1767         if (aa->aa_resends > new_req->rq_timeout)
1768                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1769         else
1770                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1771         new_req->rq_generation_set = 1;
1772         new_req->rq_import_generation = request->rq_import_generation;
1773
1774         new_aa = ptlrpc_req_async_args(new_req);
1775
1776         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1777         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1778         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1779         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1780         new_aa->aa_resends = aa->aa_resends;
1781
1782         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1783                 if (oap->oap_request) {
1784                         ptlrpc_req_finished(oap->oap_request);
1785                         oap->oap_request = ptlrpc_request_addref(new_req);
1786                 }
1787         }
1788
1789         new_aa->aa_ocapa = aa->aa_ocapa;
1790         aa->aa_ocapa = NULL;
1791
1792         /* XXX: This code will run into problem if we're going to support
1793          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1794          * and wait for all of them to be finished. We should inherit request
1795          * set from old request. */
1796         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1797
1798         DEBUG_REQ(D_INFO, new_req, "new request");
1799         RETURN(0);
1800 }
1801
1802 /*
1803  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1804  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1805  * fine for our small page arrays and doesn't require allocation.  its an
1806  * insertion sort that swaps elements that are strides apart, shrinking the
1807  * stride down until its '1' and the array is sorted.
1808  */
1809 static void sort_brw_pages(struct brw_page **array, int num)
1810 {
1811         int stride, i, j;
1812         struct brw_page *tmp;
1813
1814         if (num == 1)
1815                 return;
1816         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1817                 ;
1818
1819         do {
1820                 stride /= 3;
1821                 for (i = stride ; i < num ; i++) {
1822                         tmp = array[i];
1823                         j = i;
1824                         while (j >= stride && array[j - stride]->off > tmp->off) {
1825                                 array[j] = array[j - stride];
1826                                 j -= stride;
1827                         }
1828                         array[j] = tmp;
1829                 }
1830         } while (stride > 1);
1831 }
1832
1833 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1834 {
1835         int count = 1;
1836         int offset;
1837         int i = 0;
1838
1839         LASSERT (pages > 0);
1840         offset = pg[i]->off & ~CFS_PAGE_MASK;
1841
1842         for (;;) {
1843                 pages--;
1844                 if (pages == 0)         /* that's all */
1845                         return count;
1846
1847                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1848                         return count;   /* doesn't end on page boundary */
1849
1850                 i++;
1851                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1852                 if (offset != 0)        /* doesn't start on page boundary */
1853                         return count;
1854
1855                 count++;
1856         }
1857 }
1858
1859 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1860 {
1861         struct brw_page **ppga;
1862         int i;
1863
1864         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1865         if (ppga == NULL)
1866                 return NULL;
1867
1868         for (i = 0; i < count; i++)
1869                 ppga[i] = pga + i;
1870         return ppga;
1871 }
1872
1873 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1874 {
1875         LASSERT(ppga != NULL);
1876         OBD_FREE(ppga, sizeof(*ppga) * count);
1877 }
1878
1879 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1880                    obd_count page_count, struct brw_page *pga,
1881                    struct obd_trans_info *oti)
1882 {
1883         struct obdo *saved_oa = NULL;
1884         struct brw_page **ppga, **orig;
1885         struct obd_import *imp = class_exp2cliimp(exp);
1886         struct client_obd *cli;
1887         int rc, page_count_orig;
1888         ENTRY;
1889
1890         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1891         cli = &imp->imp_obd->u.cli;
1892
1893         if (cmd & OBD_BRW_CHECK) {
1894                 /* The caller just wants to know if there's a chance that this
1895                  * I/O can succeed */
1896
1897                 if (imp->imp_invalid)
1898                         RETURN(-EIO);
1899                 RETURN(0);
1900         }
1901
1902         /* test_brw with a failed create can trip this, maybe others. */
1903         LASSERT(cli->cl_max_pages_per_rpc);
1904
1905         rc = 0;
1906
1907         orig = ppga = osc_build_ppga(pga, page_count);
1908         if (ppga == NULL)
1909                 RETURN(-ENOMEM);
1910         page_count_orig = page_count;
1911
1912         sort_brw_pages(ppga, page_count);
1913         while (page_count) {
1914                 obd_count pages_per_brw;
1915
1916                 if (page_count > cli->cl_max_pages_per_rpc)
1917                         pages_per_brw = cli->cl_max_pages_per_rpc;
1918                 else
1919                         pages_per_brw = page_count;
1920
1921                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1922
1923                 if (saved_oa != NULL) {
1924                         /* restore previously saved oa */
1925                         *oinfo->oi_oa = *saved_oa;
1926                 } else if (page_count > pages_per_brw) {
1927                         /* save a copy of oa (brw will clobber it) */
1928                         OBDO_ALLOC(saved_oa);
1929                         if (saved_oa == NULL)
1930                                 GOTO(out, rc = -ENOMEM);
1931                         *saved_oa = *oinfo->oi_oa;
1932                 }
1933
1934                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1935                                       pages_per_brw, ppga, oinfo->oi_capa);
1936
1937                 if (rc != 0)
1938                         break;
1939
1940                 page_count -= pages_per_brw;
1941                 ppga += pages_per_brw;
1942         }
1943
1944 out:
1945         osc_release_ppga(orig, page_count_orig);
1946
1947         if (saved_oa != NULL)
1948                 OBDO_FREE(saved_oa);
1949
1950         RETURN(rc);
1951 }
1952
1953 static int brw_interpret(const struct lu_env *env,
1954                          struct ptlrpc_request *req, void *data, int rc)
1955 {
1956         struct osc_brw_async_args *aa = data;
1957         struct osc_extent *ext;
1958         struct osc_extent *tmp;
1959         struct cl_object  *obj = NULL;
1960         struct client_obd *cli = aa->aa_cli;
1961         ENTRY;
1962
1963         rc = osc_brw_fini_request(req, rc);
1964         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1965         /* When server return -EINPROGRESS, client should always retry
1966          * regardless of the number of times the bulk was resent already. */
1967         if (osc_recoverable_error(rc)) {
1968                 if (req->rq_import_generation !=
1969                     req->rq_import->imp_generation) {
1970                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1971                                ""DOSTID", rc = %d.\n",
1972                                req->rq_import->imp_obd->obd_name,
1973                                POSTID(&aa->aa_oa->o_oi), rc);
1974                 } else if (rc == -EINPROGRESS ||
1975                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1976                         rc = osc_brw_redo_request(req, aa, rc);
1977                 } else {
1978                         CERROR("%s: too many resent retries for object: "
1979                                ""LPU64":"LPU64", rc = %d.\n",
1980                                req->rq_import->imp_obd->obd_name,
1981                                POSTID(&aa->aa_oa->o_oi), rc);
1982                 }
1983
1984                 if (rc == 0)
1985                         RETURN(0);
1986                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1987                         rc = -EIO;
1988         }
1989
1990         if (aa->aa_ocapa) {
1991                 capa_put(aa->aa_ocapa);
1992                 aa->aa_ocapa = NULL;
1993         }
1994
1995         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1996                 if (obj == NULL && rc == 0) {
1997                         obj = osc2cl(ext->oe_obj);
1998                         cl_object_get(obj);
1999                 }
2000
2001                 cfs_list_del_init(&ext->oe_link);
2002                 osc_extent_finish(env, ext, 1, rc);
2003         }
2004         LASSERT(cfs_list_empty(&aa->aa_exts));
2005         LASSERT(cfs_list_empty(&aa->aa_oaps));
2006
2007         if (obj != NULL) {
2008                 struct obdo *oa = aa->aa_oa;
2009                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
2010                 unsigned long valid = 0;
2011
2012                 LASSERT(rc == 0);
2013                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2014                         attr->cat_blocks = oa->o_blocks;
2015                         valid |= CAT_BLOCKS;
2016                 }
2017                 if (oa->o_valid & OBD_MD_FLMTIME) {
2018                         attr->cat_mtime = oa->o_mtime;
2019                         valid |= CAT_MTIME;
2020                 }
2021                 if (oa->o_valid & OBD_MD_FLATIME) {
2022                         attr->cat_atime = oa->o_atime;
2023                         valid |= CAT_ATIME;
2024                 }
2025                 if (oa->o_valid & OBD_MD_FLCTIME) {
2026                         attr->cat_ctime = oa->o_ctime;
2027                         valid |= CAT_CTIME;
2028                 }
2029                 if (valid != 0) {
2030                         cl_object_attr_lock(obj);
2031                         cl_object_attr_set(env, obj, attr, valid);
2032                         cl_object_attr_unlock(obj);
2033                 }
2034                 cl_object_put(env, obj);
2035         }
2036         OBDO_FREE(aa->aa_oa);
2037
2038         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2039                           req->rq_bulk->bd_nob_transferred);
2040         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2041         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2042
2043         client_obd_list_lock(&cli->cl_loi_list_lock);
2044         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2045          * is called so we know whether to go to sync BRWs or wait for more
2046          * RPCs to complete */
2047         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2048                 cli->cl_w_in_flight--;
2049         else
2050                 cli->cl_r_in_flight--;
2051         osc_wake_cache_waiters(cli);
2052         client_obd_list_unlock(&cli->cl_loi_list_lock);
2053
2054         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2055         RETURN(rc);
2056 }
2057
2058 static void brw_commit(struct ptlrpc_request *req)
2059 {
2060         spin_lock(&req->rq_lock);
2061         /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2062          * this called via the rq_commit_cb, I need to ensure
2063          * osc_dec_unstable_pages is still called. Otherwise unstable
2064          * pages may be leaked. */
2065         if (req->rq_unstable) {
2066                 spin_unlock(&req->rq_lock);
2067                 osc_dec_unstable_pages(req);
2068                 spin_lock(&req->rq_lock);
2069         } else {
2070                 req->rq_committed = 1;
2071         }
2072         spin_unlock(&req->rq_lock);
2073 }
2074
2075 /**
2076  * Build an RPC by the list of extent @ext_list. The caller must ensure
2077  * that the total pages in this list are NOT over max pages per RPC.
2078  * Extents in the list must be in OES_RPC state.
      *
      * All pages of the extents are gathered into one brw_page array, a
      * cl_req and an obdo are set up for them, and the request produced by
      * osc_brw_prep_request() is handed to ptlrpcd with brw_interpret() as
      * reply callback and brw_commit() as commit callback.  On success the
      * page list and @ext_list are moved into the request's async args.
      *
      * On failure nothing is sent: the obdo and page array are freed, every
      * extent still on @ext_list is finished with the error and the cl_req,
      * if one was allocated, is completed with the error.
      *
      * Returns 0 when the request was queued, otherwise the non-zero rc of
      * the step that failed (-ENOMEM for the allocations made here).
2079  */
2080 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2081                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2082 {
2083         struct ptlrpc_request           *req = NULL;
2084         struct osc_extent               *ext;
2085         struct brw_page                 **pga = NULL;
2086         struct osc_brw_async_args       *aa = NULL;
2087         struct obdo                     *oa = NULL;
2088         struct osc_async_page           *oap;
2089         struct osc_async_page           *tmp;
2090         struct cl_req                   *clerq = NULL;
2091         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2092                                                                       CRT_READ;
2093         struct ldlm_lock                *lock = NULL;
2094         struct cl_req_attr              *crattr = NULL;
2095         obd_off                         starting_offset = OBD_OBJECT_EOF;
2096         obd_off                         ending_offset = 0;
2097         int                             mpflag = 0;
2098         int                             mem_tight = 0;
2099         int                             page_count = 0;
2100         int                             i;
2101         int                             rc;
2102         CFS_LIST_HEAD(rpc_list);
2103
2104         ENTRY;
2105         LASSERT(!cfs_list_empty(ext_list));
2106
2107         /* add pages into rpc_list to build BRW rpc */
             /* while doing so, count the pages, note whether any extent is
              * flagged oe_memalloc and track the lowest start / highest end
              * offset; the LASSERTs check that a page which does not extend
              * the range on one side is page-aligned on that side */
2108         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2109                 LASSERT(ext->oe_state == OES_RPC);
2110                 mem_tight |= ext->oe_memalloc;
2111                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2112                         ++page_count;
2113                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2114                         if (starting_offset > oap->oap_obj_off)
2115                                 starting_offset = oap->oap_obj_off;
2116                         else
2117                                 LASSERT(oap->oap_page_off == 0);
2118                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2119                                 ending_offset = oap->oap_obj_off +
2120                                                 oap->oap_count;
2121                         else
2122                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2123                                         PAGE_CACHE_SIZE);
2124                 }
2125         }
2126
             /* the saved flag value is restored at the "out" label */
2127         if (mem_tight)
2128                 mpflag = cfs_memory_pressure_get_and_set();
2129
2130         OBD_ALLOC(crattr, sizeof(*crattr));
2131         if (crattr == NULL)
2132                 GOTO(out, rc = -ENOMEM);
2133
2134         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2135         if (pga == NULL)
2136                 GOTO(out, rc = -ENOMEM);
2137
2138         OBDO_ALLOC(oa);
2139         if (oa == NULL)
2140                 GOTO(out, rc = -ENOMEM);
2141
             /* fill pga[] from the gathered pages; the first page also
              * creates the cl_req and supplies the DLM lock recorded in the
              * obdo below */
2142         i = 0;
2143         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2144                 struct cl_page *page = oap2cl_page(oap);
2145                 if (clerq == NULL) {
2146                         clerq = cl_req_alloc(env, page, crt,
2147                                              1 /* only 1-object rpcs for now */);
2148                         if (IS_ERR(clerq))
2149                                 GOTO(out, rc = PTR_ERR(clerq));
2150                         lock = oap->oap_ldlm_lock;
2151                 }
2152                 if (mem_tight)
2153                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2154                 pga[i] = &oap->oap_brw_page;
2155                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2156                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2157                        pga[i]->pg, page_index(oap->oap_page), oap,
2158                        pga[i]->flag);
2159                 i++;
2160                 cl_req_page_add(env, clerq, page);
2161         }
2162
2163         /* always get the data for the obdo for the rpc */
2164         LASSERT(clerq != NULL);
2165         crattr->cra_oa = oa;
2166         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2167         if (lock) {
2168                 oa->o_handle = lock->l_remote_handle;
2169                 oa->o_valid |= OBD_MD_FLHANDLE;
2170         }
2171
2172         rc = cl_req_prep(env, clerq);
2173         if (rc != 0) {
2174                 CERROR("cl_req_prep failed: %d\n", rc);
2175                 GOTO(out, rc);
2176         }
2177
2178         sort_brw_pages(pga, page_count);
2179         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2180                         pga, &req, crattr->cra_capa, 1, 0);
2181         if (rc != 0) {
2182                 CERROR("prep_req failed: %d\n", rc);
2183                 GOTO(out, rc);
2184         }
2185
2186         req->rq_commit_cb = brw_commit;
2187         req->rq_interpret_reply = brw_interpret;
2188
2189         if (mem_tight != 0)
2190                 req->rq_memalloc = 1;
2191
2192         /* Need to update the timestamps after the request is built in case
2193          * we race with setattr (locally or in queue at OST).  If OST gets
2194          * later setattr before earlier BRW (as determined by the request xid),
2195          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2196          * way to do this in a single call.  bug 10150 */
2197         cl_req_attr_set(env, clerq, crattr,
2198                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2199
2200         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2201
             /* move the page list and the caller's extent list into the
              * request's async args; both source lists are left empty */
2202         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2203         aa = ptlrpc_req_async_args(req);
2204         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2205         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2206         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2207         cfs_list_splice_init(ext_list, &aa->aa_exts);
2208         aa->aa_clerq = clerq;
2209
2210         /* queued sync pages can be torn down while the pages
2211          * were between the pending list and the rpc */
2212         tmp = NULL;
2213         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2214                 /* only one oap gets a request reference */
2215                 if (tmp == NULL)
2216                         tmp = oap;
2217                 if (oap->oap_interrupted && !req->rq_intr) {
2218                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2219                                         oap, req);
2220                         ptlrpc_mark_interrupted(req);
2221                 }
2222         }
2223         if (tmp != NULL)
2224                 tmp->oap_request = ptlrpc_request_addref(req);
2225
             /* account the RPC as in flight and feed the page-count,
              * RPCs-in-flight and offset histograms; starting_offset is
              * converted from bytes to a page index first */
2226         client_obd_list_lock(&cli->cl_loi_list_lock);
2227         starting_offset >>= PAGE_CACHE_SHIFT;
2228         if (cmd == OBD_BRW_READ) {
2229                 cli->cl_r_in_flight++;
2230                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2231                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2232                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2233                                       starting_offset + 1);
2234         } else {
2235                 cli->cl_w_in_flight++;
2236                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2237                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2238                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2239                                       starting_offset + 1);
2240         }
2241         client_obd_list_unlock(&cli->cl_loi_list_lock);
2242
2243         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2244                   page_count, aa, cli->cl_r_in_flight,
2245                   cli->cl_w_in_flight);
2246
2247         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2248          * see which CPU/NUMA node the majority of pages were allocated
2249          * on, and try to assign the async RPC to the CPU core
2250          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2251          *
2252          * But on the other hand, we expect that multiple ptlrpcd
2253          * threads and the initial write sponsor can run in parallel,
2254          * especially when data checksum is enabled, which is CPU-bound
2255          * operation and single ptlrpcd thread cannot process in time.
2256          * So more ptlrpcd threads sharing BRW load
2257          * (with PDL_POLICY_ROUND) seems better.
2258          */
2259         ptlrpcd_add_req(req, pol, -1);
2260         rc = 0;
2261         EXIT;
2262
     /* shared exit path: reached with rc == 0 after the request was queued,
      * or with a non-zero rc from any of the GOTO(out, ...) sites above */
2263 out:
2264         if (mem_tight != 0)
2265                 cfs_memory_pressure_restore(mpflag);
2266
2267         if (crattr != NULL) {
2268                 capa_put(crattr->cra_capa);
2269                 OBD_FREE(crattr, sizeof(*crattr));
2270         }
2271
2272         if (rc != 0) {
                     /* every failing step precedes a successful
                      * osc_brw_prep_request(), so no request exists yet */
2273                 LASSERT(req == NULL);
2274
2275                 if (oa)
2276                         OBDO_FREE(oa);
2277                 if (pga)
2278                         OBD_FREE(pga, sizeof(*pga) * page_count);
2279                 /* this should happen rarely and is pretty bad, it makes the
2280                  * pending list not follow the dirty order */
2281                 while (!cfs_list_empty(ext_list)) {
2282                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2283                                              oe_link);
2284                         cfs_list_del_init(&ext->oe_link);
2285                         osc_extent_finish(env, ext, 0, rc);
2286                 }
                     /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
2287                 if (clerq && !IS_ERR(clerq))
2288                         cl_req_completion(env, clerq, rc);
2289         }
2290         RETURN(rc);
2291 }
2292
2293 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2294                                         struct ldlm_enqueue_info *einfo)
2295 {
2296         void *data = einfo->ei_cbdata;
2297         int set = 0;
2298
2299         LASSERT(lock != NULL);
2300         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2301         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2302         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2303         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2304
2305         lock_res_and_lock(lock);
2306         spin_lock(&osc_ast_guard);
2307
2308         if (lock->l_ast_data == NULL)
2309                 lock->l_ast_data = data;
2310         if (lock->l_ast_data == data)
2311                 set = 1;
2312
2313         spin_unlock(&osc_ast_guard);
2314         unlock_res_and_lock(lock);
2315
2316         return set;
2317 }
2318
2319 static int osc_set_data_with_check(struct lustre_handle *lockh,
2320                                    struct ldlm_enqueue_info *einfo)
2321 {
2322         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2323         int set = 0;
2324
2325         if (lock != NULL) {
2326                 set = osc_set_lock_data_with_check(lock, einfo);
2327                 LDLM_LOCK_PUT(lock);
2328         } else
2329                 CERROR("lockh %p, data %p - client evicted?\n",
2330                        lockh, einfo->ei_cbdata);
2331         return set;
2332 }
2333
2334 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2335                              ldlm_iterator_t replace, void *data)
2336 {
2337         struct ldlm_res_id res_id;
2338         struct obd_device *obd = class_exp2obd(exp);
2339
2340         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2341         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2342         return 0;
2343 }
2344
2345 /* find any ldlm lock of the inode in osc
2346  * return 0    not find
2347  *        1    find one
2348  *      < 0    error */
2349 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2350                            ldlm_iterator_t replace, void *data)
2351 {
2352         struct ldlm_res_id res_id;
2353         struct obd_device *obd = class_exp2obd(exp);
2354         int rc = 0;
2355
2356         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2357         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2358         if (rc == LDLM_ITER_STOP)
2359                 return(1);
2360         if (rc == LDLM_ITER_CONTINUE)
2361                 return(0);
2362         return(rc);
2363 }
2364
2365 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2366                             obd_enqueue_update_f upcall, void *cookie,
2367                             __u64 *flags, int agl, int rc)
2368 {
2369         int intent = *flags & LDLM_FL_HAS_INTENT;
2370         ENTRY;
2371
2372         if (intent) {
2373                 /* The request was created before ldlm_cli_enqueue call. */
2374                 if (rc == ELDLM_LOCK_ABORTED) {
2375                         struct ldlm_reply *rep;
2376                         rep = req_capsule_server_get(&req->rq_pill,
2377                                                      &RMF_DLM_REP);
2378
2379                         LASSERT(rep != NULL);
2380                         rep->lock_policy_res1 =
2381                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2382                         if (rep->lock_policy_res1)
2383                                 rc = rep->lock_policy_res1;
2384                 }
2385         }
2386
2387         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2388             (rc == 0)) {
2389                 *flags |= LDLM_FL_LVB_READY;
2390                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2391                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2392         }
2393
2394         /* Call the update callback. */
2395         rc = (*upcall)(cookie, rc);
2396         RETURN(rc);
2397 }
2398
2399 static int osc_enqueue_interpret(const struct lu_env *env,
2400                                  struct ptlrpc_request *req,
2401                                  struct osc_enqueue_args *aa, int rc)
2402 {
2403         struct ldlm_lock *lock;
2404         struct lustre_handle handle;
2405         __u32 mode;
2406         struct ost_lvb *lvb;
2407         __u32 lvb_len;
2408         __u64 *flags = aa->oa_flags;
2409
2410         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2411          * might be freed anytime after lock upcall has been called. */
2412         lustre_handle_copy(&handle, aa->oa_lockh);
2413         mode = aa->oa_ei->ei_mode;
2414
2415         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2416          * be valid. */
2417         lock = ldlm_handle2lock(&handle);
2418
2419         /* Take an additional reference so that a blocking AST that
2420          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2421          * to arrive after an upcall has been executed by
2422          * osc_enqueue_fini(). */
2423         ldlm_lock_addref(&handle, mode);
2424
2425         /* Let CP AST to grant the lock first. */
2426         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2427
2428         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2429                 lvb = NULL;
2430                 lvb_len = 0;
2431         } else {
2432                 lvb = aa->oa_lvb;
2433                 lvb_len = sizeof(*aa->oa_lvb);
2434         }
2435
2436         /* Complete obtaining the lock procedure. */
2437         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2438                                    mode, flags, lvb, lvb_len, &handle, rc);
2439         /* Complete osc stuff. */
2440         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2441                               flags, aa->oa_agl, rc);
2442
2443         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2444
2445         /* Release the lock for async request. */
2446         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2447                 /*
2448                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2449                  * not already released by
2450                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2451                  */
2452                 ldlm_lock_decref(&handle, mode);
2453
2454         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2455                  aa->oa_lockh, req, aa);
2456         ldlm_lock_decref(&handle, mode);
2457         LDLM_LOCK_PUT(lock);
2458         return rc;
2459 }
2460
2461 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2462                         struct lov_oinfo *loi, __u64 flags,
2463                         struct ost_lvb *lvb, __u32 mode, int rc)
2464 {
2465         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2466
2467         if (rc == ELDLM_OK) {
2468                 __u64 tmp;
2469
2470                 LASSERT(lock != NULL);
2471                 loi->loi_lvb = *lvb;
2472                 tmp = loi->loi_lvb.lvb_size;
2473                 /* Extend KMS up to the end of this lock and no further
2474                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2475                 if (tmp > lock->l_policy_data.l_extent.end)
2476                         tmp = lock->l_policy_data.l_extent.end + 1;
2477                 if (tmp >= loi->loi_kms) {
2478                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2479                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2480                         loi_kms_set(loi, tmp);
2481                 } else {
2482                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2483                                    LPU64"; leaving kms="LPU64", end="LPU64,
2484                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2485                                    lock->l_policy_data.l_extent.end);
2486                 }
2487                 ldlm_lock_allow_match(lock);
2488         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2489                 LASSERT(lock != NULL);
2490                 loi->loi_lvb = *lvb;
2491                 ldlm_lock_allow_match(lock);
2492                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2493                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2494                 rc = ELDLM_OK;
2495         }
2496
2497         if (lock != NULL) {
2498                 if (rc != ELDLM_OK)
2499                         ldlm_lock_fail_match(lock);
2500
2501                 LDLM_LOCK_PUT(lock);
2502         }
2503 }
2504 EXPORT_SYMBOL(osc_update_enqueue);
2505
/* Sentinel request-set pointer: osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, hands the request to
 * ptlrpcd_add_req() instead of adding it to a real request set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2507
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained.
 *
 * Obtain an extent lock on \a res_id, reusing a cached lock when possible.
 *
 * \param flags      in/out LDLM flags; LDLM_FL_LVB_READY is set when a cached
 *                   lock is reused, LDLM_FL_BLOCK_GRANTED is cleared before a
 *                   new enqueue is sent
 * \param policy     requested extent; widened in place to page boundaries
 * \param lvb        buffer passed to ldlm_cli_enqueue() for the lock value
 *                   block
 * \param kms_valid  when zero, cached locks are not searched at all
 * \param upcall     invoked as upcall(cookie, rc) once the lock is resolved
 * \param einfo      enqueue description (mode, type, callbacks, cbdata)
 * \param lockh      receives the handle of the matched or enqueued lock
 * \param rqset      NULL for a synchronous enqueue; PTLRPCD_SET to hand the
 *                   request to ptlrpcd; otherwise the set the request is
 *                   added to
 * \param async      passed through to ldlm_cli_enqueue()
 * \param agl        non-zero for AGL requests: matching does not require
 *                   LDLM_FL_LVB_READY, and a matched lock whose LVB is not
 *                   ready yields -ECANCELED
 *
 * \retval ELDLM_OK    a cached lock was reused and the upcall was run
 * \retval -ECANCELED  AGL matched a lock whose LVB is not ready
 * \retval -ENOMEM     the intent request could not be allocated
 * \retval other       result of ldlm_prep_enqueue_req(), of
 *                     ldlm_cli_enqueue() (when \a rqset is set), or of
 *                     osc_enqueue_fini() (synchronous case)
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, ldlm_policy_data_t *policy,
                     struct ost_lvb *lvb, int kms_valid,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ldlm_enqueue_info *einfo,
                     struct lustre_handle *lockh,
                     struct ptlrpc_request_set *rqset, int async, int agl)
{
        struct obd_device *obd = exp->exp_obd;
        struct ptlrpc_request *req = NULL;
        int intent = *flags & LDLM_FL_HAS_INTENT;
        __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /*
         * kms is not valid when either object is completely fresh (so that no
         * locks are cached), or object was evicted. In the latter case cached
         * lock cannot be used, because it would prime inode state with
         * potentially stale LVB.
         */
        if (!kms_valid)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
                               einfo->ei_type, policy, mode, lockh, 0);
        if (mode) {
                struct ldlm_lock *matched = ldlm_handle2lock(lockh);

                if ((agl != 0) && !ldlm_is_lvb_ready(matched)) {
                        /* For AGL, if enqueue RPC is sent but the lock is not
                         * granted, then skip processing this stripe.
                         * Return -ECANCELED to tell the caller. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(-ECANCELED);
                } else if (osc_set_lock_data_with_check(matched, einfo)) {
                        *flags |= LDLM_FL_LVB_READY;
                        /* addref the lock only if not async requests and PW
                         * lock is matched whereas we asked for PR. */
                        if (!rqset && einfo->ei_mode != mode)
                                ldlm_lock_addref(lockh, LCK_PR);
                        if (intent) {
                                /* I would like to be able to ASSERT here that
                                 * rss <= kms, but I can't, for reasons which
                                 * are explained in lov_enqueue() */
                        }

                        /* We already have a lock, and it's referenced.
                         *
                         * At this point, the cl_lock::cll_state is CLS_QUEUING,
                         * AGL upcall may change it to CLS_HELD directly. */
                        (*upcall)(cookie, ELDLM_OK);

                        /* Drop the reference obtained by the match: the PW
                         * one when a PW lock satisfied a PR request, or the
                         * requested-mode one for async requests. */
                        if (einfo->ei_mode != mode)
                                ldlm_lock_decref(lockh, LCK_PW);
                        else if (rqset)
                                /* For async requests, decref the lock. */
                                ldlm_lock_decref(lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(matched);
                        RETURN(ELDLM_OK);
                } else {
                        /* The matched lock carries other AST data; release it
                         * and fall through to a fresh enqueue. */
                        ldlm_lock_decref(lockh, mode);
                        LDLM_LOCK_PUT(matched);
                }
        }

 no_match:
        if (intent) {
                /* Intent enqueues use a request prepared here, with reply
                 * space reserved for the LVB. */
                CFS_LIST_HEAD(cancels);
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_LDLM_ENQUEUE_LVB);
                if (req == NULL)
                        RETURN(-ENOMEM);

                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                                     sizeof *lvb);
                ptlrpc_request_set_replen(req);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        *flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
                              sizeof(*lvb), LVB_T_OST, lockh, async);
        if (rqset) {
                if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the request's async-args area, then queue it. */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = ptlrpc_req_async_args(req);
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;
                        aa->oa_flags  = flags;
                        aa->oa_upcall = upcall;
                        aa->oa_cookie = cookie;
                        aa->oa_lvb    = lvb;
                        aa->oa_lockh  = lockh;
                        aa->oa_agl    = !!agl;

                        req->rq_interpret_reply =
                                (ptlrpc_interpterer_t)osc_enqueue_interpret;
                        if (rqset == PTLRPCD_SET)
                                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                        else
                                ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Synchronous case: finish the OSC side (runs the upcall) now. */
        rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2659
2660 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2661                        struct ldlm_enqueue_info *einfo,
2662                        struct ptlrpc_request_set *rqset)
2663 {
2664         struct ldlm_res_id res_id;
2665         int rc;
2666         ENTRY;
2667
2668         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2669         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2670                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2671                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2672                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2673                               rqset, rqset != NULL, 0);
2674         RETURN(rc);
2675 }
2676
2677 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2678                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2679                    __u64 *flags, void *data, struct lustre_handle *lockh,
2680                    int unref)
2681 {
2682         struct obd_device *obd = exp->exp_obd;
2683         __u64 lflags = *flags;
2684         ldlm_mode_t rc;
2685         ENTRY;
2686
2687         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2688                 RETURN(-EIO);
2689
2690         /* Filesystem lock extents are extended to page boundaries so that
2691          * dealing with the page cache is a little smoother */
2692         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2693         policy->l_extent.end |= ~CFS_PAGE_MASK;
2694
2695         /* Next, search for already existing extent locks that will cover us */
2696         /* If we're trying to read, we also search for an existing PW lock.  The
2697          * VFS and page cache already protect us locally, so lots of readers/
2698          * writers can share a single PW lock. */
2699         rc = mode;
2700         if (mode == LCK_PR)
2701                 rc |= LCK_PW;
2702         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2703                              res_id, type, policy, rc, lockh, unref);
2704         if (rc) {
2705                 if (data != NULL) {
2706                         if (!osc_set_data_with_check(lockh, data)) {
2707                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2708                                         ldlm_lock_decref(lockh, rc);
2709                                 RETURN(0);
2710                         }
2711                 }
2712                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2713                         ldlm_lock_addref(lockh, LCK_PR);
2714                         ldlm_lock_decref(lockh, LCK_PW);
2715                 }
2716                 RETURN(rc);
2717         }
2718         RETURN(rc);
2719 }
2720
2721 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2722 {
2723         ENTRY;
2724
2725         if (unlikely(mode == LCK_GROUP))
2726                 ldlm_lock_decref_and_cancel(lockh, mode);
2727         else
2728                 ldlm_lock_decref(lockh, mode);
2729
2730         RETURN(0);
2731 }
2732
2733 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2734                       __u32 mode, struct lustre_handle *lockh)
2735 {
2736         ENTRY;
2737         RETURN(osc_cancel_base(lockh, mode));
2738 }
2739
2740 static int osc_cancel_unused(struct obd_export *exp,
2741                              struct lov_stripe_md *lsm,
2742                              ldlm_cancel_flags_t flags,
2743                              void *opaque)
2744 {
2745         struct obd_device *obd = class_exp2obd(exp);
2746         struct ldlm_res_id res_id, *resp = NULL;
2747
2748         if (lsm != NULL) {
2749                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2750                 resp = &res_id;
2751         }
2752
2753         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2754 }
2755
2756 static int osc_statfs_interpret(const struct lu_env *env,
2757                                 struct ptlrpc_request *req,
2758                                 struct osc_async_args *aa, int rc)
2759 {
2760         struct obd_statfs *msfs;
2761         ENTRY;
2762
2763         if (rc == -EBADR)
2764                 /* The request has in fact never been sent
2765                  * due to issues at a higher level (LOV).
2766                  * Exit immediately since the caller is
2767                  * aware of the problem and takes care
2768                  * of the clean up */
2769                  RETURN(rc);
2770
2771         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2772             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2773                 GOTO(out, rc = 0);
2774
2775         if (rc != 0)
2776                 GOTO(out, rc);
2777
2778         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2779         if (msfs == NULL) {
2780                 GOTO(out, rc = -EPROTO);
2781         }
2782
2783         *aa->aa_oi->oi_osfs = *msfs;
2784 out:
2785         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2786         RETURN(rc);
2787 }
2788
2789 static int osc_statfs_async(struct obd_export *exp,
2790                             struct obd_info *oinfo, __u64 max_age,
2791                             struct ptlrpc_request_set *rqset)
2792 {
2793         struct obd_device     *obd = class_exp2obd(exp);
2794         struct ptlrpc_request *req;
2795         struct osc_async_args *aa;
2796         int                    rc;
2797         ENTRY;
2798
2799         /* We could possibly pass max_age in the request (as an absolute
2800          * timestamp or a "seconds.usec ago") so the target can avoid doing
2801          * extra calls into the filesystem if that isn't necessary (e.g.
2802          * during mount that would help a bit).  Having relative timestamps
2803          * is not so great if request processing is slow, while absolute
2804          * timestamps are not ideal because they need time synchronization. */
2805         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2806         if (req == NULL)
2807                 RETURN(-ENOMEM);
2808
2809         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2810         if (rc) {
2811                 ptlrpc_request_free(req);
2812                 RETURN(rc);
2813         }
2814         ptlrpc_request_set_replen(req);
2815         req->rq_request_portal = OST_CREATE_PORTAL;
2816         ptlrpc_at_set_req_timeout(req);
2817
2818         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2819                 /* procfs requests not want stat in wait for avoid deadlock */
2820                 req->rq_no_resend = 1;
2821                 req->rq_no_delay = 1;
2822         }
2823
2824         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2825         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2826         aa = ptlrpc_req_async_args(req);
2827         aa->aa_oi = oinfo;
2828
2829         ptlrpc_set_add_req(rqset, req);
2830         RETURN(0);
2831 }
2832
2833 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2834                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2835 {
2836         struct obd_device     *obd = class_exp2obd(exp);
2837         struct obd_statfs     *msfs;
2838         struct ptlrpc_request *req;
2839         struct obd_import     *imp = NULL;
2840         int rc;
2841         ENTRY;
2842
2843         /*Since the request might also come from lprocfs, so we need
2844          *sync this with client_disconnect_export Bug15684*/
2845         down_read(&obd->u.cli.cl_sem);
2846         if (obd->u.cli.cl_import)
2847                 imp = class_import_get(obd->u.cli.cl_import);
2848         up_read(&obd->u.cli.cl_sem);
2849         if (!imp)
2850                 RETURN(-ENODEV);
2851
2852         /* We could possibly pass max_age in the request (as an absolute
2853          * timestamp or a "seconds.usec ago") so the target can avoid doing
2854          * extra calls into the filesystem if that isn't necessary (e.g.
2855          * during mount that would help a bit).  Having relative timestamps
2856          * is not so great if request processing is slow, while absolute
2857          * timestamps are not ideal because they need time synchronization. */
2858         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2859
2860         class_import_put(imp);
2861
2862         if (req == NULL)
2863                 RETURN(-ENOMEM);
2864
2865         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2866         if (rc) {
2867                 ptlrpc_request_free(req);
2868                 RETURN(rc);
2869         }
2870         ptlrpc_request_set_replen(req);
2871         req->rq_request_portal = OST_CREATE_PORTAL;
2872         ptlrpc_at_set_req_timeout(req);
2873
2874         if (flags & OBD_STATFS_NODELAY) {
2875                 /* procfs requests not want stat in wait for avoid deadlock */
2876                 req->rq_no_resend = 1;
2877                 req->rq_no_delay = 1;
2878         }
2879
2880         rc = ptlrpc_queue_wait(req);
2881         if (rc)
2882                 GOTO(out, rc);
2883
2884         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2885         if (msfs == NULL) {
2886                 GOTO(out, rc = -EPROTO);
2887         }
2888
2889         *osfs = *msfs;
2890
2891         EXIT;
2892  out:
2893         ptlrpc_req_finished(req);
2894         return rc;
2895 }
2896
2897 /* Retrieve object striping information.
2898  *
2899  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2900  * the maximum number of OST indices which will fit in the user buffer.
2901  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2902  */
2903 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2904 {
2905         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2906         struct lov_user_md_v3 lum, *lumk;
2907         struct lov_user_ost_data_v1 *lmm_objects;
2908         int rc = 0, lum_size;
2909         ENTRY;
2910
2911         if (!lsm)
2912                 RETURN(-ENODATA);
2913
2914         /* we only need the header part from user space to get lmm_magic and
2915          * lmm_stripe_count, (the header part is common to v1 and v3) */
2916         lum_size = sizeof(struct lov_user_md_v1);
2917         if (copy_from_user(&lum, lump, lum_size))
2918                 RETURN(-EFAULT);
2919
2920         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2921             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2922                 RETURN(-EINVAL);
2923
2924         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2925         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2926         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2927         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2928
2929         /* we can use lov_mds_md_size() to compute lum_size
2930          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2931         if (lum.lmm_stripe_count > 0) {
2932                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2933                 OBD_ALLOC(lumk, lum_size);
2934                 if (!lumk)
2935                         RETURN(-ENOMEM);
2936
2937                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2938                         lmm_objects =
2939                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2940                 else
2941                         lmm_objects = &(lumk->lmm_objects[0]);
2942                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2943         } else {
2944                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2945                 lumk = &lum;
2946         }
2947
2948         lumk->lmm_oi = lsm->lsm_oi;
2949         lumk->lmm_stripe_count = 1;
2950
2951         if (copy_to_user(lump, lumk, lum_size))
2952                 rc = -EFAULT;
2953
2954         if (lumk != &lum)
2955                 OBD_FREE(lumk, lum_size);
2956
2957         RETURN(rc);
2958 }
2959
2960
2961 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2962                          void *karg, void *uarg)
2963 {
2964         struct obd_device *obd = exp->exp_obd;
2965         struct obd_ioctl_data *data = karg;
2966         int err = 0;
2967         ENTRY;
2968
2969         if (!try_module_get(THIS_MODULE)) {
2970                 CERROR("Can't get module. Is it alive?");
2971                 return -EINVAL;
2972         }
2973         switch (cmd) {
2974         case OBD_IOC_LOV_GET_CONFIG: {
2975                 char *buf;
2976                 struct lov_desc *desc;
2977                 struct obd_uuid uuid;
2978
2979                 buf = NULL;
2980                 len = 0;
2981                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2982                         GOTO(out, err = -EINVAL);
2983
2984                 data = (struct obd_ioctl_data *)buf;
2985
2986                 if (sizeof(*desc) > data->ioc_inllen1) {
2987                         obd_ioctl_freedata(buf, len);
2988                         GOTO(out, err = -EINVAL);
2989                 }
2990
2991                 if (data->ioc_inllen2 < sizeof(uuid)) {
2992                         obd_ioctl_freedata(buf, len);
2993                         GOTO(out, err = -EINVAL);
2994                 }
2995
2996                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2997                 desc->ld_tgt_count = 1;
2998                 desc->ld_active_tgt_count = 1;
2999                 desc->ld_default_stripe_count = 1;
3000                 desc->ld_default_stripe_size = 0;
3001                 desc->ld_default_stripe_offset = 0;
3002                 desc->ld_pattern = 0;
3003                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3004
3005                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3006
3007                 err = copy_to_user((void *)uarg, buf, len);
3008                 if (err)
3009                         err = -EFAULT;
3010                 obd_ioctl_freedata(buf, len);
3011                 GOTO(out, err);
3012         }
3013         case LL_IOC_LOV_SETSTRIPE:
3014                 err = obd_alloc_memmd(exp, karg);
3015                 if (err > 0)
3016                         err = 0;
3017                 GOTO(out, err);
3018         case LL_IOC_LOV_GETSTRIPE:
3019                 err = osc_getstripe(karg, uarg);
3020                 GOTO(out, err);
3021         case OBD_IOC_CLIENT_RECOVER:
3022                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3023                                             data->ioc_inlbuf1, 0);
3024                 if (err > 0)
3025                         err = 0;
3026                 GOTO(out, err);
3027         case IOC_OSC_SET_ACTIVE:
3028                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3029                                                data->ioc_offset);
3030                 GOTO(out, err);
3031         case OBD_IOC_POLL_QUOTACHECK:
3032                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3033                 GOTO(out, err);
3034         case OBD_IOC_PING_TARGET:
3035                 err = ptlrpc_obd_ping(obd);
3036                 GOTO(out, err);
3037         default:
3038                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3039                        cmd, current_comm());
3040                 GOTO(out, err = -ENOTTY);
3041         }
3042 out:
3043         module_put(THIS_MODULE);
3044         return err;
3045 }
3046
3047 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3048                         obd_count keylen, void *key, __u32 *vallen, void *val,
3049                         struct lov_stripe_md *lsm)
3050 {
3051         ENTRY;
3052         if (!vallen || !val)
3053                 RETURN(-EFAULT);
3054
3055         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3056                 __u32 *stripe = val;
3057                 *vallen = sizeof(*stripe);
3058                 *stripe = 0;
3059                 RETURN(0);
3060         } else if (KEY_IS(KEY_LAST_ID)) {
3061                 struct ptlrpc_request *req;
3062                 obd_id                *reply;
3063                 char                  *tmp;
3064                 int                    rc;
3065
3066                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3067                                            &RQF_OST_GET_INFO_LAST_ID);
3068                 if (req == NULL)
3069                         RETURN(-ENOMEM);
3070
3071                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3072                                      RCL_CLIENT, keylen);
3073                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3074                 if (rc) {
3075                         ptlrpc_request_free(req);
3076                         RETURN(rc);
3077                 }
3078
3079                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3080                 memcpy(tmp, key, keylen);
3081
3082                 req->rq_no_delay = req->rq_no_resend = 1;
3083                 ptlrpc_request_set_replen(req);
3084                 rc = ptlrpc_queue_wait(req);
3085                 if (rc)
3086                         GOTO(out, rc);
3087
3088                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3089                 if (reply == NULL)
3090                         GOTO(out, rc = -EPROTO);
3091
3092                 *((obd_id *)val) = *reply;
3093         out:
3094                 ptlrpc_req_finished(req);
3095                 RETURN(rc);
3096         } else if (KEY_IS(KEY_FIEMAP)) {
3097                 struct ll_fiemap_info_key *fm_key =
3098                                 (struct ll_fiemap_info_key *)key;
3099                 struct ldlm_res_id       res_id;
3100                 ldlm_policy_data_t       policy;
3101                 struct lustre_handle     lockh;
3102                 ldlm_mode_t              mode = 0;
3103                 struct ptlrpc_request   *req;
3104                 struct ll_user_fiemap   *reply;
3105                 char                    *tmp;
3106                 int                      rc;
3107
3108                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3109                         goto skip_locking;
3110
3111                 policy.l_extent.start = fm_key->fiemap.fm_start &
3112                                                 CFS_PAGE_MASK;
3113
3114                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3115                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3116                         policy.l_extent.end = OBD_OBJECT_EOF;
3117                 else
3118                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3119                                 fm_key->fiemap.fm_length +
3120                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3121
3122                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3123                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3124                                        LDLM_FL_BLOCK_GRANTED |
3125                                        LDLM_FL_LVB_READY,
3126                                        &res_id, LDLM_EXTENT, &policy,
3127                                        LCK_PR | LCK_PW, &lockh, 0);
3128                 if (mode) { /* lock is cached on client */
3129                         if (mode != LCK_PR) {
3130                                 ldlm_lock_addref(&lockh, LCK_PR);
3131                                 ldlm_lock_decref(&lockh, LCK_PW);
3132                         }
3133                 } else { /* no cached lock, needs acquire lock on server side */
3134                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3135                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3136                 }
3137
3138 skip_locking:
3139                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3140                                            &RQF_OST_GET_INFO_FIEMAP);
3141                 if (req == NULL)
3142                         GOTO(drop_lock, rc = -ENOMEM);
3143
3144                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3145                                      RCL_CLIENT, keylen);
3146                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3147                                      RCL_CLIENT, *vallen);
3148                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3149                                      RCL_SERVER, *vallen);
3150
3151                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3152                 if (rc) {
3153                         ptlrpc_request_free(req);
3154                         GOTO(drop_lock, rc);
3155                 }
3156
3157                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3158                 memcpy(tmp, key, keylen);
3159                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3160                 memcpy(tmp, val, *vallen);