lustre/osc/osc_request.c (fs/lustre-release.git, commit 68452b059153f1557d9f6d5489280e8dd6ff8ddb)
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

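/* Pack the capability @capa into the request slot reserved for it and
 * advertise its presence via OBD_MD_FLOSSCAPA in the obdo. A NULL @capa
 * means the client holds no capability for this object, and nothing is
 * packed. */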
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        /* else the size is already calculated as sizeof(struct obd_capa) */
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

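/* Create an object on the OST and wait for the reply. If *ea is NULL a
 * temporary lov_stripe_md is allocated; on success the object id/group
 * returned by the OST are stored into it and it is handed back through
 * *ea, along with the transno and any llog cookie in @oti. */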
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

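/* Send an asynchronous OST_PUNCH (truncate) RPC. The extent to punch
 * travels in oa->o_size/oa->o_blocks (see osc_punch() below, which maps
 * l_extent.start/end into those fields); @upcall is invoked with @cookie
 * once the reply has been interpreted. */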
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally all locks matching @mode on the resource
 * identified by @oa. Matched locks are added to the @cancels list.
 * Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

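/* Try to reserve a slot for a new destroy RPC: returns 1 if fewer than
 * cl_max_rpcs_in_flight destroys are outstanding and the request may be
 * sent, 0 if the caller must wait on cl_destroy_waitq. The inc/dec pair
 * keeps cl_destroy_in_flight consistent without taking a lock. */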
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be asynchronous on the client, and we don't
 * even really care about the return code, since the client cannot do
 * anything at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

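/* Advertise the client's cache state to the OST: how many bytes are
 * dirty (o_dirty), how much more it could still dirty (o_undirty), the
 * grant currently held (o_grant) and any grant lost along the way
 * (o_dropped). */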
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and the cfs_atomic_inc()s are not
                 * covered by a lock, so they may safely race and trip this
                 * CERROR() unless we add in a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has
 * completed. Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant, wait for pending RPCs
                 * that may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

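/* Hand grant above @target back to the OST. The excess is returned via a
 * KEY_GRANT_SHRINK set_info RPC carrying the shrunk amount in the obdo;
 * if that RPC cannot be sent the grant is restored locally. */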
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will
         * negatively impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it is the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it is
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we are evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

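/* Validate a BRW_WRITE reply: every per-niobuf return code must be zero
 * and the bulk must have transferred exactly the number of bytes we
 * requested; anything else is treated as a protocol error. */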
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        /* per-niobuf rcs are signed: negative values carry errnos, so an
         * unsigned type here would make the error check below dead code */
        __s32  *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

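/* Two brw_pages can share one niobuf only when their flags match and the
 * second page starts exactly where the first one ends in the file. */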
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

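/* Checksum up to @nob bytes of bulk data spread across @pg_count pages.
 * The OBD_FAIL_CHECK() hooks deliberately corrupt the data (reads) or
 * the checksum (writes) so checksum-error handling can be exercised. */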
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

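/* Assemble a bulk read or write RPC: count the niobufs needed (pages that
 * can_merge_pages() accepts share one niobuf), set up the bulk descriptor,
 * pack the obdo/ioobj/niobuf buffers, and for writes attach a bulk
 * checksum when cl_checksum is enabled. On success the prepared request
 * is returned through @reqp for the caller to send. */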
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa, struct lov_stripe_md *lsm,
                                obd_count page_count, struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa,
                            opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

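/* A server/client write checksum mismatch was reported. Re-checksum the
 * pages still held locally to classify the corruption: changed on the
 * client (e.g. mmap IO), changed in transit, or a checksum-type mismatch.
 * Returns 0 when the checksums actually agree, 1 otherwise. */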
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1468
1469 /* Note rc enters this function as number of bytes transferred */
1470 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1471 {
1472         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1473         const lnet_process_id_t *peer =
1474                         &req->rq_import->imp_connection->c_peer;
1475         struct client_obd *cli = aa->aa_cli;
1476         struct ost_body *body;
1477         __u32 client_cksum = 0;
1478         ENTRY;
1479
1480         if (rc < 0 && rc != -EDQUOT) {
1481                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1482                 RETURN(rc);
1483         }
1484
1485         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1486         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1487         if (body == NULL) {
1488                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1489                 RETURN(-EPROTO);
1490         }
1491
1492 #ifdef HAVE_QUOTA_SUPPORT
1493         /* set/clear over quota flag for a uid/gid */
1494         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1495             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1496                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1497
1498                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %llx, flags %x\n",
1499                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1500                        body->oa.o_flags);
1501                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1502                              body->oa.o_flags);
1503         }
1504 #endif
1505
1506         if (rc < 0)
1507                 RETURN(rc);
1508
1509         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1510                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1511
1512         osc_update_grant(cli, body);
1513
1514         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1515                 if (rc > 0) {
1516                         CERROR("Unexpected +ve rc %d\n", rc);
1517                         RETURN(-EPROTO);
1518                 }
1519                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1520
1521                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1522                         RETURN(-EAGAIN);
1523
1524                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1525                     check_write_checksum(&body->oa, peer, client_cksum,
1526                                          body->oa.o_cksum, aa->aa_requested_nob,
1527                                          aa->aa_page_count, aa->aa_ppga,
1528                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1529                         RETURN(-EAGAIN);
1530
1531                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1532                                      aa->aa_page_count, aa->aa_ppga);
1533                 GOTO(out, rc);
1534         }
1535
1536         /* The rest of this function executes only for OST_READs */
1537
1538         /* if unwrap_bulk failed, return -EAGAIN to retry */
1539         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1540         if (rc < 0)
1541                 GOTO(out, rc = -EAGAIN);
1542
1543         if (rc > aa->aa_requested_nob) {
1544                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1545                        aa->aa_requested_nob);
1546                 RETURN(-EPROTO);
1547         }
1548
1549         if (rc != req->rq_bulk->bd_nob_transferred) {
1550                 CERROR("Unexpected rc %d (%d transferred)\n",
1551                        rc, req->rq_bulk->bd_nob_transferred);
1552                 RETURN(-EPROTO);
1553         }
1554
1555         if (rc < aa->aa_requested_nob)
1556                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1557
1558         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1559                 static int cksum_counter;
1560                 __u32      server_cksum = body->oa.o_cksum;
1561                 char      *via;
1562                 char      *router;
1563                 cksum_type_t cksum_type;
1564
1565                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1566                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1567                 else
1568                         cksum_type = OBD_CKSUM_CRC32;
1569                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1570                                                  aa->aa_ppga, OST_READ,
1571                                                  cksum_type);
1572
1573                 if (peer->nid == req->rq_bulk->bd_sender) {
1574                         via = router = "";
1575                 } else {
1576                         via = " via ";
1577                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1578                 }
1579
1580                 if (server_cksum == ~0 && rc > 0) {
1581                         CERROR("Protocol error: server %s set the 'checksum' "
1582                                "bit, but didn't send a checksum.  Not fatal, "
1583                                "but please report it at http://bugzilla.lustre.org/\n",
1584                                libcfs_nid2str(peer->nid));
1585                 } else if (server_cksum != client_cksum) {
1586                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1587                                            "%s%s%s inum "LPU64"/"LPU64" object "
1588                                            LPU64"/"LPU64" extent "
1589                                            "["LPU64"-"LPU64"]\n",
1590                                            req->rq_import->imp_obd->obd_name,
1591                                            libcfs_nid2str(peer->nid),
1592                                            via, router,
1593                                            body->oa.o_valid & OBD_MD_FLFID ?
1594                                                 body->oa.o_fid : (__u64)0,
1595                                            body->oa.o_valid & OBD_MD_FLFID ?
1596                                                 body->oa.o_generation :(__u64)0,
1597                                            body->oa.o_id,
1598                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1599                                                 body->oa.o_gr : (__u64)0,
1600                                            aa->aa_ppga[0]->off,
1601                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1602                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1603                                                                         1);
1604                         CERROR("client %x, server %x, cksum_type %x\n",
1605                                client_cksum, server_cksum, cksum_type);
1606                         cksum_counter = 0;
1607                         aa->aa_oa->o_cksum = client_cksum;
1608                         rc = -EAGAIN;
1609                 } else {
1610                         cksum_counter++;
1611                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1612                         rc = 0;
1613                 }
1614         } else if (unlikely(client_cksum)) {
1615                 static int cksum_missed;
1616
1617                 cksum_missed++;
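                     /* log only when cksum_missed is a power of two
                      * (x & -x == x), to rate-limit the console noise */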
1618                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1619                         CERROR("Checksum %u requested from %s but not sent\n",
1620                                cksum_missed, libcfs_nid2str(peer->nid));
1621         } else {
1622                 rc = 0;
1623         }
1624 out:
1625         if (rc >= 0)
1626                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1627
1628         RETURN(rc);
1629 }
1630
1631 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1632                             struct lov_stripe_md *lsm,
1633                             obd_count page_count, struct brw_page **pga,
1634                             struct obd_capa *ocapa)
1635 {
1636         struct ptlrpc_request *req;
1637         int                    rc;
1638         cfs_waitq_t            waitq;
1639         int                    resends = 0;
1640         struct l_wait_info     lwi;
1641
1642         ENTRY;
1643
1644         cfs_waitq_init(&waitq);
1645
1646 restart_bulk:
1647         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1648                                   page_count, pga, &req, ocapa, 0);
1649         if (rc != 0)
1650                 RETURN(rc);
1651
1652         rc = ptlrpc_queue_wait(req);
1653
1654         if (rc == -ETIMEDOUT && req->rq_resend) {
1655                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1656                 ptlrpc_req_finished(req);
1657                 goto restart_bulk;
1658         }
1659
1660         rc = osc_brw_fini_request(req, rc);
1661
1662         ptlrpc_req_finished(req);
1663         if (osc_recoverable_error(rc)) {
1664                 resends++;
1665                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1666                         CERROR("too many resend retries, returning error\n");
1667                         RETURN(-EIO);
1668                 }
1669
1670                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1671                 l_wait_event(waitq, 0, &lwi);
1672
1673                 goto restart_bulk;
1674         }
1675
1676         RETURN(rc);
1677 }
1678
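     /* resend a brw that failed with a recoverable error: build a fresh
      * request, hand the pga and oaps over from the old request, and queue
      * it on the original request set */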
1679 int osc_brw_redo_request(struct ptlrpc_request *request,
1680                          struct osc_brw_async_args *aa)
1681 {
1682         struct ptlrpc_request *new_req;
1683         struct ptlrpc_request_set *set = request->rq_set;
1684         struct osc_brw_async_args *new_aa;
1685         struct osc_async_page *oap;
1686         int rc = 0;
1687         ENTRY;
1688
1689         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1690                 CERROR("too many resend retries, returning error\n");
1691                 RETURN(-EIO);
1692         }
1693
1694         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1695
1696         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1697                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1698                                   aa->aa_cli, aa->aa_oa,
1699                                   NULL /* lsm unused by osc currently */,
1700                                   aa->aa_page_count, aa->aa_ppga,
1701                                   &new_req, aa->aa_ocapa, 0);
1702         if (rc)
1703                 RETURN(rc);
1704
1705         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1706
1707         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1708                 if (oap->oap_request != NULL) {
1709                         LASSERTF(request == oap->oap_request,
1710                                  "request %p != oap_request %p\n",
1711                                  request, oap->oap_request);
1712                         if (oap->oap_interrupted) {
1713                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1714                                 ptlrpc_req_finished(new_req);
1715                                 RETURN(-EINTR);
1716                         }
1717                 }
1718         }
1719         /* New request takes over pga and oaps from old request.
1720          * Note that copying a list_head doesn't work, need to move it... */
1721         aa->aa_resends++;
1722         new_req->rq_interpret_reply = request->rq_interpret_reply;
1723         new_req->rq_async_args = request->rq_async_args;
1724         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1725
1726         new_aa = ptlrpc_req_async_args(new_req);
1727
1728         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1729         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1730         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1731
1732         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1733                 if (oap->oap_request) {
1734                         ptlrpc_req_finished(oap->oap_request);
1735                         oap->oap_request = ptlrpc_request_addref(new_req);
1736                 }
1737         }
1738
1739         new_aa->aa_ocapa = aa->aa_ocapa;
1740         aa->aa_ocapa = NULL;
1741
1742         /* using ptlrpc_set_add_req() is safe here because interpret
1743          * functions run in check_set context.  the only path by which
1744          * another thread can reach this request and return -EINTR is
1745          * protected by cl_loi_list_lock */
1746         ptlrpc_set_add_req(set, new_req);
1747
1748         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1749
1750         DEBUG_REQ(D_INFO, new_req, "new request");
1751         RETURN(0);
1752 }
1753
1754 /*
1755  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1756  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1757  * fine for our small page arrays and doesn't require allocation.  it's an
1758  * insertion sort that swaps elements that are strides apart, shrinking the
1759  * stride down until it's 1 and the array is sorted.
1760  */
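     /* the stride loop below generates the 3h+1 gap sequence
      * (1, 4, 13, 40, ...) and then shrinks it by thirds */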
1761 static void sort_brw_pages(struct brw_page **array, int num)
1762 {
1763         int stride, i, j;
1764         struct brw_page *tmp;
1765
1766         if (num == 1)
1767                 return;
1768         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1769                 ;
1770
1771         do {
1772                 stride /= 3;
1773                 for (i = stride ; i < num ; i++) {
1774                         tmp = array[i];
1775                         j = i;
1776                         while (j >= stride && array[j - stride]->off > tmp->off) {
1777                                 array[j] = array[j - stride];
1778                                 j -= stride;
1779                         }
1780                         array[j] = tmp;
1781                 }
1782         } while (stride > 1);
1783 }
1784
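     /* return how many leading pages of pg[] form one contiguous,
      * page-aligned run: every page but the last must end on a page
      * boundary and every page but the first must start on one, so the
      * run can go out as a single unfragmented RDMA transfer */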
1785 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1786 {
1787         int count = 1;
1788         int offset;
1789         int i = 0;
1790
1791         LASSERT(pages > 0);
1792         offset = pg[i]->off & ~CFS_PAGE_MASK;
1793
1794         for (;;) {
1795                 pages--;
1796                 if (pages == 0)         /* that's all */
1797                         return count;
1798
1799                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1800                         return count;   /* doesn't end on page boundary */
1801
1802                 i++;
1803                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1804                 if (offset != 0)        /* doesn't start on page boundary */
1805                         return count;
1806
1807                 count++;
1808         }
1809 }
1810
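     /* build an array of pointers into the flat brw_page array so the
      * pages can be sorted and split into per-rpc chunks in place */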
1811 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1812 {
1813         struct brw_page **ppga;
1814         int i;
1815
1816         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1817         if (ppga == NULL)
1818                 return NULL;
1819
1820         for (i = 0; i < count; i++)
1821                 ppga[i] = pga + i;
1822         return ppga;
1823 }
1824
1825 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1826 {
1827         LASSERT(ppga != NULL);
1828         OBD_FREE(ppga, sizeof(*ppga) * count);
1829 }
1830
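     /* synchronous brw entry point: sort the pages by offset and issue
      * them in chunks of at most cl_max_pages_per_rpc unfragmented pages,
      * saving a copy of the obdo between chunks since each reply
      * clobbers it */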
1831 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1832                    obd_count page_count, struct brw_page *pga,
1833                    struct obd_trans_info *oti)
1834 {
1835         struct obdo *saved_oa = NULL;
1836         struct brw_page **ppga, **orig;
1837         struct obd_import *imp = class_exp2cliimp(exp);
1838         struct client_obd *cli;
1839         int rc, page_count_orig;
1840         ENTRY;
1841
1842         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1843         cli = &imp->imp_obd->u.cli;
1844
1845         if (cmd & OBD_BRW_CHECK) {
1846                 /* The caller just wants to know if there's a chance that this
1847                  * I/O can succeed */
1848
1849                 if (imp->imp_invalid)
1850                         RETURN(-EIO);
1851                 RETURN(0);
1852         }
1853
1854         /* test_brw with a failed create can trip this, maybe others. */
1855         LASSERT(cli->cl_max_pages_per_rpc);
1856
1857         rc = 0;
1858
1859         orig = ppga = osc_build_ppga(pga, page_count);
1860         if (ppga == NULL)
1861                 RETURN(-ENOMEM);
1862         page_count_orig = page_count;
1863
1864         sort_brw_pages(ppga, page_count);
1865         while (page_count) {
1866                 obd_count pages_per_brw;
1867
1868                 if (page_count > cli->cl_max_pages_per_rpc)
1869                         pages_per_brw = cli->cl_max_pages_per_rpc;
1870                 else
1871                         pages_per_brw = page_count;
1872
1873                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1874
1875                 if (saved_oa != NULL) {
1876                         /* restore previously saved oa */
1877                         *oinfo->oi_oa = *saved_oa;
1878                 } else if (page_count > pages_per_brw) {
1879                         /* save a copy of oa (brw will clobber it) */
1880                         OBDO_ALLOC(saved_oa);
1881                         if (saved_oa == NULL)
1882                                 GOTO(out, rc = -ENOMEM);
1883                         *saved_oa = *oinfo->oi_oa;
1884                 }
1885
1886                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1887                                       pages_per_brw, ppga, oinfo->oi_capa);
1888
1889                 if (rc != 0)
1890                         break;
1891
1892                 page_count -= pages_per_brw;
1893                 ppga += pages_per_brw;
1894         }
1895
1896 out:
1897         osc_release_ppga(orig, page_count_orig);
1898
1899         if (saved_oa != NULL)
1900                 OBDO_FREE(saved_oa);
1901
1902         RETURN(rc);
1903 }
1904
1905 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1906  * the dirty accounting.  Writeback completes or truncate happens before
1907  * writing starts.  Must be called with the loi lock held. */
1908 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1909                            int sent)
1910 {
1911         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1912 }
1913
1914
1915 /* This maintains the lists of pending pages to read/write for a given object
1916  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1917  * to quickly find objects that are ready to send an RPC. */
1918 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1919                          int cmd)
1920 {
1921         int optimal;
1922         ENTRY;
1923
1924         if (lop->lop_num_pending == 0)
1925                 RETURN(0);
1926
1927         /* if we have an invalid import we want to drain the queued pages
1928          * by forcing them through rpcs that immediately fail and complete
1929          * the pages.  recovery relies on this to empty the queued pages
1930          * before canceling the locks and tearing down the llite pages */
1931         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1932                 RETURN(1);
1933
1934         /* stream rpcs in queue order as long as there is an urgent page
1935          * queued.  this is our cheap solution for good batching in the case
1936          * where writepage marks some random page in the middle of the file
1937          * as urgent because of, say, memory pressure */
1938         if (!cfs_list_empty(&lop->lop_urgent)) {
1939                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1940                 RETURN(1);
1941         }
1942         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1943         optimal = cli->cl_max_pages_per_rpc;
1944         if (cmd & OBD_BRW_WRITE) {
1945                 /* trigger a write rpc stream as long as there are dirtiers
1946                  * waiting for space.  as they're waiting, they're not going to
1947                  * create more pages to coalesce with what's waiting. */
1948                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1949                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1950                         RETURN(1);
1951                 }
1952                 /* +16 to avoid triggering rpcs that would want to include pages
1953                  * that are being queued but which can't be made ready until
1954                  * the queuer finishes with the page. this is a wart for
1955                  * llite::commit_write() */
1956                 optimal += 16;
1957         }
1958         if (lop->lop_num_pending >= optimal)
1959                 RETURN(1);
1960
1961         RETURN(0);
1962 }
1963
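     /* an urgent list headed by an ASYNC_HP page means a lock cancellation
      * is waiting on this io, so it rates a high-priority rpc */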
1964 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1965 {
1966         struct osc_async_page *oap;
1967         ENTRY;
1968
1969         if (cfs_list_empty(&lop->lop_urgent))
1970                 RETURN(0);
1971
1972         oap = cfs_list_entry(lop->lop_urgent.next,
1973                              struct osc_async_page, oap_urgent_item);
1974
1975         if (oap->oap_async_flags & ASYNC_HP) {
1976                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1977                 RETURN(1);
1978         }
1979
1980         RETURN(0);
1981 }
1982
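     /* add @item to @list or remove it so that its membership matches
      * @should_be_on; a no-op if it is already in the desired state */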
1983 static void on_list(cfs_list_t *item, cfs_list_t *list,
1984                     int should_be_on)
1985 {
1986         if (cfs_list_empty(item) && should_be_on)
1987                 cfs_list_add_tail(item, list);
1988         else if (!cfs_list_empty(item) && !should_be_on)
1989                 cfs_list_del_init(item);
1990 }
1991
1992 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1993  * can find pages to build into rpcs quickly */
1994 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1995 {
1996         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1997             lop_makes_hprpc(&loi->loi_read_lop)) {
1998                 /* HP rpc */
1999                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2000                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2001         } else {
2002                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2003                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2004                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2005                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2006         }
2007
2008         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2009                 loi->loi_write_lop.lop_num_pending);
2010
2011         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2012                 loi->loi_read_lop.lop_num_pending);
2013 }
2014
2015 static void lop_update_pending(struct client_obd *cli,
2016                                struct loi_oap_pages *lop, int cmd, int delta)
2017 {
2018         lop->lop_num_pending += delta;
2019         if (cmd & OBD_BRW_WRITE)
2020                 cli->cl_pending_w_pages += delta;
2021         else
2022                 cli->cl_pending_r_pages += delta;
2023 }
2024
2025 /**
2026  * this is called when a sync waiter receives an interruption.  Its job is to
2027  * get the caller woken as soon as possible.  If its page hasn't been put in an
2028  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2029  * desiring interruption which will forcefully complete the rpc once the rpc
2030  * has timed out.
2031  */
2032 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2033 {
2034         struct loi_oap_pages *lop;
2035         struct lov_oinfo *loi;
2036         int rc = -EBUSY;
2037         ENTRY;
2038
2039         LASSERT(!oap->oap_interrupted);
2040         oap->oap_interrupted = 1;
2041
2042         /* ok, it's been put in an rpc. only one oap gets a request reference */
2043         if (oap->oap_request != NULL) {
2044                 ptlrpc_mark_interrupted(oap->oap_request);
2045                 ptlrpcd_wake(oap->oap_request);
2046                 ptlrpc_req_finished(oap->oap_request);
2047                 oap->oap_request = NULL;
2048         }
2049
2050         /*
2051          * page completion may be called only if ->cpo_prep() method was
2052          * executed by osc_io_submit(), that also adds page the to pending list
2053          */
2054         if (!cfs_list_empty(&oap->oap_pending_item)) {
2055                 cfs_list_del_init(&oap->oap_pending_item);
2056                 cfs_list_del_init(&oap->oap_urgent_item);
2057
2058                 loi = oap->oap_loi;
2059                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2060                         &loi->loi_write_lop : &loi->loi_read_lop;
2061                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2062                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2063                 rc = oap->oap_caller_ops->ap_completion(env,
2064                                           oap->oap_caller_data,
2065                                           oap->oap_cmd, NULL, -EINTR);
2066         }
2067
2068         RETURN(rc);
2069 }
2070
2071 /* this is trying to propagate async writeback errors back up to the
2072  * application.  when an async write fails we record the error code for later
2073  * if the app does an fsync.  as long as errors persist we force future rpcs
2074  * to be sync so that the app can get a sync error and break the cycle of
2075  * queueing pages for which writeback will fail. */
2076 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2077                            int rc)
2078 {
2079         if (rc) {
2080                 if (!ar->ar_rc)
2081                         ar->ar_rc = rc;
2082
2083                 ar->ar_force_sync = 1;
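                     /* record the xid the next request will be assigned;
                      * only a successful write issued after this point
                      * (xid >= ar_min_xid) clears the force-sync state
                      * below */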
2084                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2085                 return;
2087         }
2088
2089         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2090                 ar->ar_force_sync = 0;
2091 }
2092
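     /* queue @oap on its object's pending list; ASYNC_HP pages go to the
      * head of the urgent list, plain ASYNC_URGENT pages to its tail */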
2093 void osc_oap_to_pending(struct osc_async_page *oap)
2094 {
2095         struct loi_oap_pages *lop;
2096
2097         if (oap->oap_cmd & OBD_BRW_WRITE)
2098                 lop = &oap->oap_loi->loi_write_lop;
2099         else
2100                 lop = &oap->oap_loi->loi_read_lop;
2101
2102         if (oap->oap_async_flags & ASYNC_HP)
2103                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2104         else if (oap->oap_async_flags & ASYNC_URGENT)
2105                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2106         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2107         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2108 }
2109
2110 /* this must be called holding the loi list lock, which protects exit_cache,
2111  * async_flag maintenance, and oap_request */
2112 static void osc_ap_completion(const struct lu_env *env,
2113                               struct client_obd *cli, struct obdo *oa,
2114                               struct osc_async_page *oap, int sent, int rc)
2115 {
2116         __u64 xid = 0;
2117
2118         ENTRY;
2119         if (oap->oap_request != NULL) {
2120                 xid = ptlrpc_req_xid(oap->oap_request);
2121                 ptlrpc_req_finished(oap->oap_request);
2122                 oap->oap_request = NULL;
2123         }
2124
2125         cfs_spin_lock(&oap->oap_lock);
2126         oap->oap_async_flags = 0;
2127         cfs_spin_unlock(&oap->oap_lock);
2128         oap->oap_interrupted = 0;
2129
2130         if (oap->oap_cmd & OBD_BRW_WRITE) {
2131                 osc_process_ar(&cli->cl_ar, xid, rc);
2132                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2133         }
2134
2135         if (rc == 0 && oa != NULL) {
2136                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2137                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2138                 if (oa->o_valid & OBD_MD_FLMTIME)
2139                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2140                 if (oa->o_valid & OBD_MD_FLATIME)
2141                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2142                 if (oa->o_valid & OBD_MD_FLCTIME)
2143                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2144         }
2145
2146         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2147                                                 oap->oap_cmd, oa, rc);
2148
2149         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2150          * I/O on the page could start, but OSC calls it under lock
2151          * and thus we can add oap back to pending safely */
2152         if (rc)
2153                 /* upper layer wants to leave the page on pending queue */
2154                 osc_oap_to_pending(oap);
2155         else
2156                 osc_exit_cache(cli, oap, sent);
2157         EXIT;
2158 }
2159
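     /* interpret callback for async brw rpcs: finish the request, redo it
      * on recoverable errors, then drop the in-flight count and complete
      * (or release grant for) every page that rode in this rpc */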
2160 static int brw_interpret(const struct lu_env *env,
2161                          struct ptlrpc_request *req, void *data, int rc)
2162 {
2163         struct osc_brw_async_args *aa = data;
2164         struct client_obd *cli;
2165         int async;
2166         ENTRY;
2167
2168         rc = osc_brw_fini_request(req, rc);
2169         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2170         if (osc_recoverable_error(rc)) {
2171                 rc = osc_brw_redo_request(req, aa);
2172                 if (rc == 0)
2173                         RETURN(0);
2174         }
2175
2176         if (aa->aa_ocapa) {
2177                 capa_put(aa->aa_ocapa);
2178                 aa->aa_ocapa = NULL;
2179         }
2180
2181         cli = aa->aa_cli;
2182
2183         client_obd_list_lock(&cli->cl_loi_list_lock);
2184
2185         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2186          * is called so we know whether to go to sync BRWs or wait for more
2187          * RPCs to complete */
2188         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2189                 cli->cl_w_in_flight--;
2190         else
2191                 cli->cl_r_in_flight--;
2192
2193         async = cfs_list_empty(&aa->aa_oaps);
2194         if (!async) { /* from osc_send_oap_rpc() */
2195                 struct osc_async_page *oap, *tmp;
2196                 /* the caller may re-use the oap after the completion call so
2197                  * we need to clean it up a little */
2198                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2199                                              oap_rpc_item) {
2200                         cfs_list_del_init(&oap->oap_rpc_item);
2201                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2202                 }
2203                 OBDO_FREE(aa->aa_oa);
2204         } else { /* from async_internal() */
2205                 int i;
2206                 for (i = 0; i < aa->aa_page_count; i++)
2207                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2208
2209                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2210                         OBDO_FREE(aa->aa_oa);
2211         }
2212         osc_wake_cache_waiters(cli);
2213         osc_check_rpcs(env, cli);
2214         client_obd_list_unlock(&cli->cl_loi_list_lock);
2215         if (!async)
2216                 cl_req_completion(env, aa->aa_clerq, rc);
2217         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2218         RETURN(rc);
2219 }
2220
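     /* build one brw rpc from the oaps on @rpc_list: wrap the pages in a
      * cl_req, fill the obdo via cl_req_attr_set(), sort the pages and prep
      * the ptlrpc request.  on failure each oap is completed with the error
      * so upper layers can clean up */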
2221 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2222                                             struct client_obd *cli,
2223                                             cfs_list_t *rpc_list,
2224                                             int page_count, int cmd)
2225 {
2226         struct ptlrpc_request *req;
2227         struct brw_page **pga = NULL;
2228         struct osc_brw_async_args *aa;
2229         struct obdo *oa = NULL;
2230         const struct obd_async_page_ops *ops = NULL;
2231         void *caller_data = NULL;
2232         struct osc_async_page *oap;
2233         struct osc_async_page *tmp;
2234         struct ost_body *body;
2235         struct cl_req *clerq = NULL;
2236         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2237         struct ldlm_lock *lock = NULL;
2238         struct cl_req_attr crattr;
2239         int i, rc;
2240
2241         ENTRY;
2242         LASSERT(!cfs_list_empty(rpc_list));
2243
2244         memset(&crattr, 0, sizeof crattr);
2245         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2246         if (pga == NULL)
2247                 GOTO(out, req = ERR_PTR(-ENOMEM));
2248
2249         OBDO_ALLOC(oa);
2250         if (oa == NULL)
2251                 GOTO(out, req = ERR_PTR(-ENOMEM));
2252
2253         i = 0;
2254         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2255                 struct cl_page *page = osc_oap2cl_page(oap);
2256                 if (ops == NULL) {
2257                         ops = oap->oap_caller_ops;
2258                         caller_data = oap->oap_caller_data;
2259
2260                         clerq = cl_req_alloc(env, page, crt,
2261                                              1 /* only 1-object rpcs for
2262                                                 * now */);
2263                         if (IS_ERR(clerq))
2264                                 GOTO(out, req = (void *)clerq);
2265                         lock = oap->oap_ldlm_lock;
2266                 }
2267                 pga[i] = &oap->oap_brw_page;
2268                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2269                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2270                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2271                 i++;
2272                 cl_req_page_add(env, clerq, page);
2273         }
2274
2275         /* always get the data for the obdo for the rpc */
2276         LASSERT(ops != NULL);
2277         crattr.cra_oa = oa;
2278         crattr.cra_capa = NULL;
2279         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2280         if (lock) {
2281                 oa->o_handle = lock->l_remote_handle;
2282                 oa->o_valid |= OBD_MD_FLHANDLE;
2283         }
2284
2285         rc = cl_req_prep(env, clerq);
2286         if (rc != 0) {
2287                 CERROR("cl_req_prep failed: %d\n", rc);
2288                 GOTO(out, req = ERR_PTR(rc));
2289         }
2290
2291         sort_brw_pages(pga, page_count);
2292         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2293                                   pga, &req, crattr.cra_capa, 1);
2294         if (rc != 0) {
2295                 CERROR("prep_req failed: %d\n", rc);
2296                 GOTO(out, req = ERR_PTR(rc));
2297         }
2298
2299         /* Need to update the timestamps after the request is built in case
2300          * we race with setattr (locally or in queue at OST).  If OST gets
2301          * later setattr before earlier BRW (as determined by the request xid),
2302          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2303          * way to do this in a single call.  bug 10150 */
2304         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305         cl_req_attr_set(env, clerq, &crattr,
2306                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2307
2308         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2309         aa = ptlrpc_req_async_args(req);
2310         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2311         cfs_list_splice(rpc_list, &aa->aa_oaps);
2312         CFS_INIT_LIST_HEAD(rpc_list);
2313         aa->aa_clerq = clerq;
2314 out:
2315         capa_put(crattr.cra_capa);
2316         if (IS_ERR(req)) {
2317                 if (oa)
2318                         OBDO_FREE(oa);
2319                 if (pga)
2320                         OBD_FREE(pga, sizeof(*pga) * page_count);
2321                 /* this should happen rarely and is pretty bad; it makes the
2322                  * pending list not follow the dirty order */
2323                 client_obd_list_lock(&cli->cl_loi_list_lock);
2324                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2325                         cfs_list_del_init(&oap->oap_rpc_item);
2326
2327                         /* queued sync pages can be torn down while the pages
2328                          * were between the pending list and the rpc */
2329                         if (oap->oap_interrupted) {
2330                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2331                                 osc_ap_completion(env, cli, NULL, oap, 0,
2332                                                   oap->oap_count);
2333                                 continue;
2334                         }
2335                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2336                 }
2337                 if (clerq && !IS_ERR(clerq))
2338                         cl_req_completion(env, clerq, PTR_ERR(req));
2339         }
2340         RETURN(req);
2341 }
2342
2343 /**
2344  * prepare pages for ASYNC io and put them in the send queue.
2345  *
2346  * \param cmd one of the OBD_BRW_* macros
2347  * \param lop pending pages
2348  *
2349  * \return zero if the pages were successfully added to the send queue.
2350  * \return nonzero if an error occurred.
2351  */
2352 static int
2353 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2354                  struct lov_oinfo *loi,
2355                  int cmd, struct loi_oap_pages *lop)
2356 {
2357         struct ptlrpc_request *req;
2358         obd_count page_count = 0;
2359         struct osc_async_page *oap = NULL, *tmp;
2360         struct osc_brw_async_args *aa;
2361         const struct obd_async_page_ops *ops;
2362         CFS_LIST_HEAD(rpc_list);
2363         CFS_LIST_HEAD(tmp_list);
2364         unsigned int ending_offset;
2365         unsigned  starting_offset = 0;
2366         int srvlock = 0;
2367         struct cl_object *clob = NULL;
2368         ENTRY;
2369
2370         /* ASYNC_HP pages first.  At present, when a lock covering the pages
2371          * is about to be canceled, the pages it covers are sent out with
2372          * ASYNC_HP, and we have to send them out as soon as possible. */
2373         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2374                 if (oap->oap_async_flags & ASYNC_HP)
2375                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2376                 else
2377                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2378                 if (++page_count >= cli->cl_max_pages_per_rpc)
2379                         break;
2380         }
2381
2382         cfs_list_splice(&tmp_list, &lop->lop_pending);
2383         page_count = 0;
2384
2385         /* first we find the pages we're allowed to work with */
2386         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2387                                      oap_pending_item) {
2388                 ops = oap->oap_caller_ops;
2389
2390                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2391                          "magic 0x%x\n", oap, oap->oap_magic);
2392
2393                 if (clob == NULL) {
2394                         /* pin object in memory, so that completion call-backs
2395                          * can be safely called under client_obd_list lock. */
2396                         clob = osc_oap2cl_page(oap)->cp_obj;
2397                         cl_object_get(clob);
2398                 }
2399
2400                 if (page_count != 0 &&
2401                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2402                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2403                                " oap %p, page %p, srvlock %u\n",
2404                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2405                         break;
2406                 }
2407
2408                 /* If there is a gap at the start of this page, it can't merge
2409                  * with any previous page, so we'll hand the network a
2410                  * "fragmented" page array that it can't transfer in 1 RDMA */
2411                 if (page_count != 0 && oap->oap_page_off != 0)
2412                         break;
2413
2414                 /* in llite being 'ready' equates to the page being locked
2415                  * until completion unlocks it.  commit_write submits a page
2416                  * as not ready because its unlock will happen unconditionally
2417                  * as the call returns.  if we race with commit_write giving
2418                  * us that page we don't want to create a hole in the page
2419                  * stream, so we stop and leave the rpc to be fired by
2420                  * another dirtier or kupdated interval (the not ready page
2421                  * will still be on the dirty list).  we could call in
2422                  * at the end of ll_file_write to process the queue again. */
2423                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2424                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2425                                                     cmd);
2426                         if (rc < 0)
2427                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2428                                                 "instead of ready\n", oap,
2429                                                 oap->oap_page, rc);
2430                         switch (rc) {
2431                         case -EAGAIN:
2432                                 /* llite is telling us that the page is still
2433                                  * in commit_write and that we should try
2434                                  * and put it in an rpc again later.  we
2435                                  * break out of the loop so we don't create
2436                                  * a hole in the sequence of pages in the rpc
2437                                  * stream.*/
2438                                 oap = NULL;
2439                                 break;
2440                         case -EINTR:
2441                                 /* the io isn't needed.  tell the checks
2442                                  * below to complete the rpc with EINTR */
2443                                 cfs_spin_lock(&oap->oap_lock);
2444                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2445                                 cfs_spin_unlock(&oap->oap_lock);
2446                                 oap->oap_count = -EINTR;
2447                                 break;
2448                         case 0:
2449                                 cfs_spin_lock(&oap->oap_lock);
2450                                 oap->oap_async_flags |= ASYNC_READY;
2451                                 cfs_spin_unlock(&oap->oap_lock);
2452                                 break;
2453                         default:
2454                                 LASSERTF(0, "oap %p page %p returned %d "
2455                                             "from make_ready\n", oap,
2456                                             oap->oap_page, rc);
2457                                 break;
2458                         }
2459                 }
2460                 if (oap == NULL)
2461                         break;
2462                 /*
2463                  * Page submitted for IO has to be locked. Either by
2464                  * ->ap_make_ready() or by higher layers.
2465                  */
2466 #if defined(__KERNEL__) && defined(__linux__)
2467                 {
2468                         struct cl_page *page;
2469
2470                         page = osc_oap2cl_page(oap);
2471
2472                         if (page->cp_type == CPT_CACHEABLE &&
2473                             !(PageLocked(oap->oap_page) &&
2474                               (CheckWriteback(oap->oap_page, cmd)))) {
2475                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2476                                        oap->oap_page,
2477                                        (long)oap->oap_page->flags,
2478                                        oap->oap_async_flags);
2479                                 LBUG();
2480                         }
2481                 }
2482 #endif
2483
2484                 /* take the page out of our book-keeping */
2485                 cfs_list_del_init(&oap->oap_pending_item);
2486                 lop_update_pending(cli, lop, cmd, -1);
2487                 cfs_list_del_init(&oap->oap_urgent_item);
2488
2489                 if (page_count == 0)
2490                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2491                                           (PTLRPC_MAX_BRW_SIZE - 1);
2492
2493                 /* ask the caller for the size of the io as the rpc leaves. */
2494                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2495                         oap->oap_count =
2496                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2497                                                       cmd);
2498                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2499                 }
2500                 if (oap->oap_count <= 0) {
2501                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2502                                oap->oap_count);
2503                         osc_ap_completion(env, cli, NULL,
2504                                           oap, 0, oap->oap_count);
2505                         continue;
2506                 }
2507
2508                 /* now put the page back in our accounting */
2509                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2510                 if (page_count == 0)
2511                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2512                 if (++page_count >= cli->cl_max_pages_per_rpc)
2513                         break;
2514
2515                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2516                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2517                  * have the same alignment as the initial writes that allocated
2518                  * extents on the server. */
2519                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2520                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2521                 if (ending_offset == 0)
2522                         break;
2523
2524                 /* If there is a gap at the end of this page, it can't merge
2525                  * with any subsequent pages, so we'll hand the network a
2526                  * "fragmented" page array that it can't transfer in 1 RDMA */
2527                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2528                         break;
2529         }
2530
2531         osc_wake_cache_waiters(cli);
2532
2533         loi_list_maint(cli, loi);
2534
2535         client_obd_list_unlock(&cli->cl_loi_list_lock);
2536
2537         if (clob != NULL)
2538                 cl_object_put(env, clob);
2539
2540         if (page_count == 0) {
2541                 client_obd_list_lock(&cli->cl_loi_list_lock);
2542                 RETURN(0);
2543         }
2544
2545         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2546         if (IS_ERR(req)) {
2547                 LASSERT(cfs_list_empty(&rpc_list));
2548                 loi_list_maint(cli, loi);
2549                 RETURN(PTR_ERR(req));
2550         }
2551
2552         aa = ptlrpc_req_async_args(req);
2553
2554         if (cmd == OBD_BRW_READ) {
2555                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2556                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2557                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2558                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2559         } else {
2560                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2561                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2562                                  cli->cl_w_in_flight);
2563                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2564                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2565         }
2566         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2567
2568         client_obd_list_lock(&cli->cl_loi_list_lock);
2569
2570         if (cmd == OBD_BRW_READ)
2571                 cli->cl_r_in_flight++;
2572         else
2573                 cli->cl_w_in_flight++;
2574
2575         /* queued sync pages can be torn down while the pages
2576          * were between the pending list and the rpc */
2577         tmp = NULL;
2578         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2579                 /* only one oap gets a request reference */
2580                 if (tmp == NULL)
2581                         tmp = oap;
2582                 if (oap->oap_interrupted && !req->rq_intr) {
2583                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2584                                oap, req);
2585                         ptlrpc_mark_interrupted(req);
2586                 }
2587         }
2588         if (tmp != NULL)
2589                 tmp->oap_request = ptlrpc_request_addref(req);
2590
2591         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2592                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2593
2594         req->rq_interpret_reply = brw_interpret;
2595         ptlrpcd_add_req(req, PSCOPE_BRW);
2596         RETURN(1);
2597 }
2598
2599 #define LOI_DEBUG(LOI, STR, args...)                                     \
2600         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2601                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2602                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2603                (LOI)->loi_write_lop.lop_num_pending,                     \
2604                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2605                (LOI)->loi_read_lop.lop_num_pending,                      \
2606                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2607                args)                                                     \
2608
2609 /* This is called by osc_check_rpcs() to find which objects have pages that
2610  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2611 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2612 {
2613         ENTRY;
2614
2615         /* First return objects that have blocked locks so that they
2616          * will be flushed quickly and other clients can get the lock,
2617          * then objects which have pages ready to be stuffed into RPCs */
2618         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2619                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2620                                       struct lov_oinfo, loi_hp_ready_item));
2621         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2622                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2623                                       struct lov_oinfo, loi_ready_item));
2624
2625         /* then if we have cache waiters, return all objects with queued
2626          * writes.  This is especially important when many small files
2627          * have filled up the cache and not been fired into rpcs because
2628              * they don't pass the nr_pending/object threshold */
2629         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2630             !cfs_list_empty(&cli->cl_loi_write_list))
2631                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2632                                       struct lov_oinfo, loi_write_item));
2633
2634         /* then return all queued objects when we have an invalid import
2635          * so that they get flushed */
2636         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2637                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2638                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2639                                               struct lov_oinfo,
2640                                               loi_write_item));
2641                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2642                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2643                                               struct lov_oinfo, loi_read_item));
2644         }
2645         RETURN(NULL);
2646 }
2647
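     /* true when the rpcs-in-flight limit is reached; one extra rpc is
      * allowed when the next urgent page on either list is ASYNC_HP so
      * that lock-cancellation writeout is never throttled */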
2648 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2649 {
2650         struct osc_async_page *oap;
2651         int hprpc = 0;
2652
2653         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2654                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2655                                      struct osc_async_page, oap_urgent_item);
2656                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2657         }
2658
2659         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2660                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2661                                      struct osc_async_page, oap_urgent_item);
2662                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2663         }
2664
2665         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2666 }
2667
2668 /* called with the loi list lock held */
2669 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2670 {
2671         struct lov_oinfo *loi;
2672         int rc = 0, race_counter = 0;
2673         ENTRY;
2674
2675         while ((loi = osc_next_loi(cli)) != NULL) {
2676                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2677
2678                 if (osc_max_rpc_in_flight(cli, loi))
2679                         break;
2680
2681                 /* attempt some read/write balancing by alternating between
2682                  * reads and writes in an object.  The lop_makes_rpc() checks
2683                  * here would be redundant if we were getting read/write work
2684                  * items instead of objects.  We don't want send_oap_rpc to
2685                  * drain a partial read pending queue when we're given this
2686                  * object to write on while there are cache waiters */
2687                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2688                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2689                                               &loi->loi_write_lop);
2690                         if (rc < 0) {
2691                                 CERROR("Write request failed with %d\n", rc);
2692
2693                                 /* osc_send_oap_rpc failed, mostly because of
2694                                  * memory pressure.
2695                                  *
2696                                  * We must not break here, because if:
2697                                  *  - a page was submitted by osc_io_submit
2698                                  *    and is therefore locked;
2699                                  *  - no request is in flight; and
2700                                  *  - no subsequent request will be sent;
2701                                  * then the system ends up live-locked,
2702                                  * because there is no further chance to
2703                                  * call osc_io_unplug() or osc_check_rpcs().
2704                                  * pdflush can't help in this case either,
2705                                  * because it might block grabbing the page
2706                                  * lock as mentioned above.
2707                                  *
2708                                  * Anyway, continue to drain pages. */
2709                                 /* break; */
2710                         }
2711
2712                         if (rc > 0)
2713                                 race_counter = 0;
2714                         else
2715                                 race_counter++;
2716                 }
2717                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2718                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2719                                               &loi->loi_read_lop);
2720                         if (rc < 0)
2721                                 CERROR("Read request failed with %d\n", rc);
2722
2723                         if (rc > 0)
2724                                 race_counter = 0;
2725                         else
2726                                 race_counter++;
2727                 }
2728
2729                 /* attempt some inter-object balancing by issuing rpcs
2730                  * for each object in turn */
2731                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2732                         cfs_list_del_init(&loi->loi_hp_ready_item);
2733                 if (!cfs_list_empty(&loi->loi_ready_item))
2734                         cfs_list_del_init(&loi->loi_ready_item);
2735                 if (!cfs_list_empty(&loi->loi_write_item))
2736                         cfs_list_del_init(&loi->loi_write_item);
2737                 if (!cfs_list_empty(&loi->loi_read_item))
2738                         cfs_list_del_init(&loi->loi_read_item);
2739
2740                 loi_list_maint(cli, loi);
2741
2742                 /* send_oap_rpc fails with 0 when make_ready tells it to
2743                  * back off.  llite's make_ready does this when it tries
2744                  * to lock a page queued for write that is already locked.
2745                  * We want to try sending rpcs from many objects, but we
2746                  * don't want to spin failing with 0.  */
2747                 if (race_counter == 10)
2748                         break;
2749         }
2750         EXIT;
2751 }
2752
2753 /* we're trying to queue a page in the osc so we're subject to the
2754  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2755  * If the osc's queued pages are already at that limit, then we want to sleep
2756  * until there is space in the osc's queue for us.  We also may be waiting for
2757  * write credits from the OST if there are RPCs in flight that may return some
2758  * before we fall back to sync writes.
2759  *
2760  * We need this to know our allocation was granted in the presence of signals */
2761 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2762 {
2763         int rc;
2764         ENTRY;
2765         client_obd_list_lock(&cli->cl_loi_list_lock);
2766         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2767         client_obd_list_unlock(&cli->cl_loi_list_lock);
2768         RETURN(rc);
2769 }
2770
2771 /**
2772  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2773  * is available.
2774  */
2775 int osc_enter_cache_try(const struct lu_env *env,
2776                         struct client_obd *cli, struct lov_oinfo *loi,
2777                         struct osc_async_page *oap, int transient)
2778 {
2779         int has_grant;
2780
2781         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2782         if (has_grant) {
2783                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2784                 if (transient) {
2785                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2786                         cfs_atomic_inc(&obd_dirty_transit_pages);
2787                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2788                 }
2789         }
2790         return has_grant;
2791 }
2792
2793 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2794  * grant or cache space. */
2795 static int osc_enter_cache(const struct lu_env *env,
2796                            struct client_obd *cli, struct lov_oinfo *loi,
2797                            struct osc_async_page *oap)
2798 {
2799         struct osc_cache_waiter ocw;
2800         struct l_wait_info lwi = { 0 };
2801
2802         ENTRY;
2803
2804         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2805                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2806                cli->cl_dirty_max, obd_max_dirty_pages,
2807                cli->cl_lost_grant, cli->cl_avail_grant);
2808
2809         /* force the caller to try sync I/O.  This can jump the list
2810          * of queued writes and create a discontiguous rpc stream */
2811         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2812             loi->loi_ar.ar_force_sync)
2813                 RETURN(-EDQUOT);
2814
2815         /* Hopefully normal case - cache space and write credits available */
2816         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2817             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2818             osc_enter_cache_try(env, cli, loi, oap, 0))
2819                 RETURN(0);
2820
2821         /* Make sure that there are write rpcs in flight to wait for.  This
2822          * is a little silly as this object may not have any pending writes,
2823          * but other objects certainly might. */
2824         if (cli->cl_w_in_flight) {
2825                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2826                 cfs_waitq_init(&ocw.ocw_waitq);
2827                 ocw.ocw_oap = oap;
2828                 ocw.ocw_rc = 0;
2829
2830                 loi_list_maint(cli, loi);
2831                 osc_check_rpcs(env, cli);
2832                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2833
2834                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2835                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2836
2837                 client_obd_list_lock(&cli->cl_loi_list_lock);
2838                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2839                         cfs_list_del(&ocw.ocw_entry);
2840                         RETURN(-EINTR);
2841                 }
2842                 RETURN(ocw.ocw_rc);
2843         }
2844
2845         RETURN(-EDQUOT);
2846 }
2847
2848
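/* Prepare an osc_async_page for @page at @offset within its object.  When
 * called with page == NULL this only reports the rounded size of an oap,
 * so callers can reserve room to embed one; otherwise *res must already
 * point at that storage, which is initialized here. */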
2849 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2850                         struct lov_oinfo *loi, cfs_page_t *page,
2851                         obd_off offset, const struct obd_async_page_ops *ops,
2852                         void *data, void **res, int nocache,
2853                         struct lustre_handle *lockh)
2854 {
2855         struct osc_async_page *oap;
2856
2857         ENTRY;
2858
2859         if (!page)
2860                 return cfs_size_round(sizeof(*oap));
2861
2862         oap = *res;
2863         oap->oap_magic = OAP_MAGIC;
2864         oap->oap_cli = &exp->exp_obd->u.cli;
2865         oap->oap_loi = loi;
2866
2867         oap->oap_caller_ops = ops;
2868         oap->oap_caller_data = data;
2869
2870         oap->oap_page = page;
2871         oap->oap_obj_off = offset;
2872         if (!client_is_remote(exp) &&
2873             cfs_capable(CFS_CAP_SYS_RESOURCE))
2874                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2875
2876         LASSERT(!(offset & ~CFS_PAGE_MASK));
2877
2878         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2879         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2880         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2881         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2882
2883         cfs_spin_lock_init(&oap->oap_lock);
2884         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2885         RETURN(0);
2886 }
2887
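/* Turn an opaque cookie back into an oap, checking the magic value first
 * so a stale or foreign pointer is reported as -EINVAL rather than being
 * dereferenced blindly. */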
2888 struct osc_async_page *oap_from_cookie(void *cookie)
2889 {
2890         struct osc_async_page *oap = cookie;
2891         if (oap->oap_magic != OAP_MAGIC)
2892                 return ERR_PTR(-EINVAL);
2893         return oap;
2894 }
2895
2896 int osc_queue_async_io(const struct lu_env *env,
2897                        struct obd_export *exp, struct lov_stripe_md *lsm,
2898                        struct lov_oinfo *loi, void *cookie,
2899                        int cmd, obd_off off, int count,
2900                        obd_flag brw_flags, enum async_flags async_flags)
2901 {
2902         struct client_obd *cli = &exp->exp_obd->u.cli;
2903         struct osc_async_page *oap;
2904         int rc = 0;
2905         ENTRY;
2906
2907         oap = oap_from_cookie(cookie);
2908         if (IS_ERR(oap))
2909                 RETURN(PTR_ERR(oap));
2910
2911         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2912                 RETURN(-EIO);
2913
2914         if (!cfs_list_empty(&oap->oap_pending_item) ||
2915             !cfs_list_empty(&oap->oap_urgent_item) ||
2916             !cfs_list_empty(&oap->oap_rpc_item))
2917                 RETURN(-EBUSY);
2918
2919         /* check if the file's owner/group is over quota */
2920         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2921                 struct cl_object *obj;
2922                 struct cl_attr    attr; /* XXX put attr into thread info */
2923                 unsigned int qid[MAXQUOTAS];
2924
2925                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2926
2927                 cl_object_attr_lock(obj);
2928                 rc = cl_object_attr_get(env, obj, &attr);
2929                 cl_object_attr_unlock(obj);
2930
2931                 qid[USRQUOTA] = attr.cat_uid;
2932                 qid[GRPQUOTA] = attr.cat_gid;
2933                 if (rc == 0 &&
2934                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2935                         rc = -EDQUOT;
2936                 if (rc)
2937                         RETURN(rc);
2938         }
2939
2940         if (loi == NULL)
2941                 loi = lsm->lsm_oinfo[0];
2942
2943         client_obd_list_lock(&cli->cl_loi_list_lock);
2944
2945         LASSERT(off + count <= CFS_PAGE_SIZE);
2946         oap->oap_cmd = cmd;
2947         oap->oap_page_off = off;
2948         oap->oap_count = count;
2949         oap->oap_brw_flags = brw_flags;
2950         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2951         if (libcfs_memory_pressure_get())
2952                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2953         cfs_spin_lock(&oap->oap_lock);
2954         oap->oap_async_flags = async_flags;
2955         cfs_spin_unlock(&oap->oap_lock);
2956
2957         if (cmd & OBD_BRW_WRITE) {
2958                 rc = osc_enter_cache(env, cli, loi, oap);
2959                 if (rc) {
2960                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2961                         RETURN(rc);
2962                 }
2963         }
2964
2965         osc_oap_to_pending(oap);
2966         loi_list_maint(cli, loi);
2967
2968         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2969                   cmd);
2970
2971         osc_check_rpcs(env, cli);
2972         client_obd_list_unlock(&cli->cl_loi_list_lock);
2973
2974         RETURN(0);
2975 }
2976
2977 /* aka (~was & now & flag), but this is more clear :) */
2978 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2979
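/* Apply newly-requested async flags to an oap already on a pending list.
 * Note the ordering below: a page that carries ASYNC_HP is added at the
 * head of lop_urgent so it is picked for the next RPC ahead of ordinary
 * urgent pages, which are appended at the tail. */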
2980 int osc_set_async_flags_base(struct client_obd *cli,
2981                              struct lov_oinfo *loi, struct osc_async_page *oap,
2982                              obd_flag async_flags)
2983 {
2984         struct loi_oap_pages *lop;
2985         int flags = 0;
2986         ENTRY;
2987
2988         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
2989
2990         if (oap->oap_cmd & OBD_BRW_WRITE) {
2991                 lop = &loi->loi_write_lop;
2992         } else {
2993                 lop = &loi->loi_read_lop;
2994         }
2995
2996         if ((oap->oap_async_flags & async_flags) == async_flags)
2997                 RETURN(0);
2998
2999         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3000                 flags |= ASYNC_READY;
3001
3002         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3003             cfs_list_empty(&oap->oap_rpc_item)) {
3004                 if (oap->oap_async_flags & ASYNC_HP)
3005                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3006                 else
3007                         cfs_list_add_tail(&oap->oap_urgent_item,
3008                                           &lop->lop_urgent);
3009                 flags |= ASYNC_URGENT;
3010                 loi_list_maint(cli, loi);
3011         }
3012         cfs_spin_lock(&oap->oap_lock);
3013         oap->oap_async_flags |= flags;
3014         cfs_spin_unlock(&oap->oap_lock);
3015
3016         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3017                         oap->oap_async_flags);
3018         RETURN(0);
3019 }
3020
3021 int osc_teardown_async_page(struct obd_export *exp,
3022                             struct lov_stripe_md *lsm,
3023                             struct lov_oinfo *loi, void *cookie)
3024 {
3025         struct client_obd *cli = &exp->exp_obd->u.cli;
3026         struct loi_oap_pages *lop;
3027         struct osc_async_page *oap;
3028         int rc = 0;
3029         ENTRY;
3030
3031         oap = oap_from_cookie(cookie);
3032         if (IS_ERR(oap))
3033                 RETURN(PTR_ERR(oap));
3034
3035         if (loi == NULL)
3036                 loi = lsm->lsm_oinfo[0];
3037
3038         if (oap->oap_cmd & OBD_BRW_WRITE) {
3039                 lop = &loi->loi_write_lop;
3040         } else {
3041                 lop = &loi->loi_read_lop;
3042         }
3043
3044         client_obd_list_lock(&cli->cl_loi_list_lock);
3045
3046         if (!cfs_list_empty(&oap->oap_rpc_item))
3047                 GOTO(out, rc = -EBUSY);
3048
3049         osc_exit_cache(cli, oap, 0);
3050         osc_wake_cache_waiters(cli);
3051
3052         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3053                 cfs_list_del_init(&oap->oap_urgent_item);
3054                 cfs_spin_lock(&oap->oap_lock);
3055                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3056                 cfs_spin_unlock(&oap->oap_lock);
3057         }
3058         if (!cfs_list_empty(&oap->oap_pending_item)) {
3059                 cfs_list_del_init(&oap->oap_pending_item);
3060                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3061         }
3062         loi_list_maint(cli, loi);
3063         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3064 out:
3065         client_obd_list_unlock(&cli->cl_loi_list_lock);
3066         RETURN(rc);
3067 }
3068
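/* Publish caller data (einfo->ei_cbdata) in a DLM lock's l_ast_data under
 * osc_ast_guard.  The assertions first verify that the lock was created
 * with our callbacks, i.e. that it really is an OSC lock. */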
3069 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3070                                          struct ldlm_enqueue_info *einfo,
3071                                          int flags)
3072 {
3073         void *data = einfo->ei_cbdata;
3074
3075         LASSERT(lock != NULL);
3076         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3077         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3078         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3079         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3080
3081         lock_res_and_lock(lock);
3082         cfs_spin_lock(&osc_ast_guard);
3083         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3084         lock->l_ast_data = data;
3085         cfs_spin_unlock(&osc_ast_guard);
3086         unlock_res_and_lock(lock);
3087 }
3088
3089 static void osc_set_data_with_check(struct lustre_handle *lockh,
3090                                     struct ldlm_enqueue_info *einfo,
3091                                     int flags)
3092 {
3093         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3094
3095         if (lock != NULL) {
3096                 osc_set_lock_data_with_check(lock, einfo, flags);
3097                 LDLM_LOCK_PUT(lock);
3098         } else
3099                 CERROR("lockh %p, data %p - client evicted?\n",
3100                        lockh, einfo->ei_cbdata);
3101 }
3102
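/* Iterate over all cached DLM locks on this object's resource, applying
 * @replace to each; the iterator is expected to update the lock's
 * l_ast_data (hence the name) on behalf of the layer above. */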
3103 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3104                              ldlm_iterator_t replace, void *data)
3105 {
3106         struct ldlm_res_id res_id;
3107         struct obd_device *obd = class_exp2obd(exp);
3108
3109         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3110         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3111         return 0;
3112 }
3113
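/* Post-process an enqueue reply.  For intent enqueues the server may
 * "abort" the lock (ELDLM_LOCK_ABORTED) and carry the real result in
 * lock_policy_res1 (e.g. a glimpse satisfied without granting a lock),
 * in which case the LVB is still valid and is marked ready below before
 * the caller's upcall runs. */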
3114 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3115                             obd_enqueue_update_f upcall, void *cookie,
3116                             int *flags, int rc)
3117 {
3118         int intent = *flags & LDLM_FL_HAS_INTENT;
3119         ENTRY;
3120
3121         if (intent) {
3122                 /* The request was created before ldlm_cli_enqueue call. */
3123                 if (rc == ELDLM_LOCK_ABORTED) {
3124                         struct ldlm_reply *rep;
3125                         rep = req_capsule_server_get(&req->rq_pill,
3126                                                      &RMF_DLM_REP);
3127
3128                         LASSERT(rep != NULL);
3129                         if (rep->lock_policy_res1)
3130                                 rc = rep->lock_policy_res1;
3131                 }
3132         }
3133
3134         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3135                 *flags |= LDLM_FL_LVB_READY;
3136                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3137                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3138         }
3139
3140         /* Call the update callback. */
3141         rc = (*upcall)(cookie, rc);
3142         RETURN(rc);
3143 }
3144
3145 static int osc_enqueue_interpret(const struct lu_env *env,
3146                                  struct ptlrpc_request *req,
3147                                  struct osc_enqueue_args *aa, int rc)
3148 {
3149         struct ldlm_lock *lock;
3150         struct lustre_handle handle;
3151         __u32 mode;
3152
3153         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3154          * might be freed anytime after the lock upcall has been called. */
3155         lustre_handle_copy(&handle, aa->oa_lockh);
3156         mode = aa->oa_ei->ei_mode;
3157
3158         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3159          * be valid. */
3160         lock = ldlm_handle2lock(&handle);
3161
3162         /* Take an additional reference so that a blocking AST that
3163          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3164          * to arrive after an upcall has been executed by
3165          * osc_enqueue_fini(). */
3166         ldlm_lock_addref(&handle, mode);
3167
3168         /* Complete obtaining the lock procedure. */
3169         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3170                                    mode, aa->oa_flags, aa->oa_lvb,
3171                                    sizeof(*aa->oa_lvb), &handle, rc);
3172         /* Complete osc stuff. */
3173         rc = osc_enqueue_fini(req, aa->oa_lvb,
3174                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3175
3176         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3177
3178         /* Release the lock for async request. */
3179         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3180                 /*
3181                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3182                  * not already released by
3183                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3184                  */
3185                 ldlm_lock_decref(&handle, mode);
3186
3187         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3188                  aa->oa_lockh, req, aa);
3189         ldlm_lock_decref(&handle, mode);
3190         LDLM_LOCK_PUT(lock);
3191         return rc;
3192 }
3193
3194 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3195                         struct lov_oinfo *loi, int flags,
3196                         struct ost_lvb *lvb, __u32 mode, int rc)
3197 {
3198         if (rc == ELDLM_OK) {
3199                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3200                 __u64 tmp;
3201
3202                 LASSERT(lock != NULL);
3203                 loi->loi_lvb = *lvb;
3204                 tmp = loi->loi_lvb.lvb_size;
3205                 /* Extend KMS up to the end of this lock and no further
3206                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3207                 if (tmp > lock->l_policy_data.l_extent.end)
3208                         tmp = lock->l_policy_data.l_extent.end + 1;
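                /* e.g. a lock on extent [0, 4095] caps KMS at 4096, no
                 * matter how large a size the LVB reports. */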
3209                 if (tmp >= loi->loi_kms) {
3210                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3211                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3212                         loi_kms_set(loi, tmp);
3213                 } else {
3214                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3215                                    LPU64"; leaving kms="LPU64", end="LPU64,
3216                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3217                                    lock->l_policy_data.l_extent.end);
3218                 }
3219                 ldlm_lock_allow_match(lock);
3220                 LDLM_LOCK_PUT(lock);
3221         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3222                 loi->loi_lvb = *lvb;
3223                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3224                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3225                 rc = ELDLM_OK;
3226         }
3227 }
3228 EXPORT_SYMBOL(osc_update_enqueue);
3229
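/* Sentinel request-set pointer: passing PTLRPCD_SET to osc_enqueue_base()
 * means "hand the request to the ptlrpcd daemon" (see ptlrpcd_add_req()
 * below) instead of adding it to a caller-owned set. */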
3230 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3231
3232 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3233  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3234  * other synchronous requests; however, keeping some locks while trying to
3235  * obtain others may take a considerable amount of time in the case of OST
3236  * failure, and when sync requests do not get a lock released by a client, the
3237  * client is excluded from the cluster -- such scenarios make life difficult,
3238  * so release locks just after they are obtained. */
3239 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3240                      int *flags, ldlm_policy_data_t *policy,
3241                      struct ost_lvb *lvb, int kms_valid,
3242                      obd_enqueue_update_f upcall, void *cookie,
3243                      struct ldlm_enqueue_info *einfo,
3244                      struct lustre_handle *lockh,
3245                      struct ptlrpc_request_set *rqset, int async)
3246 {
3247         struct obd_device *obd = exp->exp_obd;
3248         struct ptlrpc_request *req = NULL;
3249         int intent = *flags & LDLM_FL_HAS_INTENT;
3250         ldlm_mode_t mode;
3251         int rc;
3252         ENTRY;
3253
3254         /* Filesystem lock extents are extended to page boundaries so that
3255          * dealing with the page cache is a little smoother.  */
3256         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3257         policy->l_extent.end |= ~CFS_PAGE_MASK;
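        /* For example, with 4K pages a byte range [6000, 9000] widens to
         * a lock on [4096, 12287], i.e. whole pages 1 and 2. */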
3258
3259         /*
3260          * kms is not valid when either object is completely fresh (so that no
3261          * locks are cached), or object was evicted. In the latter case cached
3262          * lock cannot be used, because it would prime inode state with
3263          * potentially stale LVB.
3264          */
3265         if (!kms_valid)
3266                 goto no_match;
3267
3268         /* Next, search for already existing extent locks that will cover us */
3269         /* If we're trying to read, we also search for an existing PW lock.  The
3270          * VFS and page cache already protect us locally, so lots of readers/
3271          * writers can share a single PW lock.
3272          *
3273          * There are problems with conversion deadlocks, so instead of
3274          * converting a read lock to a write lock, we'll just enqueue a new
3275          * one.
3276          *
3277          * At some point we should cancel the read lock instead of making them
3278          * send us a blocking callback, but there are problems with canceling
3279          * locks out from other users right now, too. */
3280         mode = einfo->ei_mode;
3281         if (einfo->ei_mode == LCK_PR)
3282                 mode |= LCK_PW;
3283         mode = ldlm_lock_match(obd->obd_namespace,
3284                                *flags | LDLM_FL_LVB_READY, res_id,
3285                                einfo->ei_type, policy, mode, lockh, 0);
3286         if (mode) {
3287                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3288
3289                 if (matched->l_ast_data == NULL ||
3290                     matched->l_ast_data == einfo->ei_cbdata) {
3291                         /* addref the lock only if this is not an async
3292                          * request and we matched PW while asking for PR. */
3293                         if (!rqset && einfo->ei_mode != mode)
3294                                 ldlm_lock_addref(lockh, LCK_PR);
3295                         osc_set_lock_data_with_check(matched, einfo, *flags);
3296                         if (intent) {
3297                                 /* I would like to be able to ASSERT here that
3298                                  * rss <= kms, but I can't, for reasons which
3299                                  * are explained in lov_enqueue() */
3300                         }
3301
3302                         /* We already have a lock, and it's referenced */
3303                         (*upcall)(cookie, ELDLM_OK);
3304
3305                         /* For async requests, decref the lock. */
3306                         if (einfo->ei_mode != mode)
3307                                 ldlm_lock_decref(lockh, LCK_PW);
3308                         else if (rqset)
3309                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3310                         LDLM_LOCK_PUT(matched);
3311                         RETURN(ELDLM_OK);
3312                 } else
3313                         ldlm_lock_decref(lockh, mode);
3314                 LDLM_LOCK_PUT(matched);
3315         }
3316
3317  no_match:
3318         if (intent) {
3319                 CFS_LIST_HEAD(cancels);
3320                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3321                                            &RQF_LDLM_ENQUEUE_LVB);
3322                 if (req == NULL)
3323                         RETURN(-ENOMEM);
3324
3325                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3326                 if (rc)
3327                         RETURN(rc);
3328
3329                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3330                                      sizeof *lvb);
3331                 ptlrpc_request_set_replen(req);
3332         }
3333
3334         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3335         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3336
3337         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3338                               sizeof(*lvb), lockh, async);
3339         if (rqset) {
3340                 if (!rc) {
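                        /* Stash the interpret arguments in the request
                         * itself: rq_async_args is scratch space owned by
                         * the request (the CLASSERT checks they fit), so
                         * nothing extra is allocated or freed. */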
3341                         struct osc_enqueue_args *aa;
3342                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3343                         aa = ptlrpc_req_async_args(req);
3344                         aa->oa_ei = einfo;
3345                         aa->oa_exp = exp;
3346                         aa->oa_flags  = flags;
3347                         aa->oa_upcall = upcall;
3348                         aa->oa_cookie = cookie;
3349                         aa->oa_lvb    = lvb;
3350                         aa->oa_lockh  = lockh;
3351
3352                         req->rq_interpret_reply =
3353                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3354                         if (rqset == PTLRPCD_SET)
3355                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3356                         else
3357                                 ptlrpc_set_add_req(rqset, req);
3358                 } else if (intent) {
3359                         ptlrpc_req_finished(req);
3360                 }
3361                 RETURN(rc);
3362         }
3363
3364         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3365         if (intent)
3366                 ptlrpc_req_finished(req);
3367
3368         RETURN(rc);
3369 }
3370
3371 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3372                        struct ldlm_enqueue_info *einfo,
3373                        struct ptlrpc_request_set *rqset)
3374 {
3375         struct ldlm_res_id res_id;
3376         int rc;
3377         ENTRY;
3378
3379         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3380                            oinfo->oi_md->lsm_object_gr, &res_id);
3381
3382         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3383                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3384                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3385                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3386                               rqset, rqset != NULL);
3387         RETURN(rc);
3388 }
3389
3390 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3391                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3392                    int *flags, void *data, struct lustre_handle *lockh,
3393                    int unref)
3394 {
3395         struct obd_device *obd = exp->exp_obd;
3396         int lflags = *flags;
3397         ldlm_mode_t rc;
3398         ENTRY;
3399
3400         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3401                 RETURN(-EIO);
3402
3403         /* Filesystem lock extents are extended to page boundaries so that
3404          * dealing with the page cache is a little smoother */
3405         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3406         policy->l_extent.end |= ~CFS_PAGE_MASK;
3407
3408         /* Next, search for already existing extent locks that will cover us */
3409         /* If we're trying to read, we also search for an existing PW lock.  The
3410          * VFS and page cache already protect us locally, so lots of readers/
3411          * writers can share a single PW lock. */
3412         rc = mode;
3413         if (mode == LCK_PR)
3414                 rc |= LCK_PW;
3415         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3416                              res_id, type, policy, rc, lockh, unref);
3417         if (rc) {
3418                 if (data != NULL)
3419                         osc_set_data_with_check(lockh, data, lflags);
3420                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3421                         ldlm_lock_addref(lockh, LCK_PR);
3422                         ldlm_lock_decref(lockh, LCK_PW);
3423                 }
3424                 RETURN(rc);
3425         }
3426         RETURN(rc);
3427 }
3428
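/* Drop a lock reference.  A group lock is cancelled on its final decref
 * because, unlike ordinary extent locks, it is never left cached in the
 * LRU to be cancelled lazily. */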
3429 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3430 {
3431         ENTRY;
3432
3433         if (unlikely(mode == LCK_GROUP))
3434                 ldlm_lock_decref_and_cancel(lockh, mode);
3435         else
3436                 ldlm_lock_decref(lockh, mode);
3437
3438         RETURN(0);
3439 }
3440
3441 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3442                       __u32 mode, struct lustre_handle *lockh)
3443 {
3444         ENTRY;
3445         RETURN(osc_cancel_base(lockh, mode));
3446 }
3447
3448 static int osc_cancel_unused(struct obd_export *exp,
3449                              struct lov_stripe_md *lsm, int flags,
3450                              void *opaque)
3451 {
3452         struct obd_device *obd = class_exp2obd(exp);
3453         struct ldlm_res_id res_id, *resp = NULL;
3454
3455         if (lsm != NULL) {
3456                 resp = osc_build_res_name(lsm->lsm_object_id,
3457                                           lsm->lsm_object_gr, &res_id);
3458         }
3459
3460         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3461 }
3462
3463 static int osc_statfs_interpret(const struct lu_env *env,
3464                                 struct ptlrpc_request *req,
3465                                 struct osc_async_args *aa, int rc)
3466 {
3467         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3468         struct obd_statfs *msfs;
3469         __u64 used;
3470         ENTRY;
3471
3472         if (rc == -EBADR)
3473                 /* The request has in fact never been sent
3474                  * due to issues at a higher level (LOV).
3475                  * Exit immediately since the caller is
3476                  * aware of the problem and takes care
3477                  * of the clean up */
3478                  RETURN(rc);
3479
3480         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3481             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3482                 GOTO(out, rc = 0);
3483
3484         if (rc != 0)
3485                 GOTO(out, rc);
3486
3487         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3488         if (msfs == NULL) {
3489                 GOTO(out, rc = -EPROTO);
3490         }
3491
3492         /* Reinitialize the RDONLY and DEGRADED flags at the client
3493          * on each statfs, so they don't stay set permanently. */
3494         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3495
3496         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3497                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3498         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3499                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3500
3501         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3502                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3503         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3504                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3505
3506         /* Add a bit of hysteresis so this flag isn't continually flapping,
3507          * and ensure that new files don't get extremely fragmented due to
3508          * only a small amount of available space in the filesystem.
3509          * We want to set the NOSPC flag when there is less than ~0.1% free
3510          * and clear it when there is at least ~0.2% free space, so:
3511          *                   avail < ~0.1% max          max = avail + used
3512          *            1025 * avail < avail + used       used = blocks - free
3513          *            1024 * avail < used
3514          *            1024 * avail < blocks - free
3515          *                   avail < ((blocks - free) >> 10)
3516          *
3517          * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want
3518          * to lose that amount of space, so in those cases we report no space
3519          * left if there is less than 1 GB left.                        */
3520         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3521         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3522                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3523                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3524         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3525                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3526                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3527
3528         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3529
3530         *aa->aa_oi->oi_osfs = *msfs;
3531 out:
3532         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3533         RETURN(rc);
3534 }
3535
3536 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3537                             __u64 max_age, struct ptlrpc_request_set *rqset)
3538 {
3539         struct ptlrpc_request *req;
3540         struct osc_async_args *aa;
3541         int                    rc;
3542         ENTRY;
3543
3544         /* We could possibly pass max_age in the request (as an absolute
3545          * timestamp or a "seconds.usec ago") so the target can avoid doing
3546          * extra calls into the filesystem if that isn't necessary (e.g.
3547          * during mount that would help a bit).  Having relative timestamps
3548          * is not so great if request processing is slow, while absolute
3549          * timestamps are not ideal because they need time synchronization. */
3550         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3551         if (req == NULL)
3552                 RETURN(-ENOMEM);
3553
3554         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3555         if (rc) {
3556                 ptlrpc_request_free(req);
3557                 RETURN(rc);
3558         }
3559         ptlrpc_request_set_replen(req);
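        /* statfs goes to the CREATE portal (as in osc_statfs() below),
         * presumably so it is not queued behind bulk I/O on a busy OST. */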
3560         req->rq_request_portal = OST_CREATE_PORTAL;
3561         ptlrpc_at_set_req_timeout(req);
3562
3563         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3564                 /* procfs requests must not be delayed or resent, to avoid deadlock */
3565                 req->rq_no_resend = 1;
3566                 req->rq_no_delay = 1;
3567         }
3568
3569         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3570         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3571         aa = ptlrpc_req_async_args(req);
3572         aa->aa_oi = oinfo;
3573
3574         ptlrpc_set_add_req(rqset, req);
3575         RETURN(0);
3576 }
3577
3578 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3579                       __u64 max_age, __u32 flags)
3580 {
3581         struct obd_statfs     *msfs;
3582         struct ptlrpc_request *req;
3583         struct obd_import     *imp = NULL;
3584         int rc;
3585         ENTRY;
3586
3587         /* Since the request might also come from lprocfs, we need to
3588          * synchronize with client_disconnect_export() (bug 15684). */
3589         cfs_down_read(&obd->u.cli.cl_sem);
3590         if (obd->u.cli.cl_import)
3591                 imp = class_import_get(obd->u.cli.cl_import);
3592         cfs_up_read(&obd->u.cli.cl_sem);
3593         if (!imp)
3594                 RETURN(-ENODEV);
3595
3596         /* We could possibly pass max_age in the request (as an absolute
3597          * timestamp or a "seconds.usec ago") so the target can avoid doing
3598          * extra calls into the filesystem if that isn't necessary (e.g.
3599          * during mount that would help a bit).  Having relative timestamps
3600          * is not so great if request processing is slow, while absolute
3601          * timestamps are not ideal because they need time synchronization. */
3602         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3603
3604         class_import_put(imp);
3605
3606         if (req == NULL)
3607                 RETURN(-ENOMEM);
3608
3609         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3610         if (rc) {
3611                 ptlrpc_request_free(req);
3612                 RETURN(rc);
3613         }
3614         ptlrpc_request_set_replen(req);
3615         req->rq_request_portal = OST_CREATE_PORTAL;
3616         ptlrpc_at_set_req_timeout(req);
3617
3618         if (flags & OBD_STATFS_NODELAY) {
3619                 /* procfs requests must not be delayed or resent, to avoid deadlock */
3620                 req->rq_no_resend = 1;
3621                 req->rq_no_delay = 1;
3622         }
3623
3624         rc = ptlrpc_queue_wait(req);
3625         if (rc)
3626                 GOTO(out, rc);
3627
3628         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3629         if (msfs == NULL) {
3630                 GOTO(out, rc = -EPROTO);
3631         }
3632
3633         *osfs = *msfs;
3634
3635         EXIT;
3636  out:
3637         ptlrpc_req_finished(req);
3638         return rc;
3639 }
3640
3641 /* Retrieve object striping information.
3642  *
3643  * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3644  * the maximum number of OST indices which will fit in the user buffer.
3645  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3646  */
3647 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3648 {
3649         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3650         struct lov_user_md_v3 lum, *lumk;
3651         struct lov_user_ost_data_v1 *lmm_objects;
3652         int rc = 0, lum_size;
3653         ENTRY;
3654
3655         if (!lsm)
3656                 RETURN(-ENODATA);
3657
3658         /* we only need the header part from user space to get lmm_magic and