/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

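/* Copy the capability @capa, if any, into the request capsule and mark
 * it valid in the request body. */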
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

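/* Completion callback for async OST_GETATTR: unpack the reply into the
 * caller's obdo (faking o_blksize, which the OST does not send) and
 * pass the final status to the oi_cb_up() upcall. */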
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

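/* Queue an OST_GETATTR on @set without waiting; the reply is handled
 * by osc_getattr_interpret(). */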
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

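/* Synchronous OST_GETATTR: send the request, wait for the reply and
 * copy the returned attributes into oinfo->oi_oa. */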
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

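/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and read back the server's view of the object. */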
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                                        oinfo->oi_oa->o_gr > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

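/* Create an object on the OST with a synchronous OST_CREATE RPC.
 * Allocates an in-memory lov_stripe_md if the caller did not supply
 * one, and records the object id/group and (optionally) the unlink
 * llog cookie returned by the server. */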
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->pa_oa = body->oa;
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

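/* Send an asynchronous OST_PUNCH (truncate) for the extent described
 * by oa->o_size/oa->o_blocks; @upcall is invoked with @cookie once the
 * reply arrives.  Passing rqset == PTLRPCD_SET hands the request to
 * ptlrpcd instead of a caller-managed set, e.g. (sketch):
 *
 *   osc_punch_base(exp, oa, capa, upcall, cookie, PTLRPCD_SET);
 */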
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

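/* Synchronous OST_SYNC: ask the OST to commit the byte range
 * [start, end] of the object to stable storage.  The range is carried
 * in the (overloaded) o_size/o_blocks fields of the request body. */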
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally all locks matching @mode in the resource
 * identified by @oa.  Matched locks are added to the @cancels list.
 * Returns the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

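/* Completion callback for OST_DESTROY: drop the in-flight destroy
 * count and wake anyone throttled in osc_destroy(). */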
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

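/* Optimistically reserve a destroy slot: bump the in-flight counter
 * and check it against cl_max_rpcs_in_flight.  If the reservation
 * fails, the counter is dropped again; the extra wakeup below covers
 * the race where another thread freed a slot between the two atomic
 * operations. */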
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. the transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

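/* Fill in the cache-accounting fields of @oa (dirty, undirty, grant,
 * dropped) under the loi_list_lock, so that each bulk RPC reports the
 * client's cache state back to the OST. */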
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if there is still dirty cache but no grant, wait for
                 * pending RPCs that may yet return some grant before
                 * falling back to sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

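/* Grant is space the OST has reserved on this client's behalf; while
 * some grant remains, cached writes should not fail with ENOSPC on the
 * server.  The initial grant arrives with the connect reply, and later
 * adjustments piggy-back on BRW replies. */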
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just
 * that this stripe was never written at or beyond this offset. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

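/* Validate a BRW_WRITE reply: every per-niobuf return code must be
 * zero and the bulk byte count must match what was requested,
 * otherwise the reply is treated as a protocol error. */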
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

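/* Two brw_pages can share one niobuf only if they are contiguous in
 * the file and carry identical flags.  A difference only in
 * OBD_BRW_FROM_GRANT/OBD_BRW_NOCACHE still prevents merging but is
 * not worth a warning. */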
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

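/* Compute the bulk checksum over the first @nob bytes of @pga.  The
 * OBD_FAIL_* hooks deliberately corrupt the data (reads) or just the
 * checksum (writes) so the checksum-recovery paths can be tested. */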
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sends we corrupt only the checksum, not the data, so the
         * data is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

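/* Build a BRW (bulk read/write) request: allocate the RPC (from the
 * pre-allocated pool for writes, so writeout can still proceed under
 * memory pressure), attach one bulk descriptor page per brw_page,
 * merge contiguous pages into shared niobufs, and compute the bulk
 * checksum if checksums are enabled.  On success *reqp holds the
 * prepared request with its osc_brw_async_args primed for the reply. */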
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

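/* A BRW_WRITE reply carried a server checksum differing from ours:
 * recompute the checksum locally to guess where the data changed
 * (client, network, or server) and log the diagnosis.  Returns 1 on a
 * real mismatch (the caller then resends), 0 if the checksums agree
 * after all. */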
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1282 /* Note rc enters this function as number of bytes transferred */
1283 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1284 {
1285         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1286         const lnet_process_id_t *peer =
1287                         &req->rq_import->imp_connection->c_peer;
1288         struct client_obd *cli = aa->aa_cli;
1289         struct ost_body *body;
1290         __u32 client_cksum = 0;
1291         ENTRY;
1292
1293         if (rc < 0 && rc != -EDQUOT)
1294                 RETURN(rc);
1295
1296         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1297         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1298                                   lustre_swab_ost_body);
1299         if (body == NULL) {
1300                 CDEBUG(D_INFO, "Can't unpack body\n");
1301                 RETURN(-EPROTO);
1302         }
1303
1304         /* set/clear over quota flag for a uid/gid */
1305         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1306             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1307                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1308                              body->oa.o_gid, body->oa.o_valid,
1309                              body->oa.o_flags);
1310
1311         if (rc < 0)
1312                 RETURN(rc);
1313
1314         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1315                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1316
1317         osc_update_grant(cli, body);
1318
1319         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1320                 if (rc > 0) {
1321                         CERROR("Unexpected +ve rc %d\n", rc);
1322                         RETURN(-EPROTO);
1323                 }
1324                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1325
1326                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1327                     check_write_checksum(&body->oa, peer, client_cksum,
1328                                          body->oa.o_cksum, aa->aa_requested_nob,
1329                                          aa->aa_page_count, aa->aa_ppga,
1330                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1331                         RETURN(-EAGAIN);
1332
1333                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1334                         RETURN(-EAGAIN);
1335
1336                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1337                                      aa->aa_page_count, aa->aa_ppga);
1338                 GOTO(out, rc);
1339         }
1340
1341         /* The rest of this function executes only for OST_READs */
1342         if (rc > aa->aa_requested_nob) {
1343                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1344                        aa->aa_requested_nob);
1345                 RETURN(-EPROTO);
1346         }
1347
1348         if (rc != req->rq_bulk->bd_nob_transferred) {
1349                 CERROR ("Unexpected rc %d (%d transferred)\n",
1350                         rc, req->rq_bulk->bd_nob_transferred);
1351                 return (-EPROTO);
1352         }
1353
1354         if (rc < aa->aa_requested_nob)
1355                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1356
1357         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1358                                          aa->aa_ppga))
1359                 GOTO(out, rc = -EAGAIN);
1360
1361         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1362                 static int cksum_counter;
1363                 __u32      server_cksum = body->oa.o_cksum;
1364                 char      *via;
1365                 char      *router;
1366                 cksum_type_t cksum_type;
1367
1368                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1369                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1370                 else
1371                         cksum_type = OBD_CKSUM_CRC32;
1372                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1373                                                  aa->aa_ppga, OST_READ,
1374                                                  cksum_type);
1375
1376                 if (peer->nid == req->rq_bulk->bd_sender) {
1377                         via = router = "";
1378                 } else {
1379                         via = " via ";
1380                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1381                 }
1382
1383                 if (server_cksum == ~0 && rc > 0) {
1384                         CERROR("Protocol error: server %s set the 'checksum' "
1385                                "bit, but didn't send a checksum.  Not fatal, "
1386                                "but please notify on http://bugzilla.lustre.org/\n",
1387                                libcfs_nid2str(peer->nid));
1388                 } else if (server_cksum != client_cksum) {
1389                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1390                                            "%s%s%s inum "LPU64"/"LPU64" object "
1391                                            LPU64"/"LPU64" extent "
1392                                            "["LPU64"-"LPU64"]\n",
1393                                            req->rq_import->imp_obd->obd_name,
1394                                            libcfs_nid2str(peer->nid),
1395                                            via, router,
1396                                            body->oa.o_valid & OBD_MD_FLFID ?
1397                                                 body->oa.o_fid : (__u64)0,
1398                                            body->oa.o_valid & OBD_MD_FLFID ?
1399                                                 body->oa.o_generation :(__u64)0,
1400                                            body->oa.o_id,
1401                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1402                                                 body->oa.o_gr : (__u64)0,
1403                                            aa->aa_ppga[0]->off,
1404                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1405                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1406                                                                         1);
1407                         CERROR("client %x, server %x, cksum_type %x\n",
1408                                client_cksum, server_cksum, cksum_type);
1409                         cksum_counter = 0;
1410                         aa->aa_oa->o_cksum = client_cksum;
1411                         rc = -EAGAIN;
1412                 } else {
1413                         cksum_counter++;
1414                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1415                         rc = 0;
1416                 }
1417         } else if (unlikely(client_cksum)) {
1418                 static int cksum_missed;
1419
1420                 cksum_missed++;
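                /* (x & -x) == x only when x is a power of two, so this
                 * warning fires on the 1st, 2nd, 4th, 8th, ... miss --
                 * exponentially decaying log noise. */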
1421                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1422                         CERROR("Checksum %u requested from %s but not sent\n",
1423                                cksum_missed, libcfs_nid2str(peer->nid));
1424         } else {
1425                 rc = 0;
1426         }
1427 out:
1428         if (rc >= 0)
1429                 *aa->aa_oa = body->oa;
1430
1431         RETURN(rc);
1432 }
1433
1434 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1435                             struct lov_stripe_md *lsm,
1436                             obd_count page_count, struct brw_page **pga,
1437                             struct obd_capa *ocapa)
1438 {
1439         struct ptlrpc_request *req;
1440         int                    rc;
1441         cfs_waitq_t            waitq;
1442         int                    resends = 0;
1443         struct l_wait_info     lwi;
1444
1445         ENTRY;
1446
1447         cfs_waitq_init(&waitq);
1448
1449 restart_bulk:
1450         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1451                                   page_count, pga, &req, ocapa);
1452         if (rc != 0)
1453                 RETURN(rc);
1454
1455         rc = ptlrpc_queue_wait(req);
1456
1457         if (rc == -ETIMEDOUT && req->rq_resend) {
1458                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1459                 ptlrpc_req_finished(req);
1460                 goto restart_bulk;
1461         }
1462
1463         rc = osc_brw_fini_request(req, rc);
1464
1465         ptlrpc_req_finished(req);
1466         if (osc_recoverable_error(rc)) {
1467                 resends++;
1468                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1469                         CERROR("too many resend retries, returning error\n");
1470                         RETURN(-EIO);
1471                 }
1472
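                /* Nothing ever signals this waitq and the condition is always
                 * false, so this is simply an interruptible sleep of
                 * 'resends' seconds before the bulk is retried. */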
1473                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1474                 l_wait_event(waitq, 0, &lwi);
1475
1476                 goto restart_bulk;
1477         }
1478
1479         RETURN (rc);
1480 }
1481
1482 int osc_brw_redo_request(struct ptlrpc_request *request,
1483                          struct osc_brw_async_args *aa)
1484 {
1485         struct ptlrpc_request *new_req;
1486         struct ptlrpc_request_set *set = request->rq_set;
1487         struct osc_brw_async_args *new_aa;
1488         struct osc_async_page *oap;
1489         int rc = 0;
1490         ENTRY;
1491
1492         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1493                 CERROR("too many resend retries, returning error\n");
1494                 RETURN(-EIO);
1495         }
1496
1497         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1498 /*
1499         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1500         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1501                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1502                                            REQ_REC_OFF + 3);
1503 */
1504         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1505                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1506                                   aa->aa_cli, aa->aa_oa,
1507                                   NULL /* lsm unused by osc currently */,
1508                                   aa->aa_page_count, aa->aa_ppga,
1509                                   &new_req, NULL /* ocapa */);
1510         if (rc)
1511                 RETURN(rc);
1512
1513         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1514
1515         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1516                 if (oap->oap_request != NULL) {
1517                         LASSERTF(request == oap->oap_request,
1518                                  "request %p != oap_request %p\n",
1519                                  request, oap->oap_request);
1520                         if (oap->oap_interrupted) {
1521                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1522                                 ptlrpc_req_finished(new_req);
1523                                 RETURN(-EINTR);
1524                         }
1525                 }
1526         }
1527         /* New request takes over pga and oaps from old request.
1528          * Note that copying a list_head doesn't work, need to move it... */
1529         aa->aa_resends++;
1530         new_req->rq_interpret_reply = request->rq_interpret_reply;
1531         new_req->rq_async_args = request->rq_async_args;
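        /* Push rq_sent into the future to back the resend off linearly:
         * one extra second per retry already made. */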
1532         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1533
1534         new_aa = ptlrpc_req_async_args(new_req);
1535
1536         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1537         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1538         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1539
1540         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1541                 if (oap->oap_request) {
1542                         ptlrpc_req_finished(oap->oap_request);
1543                         oap->oap_request = ptlrpc_request_addref(new_req);
1544                 }
1545         }
1546
1547         /* Using ptlrpc_set_add_req() here is safe because the interpret
1548          * callbacks run in check_set context.  The only path by which another
1549          * thread can reach this request is the -EINTR case above, and that
1550          * path is protected by cl_loi_list_lock. */
1551         ptlrpc_set_add_req(set, new_req);
1552
1553         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1554
1555         DEBUG_REQ(D_INFO, new_req, "new request");
1556         RETURN(0);
1557 }
1558
1559 /*
1560  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1561  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1562  * fine for our small page arrays and doesn't require allocation.  It's an
1563  * insertion sort that swaps elements that are strides apart, shrinking the
1564  * stride down until it's 1 and the array is sorted.
1565  */
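/* For example, with num == 100 the first loop below builds the stride
 * sequence 1, 4, 13, 40, 121, and the sort then runs with strides 40, 13,
 * 4 and finally 1. */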
1566 static void sort_brw_pages(struct brw_page **array, int num)
1567 {
1568         int stride, i, j;
1569         struct brw_page *tmp;
1570
1571         if (num == 1)
1572                 return;
1573         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1574                 ;
1575
1576         do {
1577                 stride /= 3;
1578                 for (i = stride ; i < num ; i++) {
1579                         tmp = array[i];
1580                         j = i;
1581                         while (j >= stride && array[j - stride]->off > tmp->off) {
1582                                 array[j] = array[j - stride];
1583                                 j -= stride;
1584                         }
1585                         array[j] = tmp;
1586                 }
1587         } while (stride > 1);
1588 }
1589
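/* Count how many leading pages of 'pg' form one contiguous, page-aligned
 * run: the first page may start mid-page, but the run only continues while
 * each page ends on a page boundary and the next one starts at offset 0.
 * Callers use this to keep each bulk transferable in a single RDMA. */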
1590 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1591 {
1592         int count = 1;
1593         int offset;
1594         int i = 0;
1595
1596         LASSERT(pages > 0);
1597         offset = pg[i]->off & ~CFS_PAGE_MASK;
1598
1599         for (;;) {
1600                 pages--;
1601                 if (pages == 0)         /* that's all */
1602                         return count;
1603
1604                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1605                         return count;   /* doesn't end on page boundary */
1606
1607                 i++;
1608                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1609                 if (offset != 0)        /* doesn't start on page boundary */
1610                         return count;
1611
1612                 count++;
1613         }
1614 }
1615
1616 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1617 {
1618         struct brw_page **ppga;
1619         int i;
1620
1621         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1622         if (ppga == NULL)
1623                 return NULL;
1624
1625         for (i = 0; i < count; i++)
1626                 ppga[i] = pga + i;
1627         return ppga;
1628 }
1629
1630 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1631 {
1632         LASSERT(ppga != NULL);
1633         OBD_FREE(ppga, sizeof(*ppga) * count);
1634 }
1635
1636 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1637                    obd_count page_count, struct brw_page *pga,
1638                    struct obd_trans_info *oti)
1639 {
1640         struct obdo *saved_oa = NULL;
1641         struct brw_page **ppga, **orig;
1642         struct obd_import *imp = class_exp2cliimp(exp);
1643         struct client_obd *cli = &imp->imp_obd->u.cli;
1644         int rc, page_count_orig;
1645         ENTRY;
1646
1647         if (cmd & OBD_BRW_CHECK) {
1648                 /* The caller just wants to know if there's a chance that this
1649                  * I/O can succeed */
1650
1651                 if (imp == NULL || imp->imp_invalid)
1652                         RETURN(-EIO);
1653                 RETURN(0);
1654         }
1655
1656         /* test_brw with a failed create can trip this, maybe others. */
1657         LASSERT(cli->cl_max_pages_per_rpc);
1658
1659         rc = 0;
1660
1661         orig = ppga = osc_build_ppga(pga, page_count);
1662         if (ppga == NULL)
1663                 RETURN(-ENOMEM);
1664         page_count_orig = page_count;
1665
1666         sort_brw_pages(ppga, page_count);
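        /* Carve the sorted array into synchronous rpcs of at most
         * cl_max_pages_per_rpc pages each, trimmed so that every rpc
         * remains a single unfragmented run of pages. */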
1667         while (page_count) {
1668                 obd_count pages_per_brw;
1669
1670                 if (page_count > cli->cl_max_pages_per_rpc)
1671                         pages_per_brw = cli->cl_max_pages_per_rpc;
1672                 else
1673                         pages_per_brw = page_count;
1674
1675                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1676
1677                 if (saved_oa != NULL) {
1678                         /* restore previously saved oa */
1679                         *oinfo->oi_oa = *saved_oa;
1680                 } else if (page_count > pages_per_brw) {
1681                         /* save a copy of oa (brw will clobber it) */
1682                         OBDO_ALLOC(saved_oa);
1683                         if (saved_oa == NULL)
1684                                 GOTO(out, rc = -ENOMEM);
1685                         *saved_oa = *oinfo->oi_oa;
1686                 }
1687
1688                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1689                                       pages_per_brw, ppga, oinfo->oi_capa);
1690
1691                 if (rc != 0)
1692                         break;
1693
1694                 page_count -= pages_per_brw;
1695                 ppga += pages_per_brw;
1696         }
1697
1698 out:
1699         osc_release_ppga(orig, page_count_orig);
1700
1701         if (saved_oa != NULL)
1702                 OBDO_FREE(saved_oa);
1703
1704         RETURN(rc);
1705 }
1706
1707 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1708  * the dirty accounting.  Writeback completes or truncate happens before
1709  * writing starts.  Must be called with the loi lock held. */
1710 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1711                            int sent)
1712 {
1713         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1714 }
1715
1716
1717 /* This maintains the lists of pending pages to read/write for a given object
1718  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1719  * to quickly find objects that are ready to send an RPC. */
1720 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1721                          int cmd)
1722 {
1723         int optimal;
1724         ENTRY;
1725
1726         if (lop->lop_num_pending == 0)
1727                 RETURN(0);
1728
1729         /* if we have an invalid import we want to drain the queued pages
1730          * by forcing them through rpcs that immediately fail and complete
1731          * the pages.  recovery relies on this to empty the queued pages
1732          * before canceling the locks and evicting down the llite pages */
1733         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1734                 RETURN(1);
1735
1736         /* stream rpcs in queue order as long as there is an urgent page
1737          * queued.  this is our cheap solution for good batching in the case
1738          * where writepage marks some random page in the middle of the file
1739          * as urgent because of, say, memory pressure */
1740         if (!list_empty(&lop->lop_urgent)) {
1741                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1742                 RETURN(1);
1743         }
1744         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1745         optimal = cli->cl_max_pages_per_rpc;
1746         if (cmd & OBD_BRW_WRITE) {
1747                 /* trigger a write rpc stream as long as there are dirtiers
1748                  * waiting for space.  as they're waiting, they're not going to
1749                  * create more pages to coalesce with what's waiting. */
1750                 if (!list_empty(&cli->cl_cache_waiters)) {
1751                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1752                         RETURN(1);
1753                 }
1754                 /* +16 to avoid triggering rpcs that would want to include pages
1755                  * that are being queued but which can't be made ready until
1756                  * the queuer finishes with the page. this is a wart for
1757                  * llite::commit_write() */
1758                 optimal += 16;
1759         }
1760         if (lop->lop_num_pending >= optimal)
1761                 RETURN(1);
1762
1763         RETURN(0);
1764 }
1765
1766 static void on_list(struct list_head *item, struct list_head *list,
1767                     int should_be_on)
1768 {
1769         if (list_empty(item) && should_be_on)
1770                 list_add_tail(item, list);
1771         else if (!list_empty(item) && !should_be_on)
1772                 list_del_init(item);
1773 }
1774
1775 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1776  * can find pages to build into rpcs quickly */
1777 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1778 {
1779         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1780                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1781                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1782
1783         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1784                 loi->loi_write_lop.lop_num_pending);
1785
1786         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1787                 loi->loi_read_lop.lop_num_pending);
1788 }
1789
1790 static void lop_update_pending(struct client_obd *cli,
1791                                struct loi_oap_pages *lop, int cmd, int delta)
1792 {
1793         lop->lop_num_pending += delta;
1794         if (cmd & OBD_BRW_WRITE)
1795                 cli->cl_pending_w_pages += delta;
1796         else
1797                 cli->cl_pending_r_pages += delta;
1798 }
1799
1800 /**
1801  * this is called when a sync waiter receives an interruption.  Its job is to
1802  * get the caller woken as soon as possible.  If its page hasn't been put in an
1803  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1804  * desiring interruption which will forcefully complete the rpc once the rpc
1805  * has timed out.
1806  */
1807 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1808 {
1809         struct loi_oap_pages *lop;
1810         struct lov_oinfo *loi;
1811         int rc = -EBUSY;
1812         ENTRY;
1813
1814         LASSERT(!oap->oap_interrupted);
1815         oap->oap_interrupted = 1;
1816
1817         /* ok, it's been put in an rpc. only one oap gets a request reference */
1818         if (oap->oap_request != NULL) {
1819                 ptlrpc_mark_interrupted(oap->oap_request);
1820                 ptlrpcd_wake(oap->oap_request);
1821                 ptlrpc_req_finished(oap->oap_request);
1822                 oap->oap_request = NULL;
1823         }
1824
1825         /*
1826          * page completion may be called only if the ->cpo_prep() method was
1827          * executed by osc_io_submit(), which also adds the page to the pending list
1828          */
1829         if (!list_empty(&oap->oap_pending_item)) {
1830                 list_del_init(&oap->oap_pending_item);
1831                 list_del_init(&oap->oap_urgent_item);
1832
1833                 loi = oap->oap_loi;
1834                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1835                         &loi->loi_write_lop : &loi->loi_read_lop;
1836                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1837                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1838                 rc = oap->oap_caller_ops->ap_completion(env,
1839                                           oap->oap_caller_data,
1840                                           oap->oap_cmd, NULL, -EINTR);
1841         }
1842
1843         RETURN(rc);
1844 }
1845
1846 /* this is trying to propagate async writeback errors back up to the
1847  * application.  When an async write fails we record the error code for later if
1848  * the app does an fsync.  As long as errors persist we force future rpcs to be
1849  * sync so that the app can get a sync error and break the cycle of queueing
1850  * pages for which writeback will fail. */
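/* Once a failure is recorded, ar_force_sync stays set until a write whose
 * xid is at or beyond the xid sampled at failure time completes cleanly. */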
1851 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1852                            int rc)
1853 {
1854         if (rc) {
1855                 if (!ar->ar_rc)
1856                         ar->ar_rc = rc;
1857
1858                 ar->ar_force_sync = 1;
1859                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1860                 return;
1862         }
1863
1864         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1865                 ar->ar_force_sync = 0;
1866 }
1867
1868 void osc_oap_to_pending(struct osc_async_page *oap)
1869 {
1870         struct loi_oap_pages *lop;
1871
1872         if (oap->oap_cmd & OBD_BRW_WRITE)
1873                 lop = &oap->oap_loi->loi_write_lop;
1874         else
1875                 lop = &oap->oap_loi->loi_read_lop;
1876
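        /* An urgent page is linked on both lop_urgent and lop_pending;
         * lop_pending preserves the queue order used when building rpcs. */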
1877         if (oap->oap_async_flags & ASYNC_URGENT)
1878                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1879         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1880         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1881 }
1882
1883 /* this must be called holding the loi list lock to give coverage to exit_cache,
1884  * async_flag maintenance, and oap_request */
1885 static void osc_ap_completion(const struct lu_env *env,
1886                               struct client_obd *cli, struct obdo *oa,
1887                               struct osc_async_page *oap, int sent, int rc)
1888 {
1889         __u64 xid = 0;
1890
1891         ENTRY;
1892         if (oap->oap_request != NULL) {
1893                 xid = ptlrpc_req_xid(oap->oap_request);
1894                 ptlrpc_req_finished(oap->oap_request);
1895                 oap->oap_request = NULL;
1896         }
1897
1898         oap->oap_async_flags = 0;
1899         oap->oap_interrupted = 0;
1900
1901         if (oap->oap_cmd & OBD_BRW_WRITE) {
1902                 osc_process_ar(&cli->cl_ar, xid, rc);
1903                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1904         }
1905
1906         if (rc == 0 && oa != NULL) {
1907                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1908                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1909                 if (oa->o_valid & OBD_MD_FLMTIME)
1910                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1911                 if (oa->o_valid & OBD_MD_FLATIME)
1912                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1913                 if (oa->o_valid & OBD_MD_FLCTIME)
1914                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1915         }
1916
1917         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1918                                                 oap->oap_cmd, oa, rc);
1919
1920         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1921          * I/O on the page could start, but OSC calls it under lock
1922          * and thus we can add oap back to pending safely */
1923         if (rc)
1924                 /* upper layer wants to leave the page on pending queue */
1925                 osc_oap_to_pending(oap);
1926         else
1927                 osc_exit_cache(cli, oap, sent);
1928         EXIT;
1929 }
1930
1931 static int brw_interpret(const struct lu_env *env,
1932                          struct ptlrpc_request *req, void *data, int rc)
1933 {
1934         struct osc_brw_async_args *aa = data;
1935         struct client_obd *cli;
1936         int async;
1937         ENTRY;
1938
1939         rc = osc_brw_fini_request(req, rc);
1940         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1941         if (osc_recoverable_error(rc)) {
1942                 rc = osc_brw_redo_request(req, aa);
1943                 if (rc == 0)
1944                         RETURN(0);
1945         }
1946
1947         cli = aa->aa_cli;
1948
1949         client_obd_list_lock(&cli->cl_loi_list_lock);
1950
1951         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1952          * is called so we know whether to go to sync BRWs or wait for more
1953          * RPCs to complete */
1954         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1955                 cli->cl_w_in_flight--;
1956         else
1957                 cli->cl_r_in_flight--;
1958
1959         async = list_empty(&aa->aa_oaps);
1960         if (!async) { /* from osc_send_oap_rpc() */
1961                 struct osc_async_page *oap, *tmp;
1962                 /* the caller may re-use the oap after the completion call so
1963                  * we need to clean it up a little */
1964                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1965                         list_del_init(&oap->oap_rpc_item);
1966                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1967                 }
1968                 OBDO_FREE(aa->aa_oa);
1969         } else { /* from async_internal() */
1970                 int i;
1971                 for (i = 0; i < aa->aa_page_count; i++)
1972                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1973         }
1974         osc_wake_cache_waiters(cli);
1975         osc_check_rpcs(env, cli);
1976         client_obd_list_unlock(&cli->cl_loi_list_lock);
1977         if (!async)
1978                 cl_req_completion(env, aa->aa_clerq, rc);
1979         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1980         RETURN(rc);
1981 }
1982
1983 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1984                                             struct client_obd *cli,
1985                                             struct list_head *rpc_list,
1986                                             int page_count, int cmd)
1987 {
1988         struct ptlrpc_request *req;
1989         struct brw_page **pga = NULL;
1990         struct osc_brw_async_args *aa;
1991         struct obdo *oa = NULL;
1992         const struct obd_async_page_ops *ops = NULL;
1993         void *caller_data = NULL;
1994         struct osc_async_page *oap;
1995         struct osc_async_page *tmp;
1996         struct ost_body *body;
1997         struct cl_req *clerq = NULL;
1998         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1999         struct ldlm_lock *lock = NULL;
2000         struct cl_req_attr crattr;
2001         int i, rc;
2002
2003         ENTRY;
2004         LASSERT(!list_empty(rpc_list));
2005
2006         memset(&crattr, 0, sizeof crattr);
2007         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2008         if (pga == NULL)
2009                 GOTO(out, req = ERR_PTR(-ENOMEM));
2010
2011         OBDO_ALLOC(oa);
2012         if (oa == NULL)
2013                 GOTO(out, req = ERR_PTR(-ENOMEM));
2014
2015         i = 0;
2016         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2017                 struct cl_page *page = osc_oap2cl_page(oap);
2018                 if (ops == NULL) {
2019                         ops = oap->oap_caller_ops;
2020                         caller_data = oap->oap_caller_data;
2021
2022                         clerq = cl_req_alloc(env, page, crt,
2023                                              1 /* only 1-object rpcs for
2024                                                 * now */);
2025                         if (IS_ERR(clerq))
2026                                 GOTO(out, req = (void *)clerq);
2027                         lock = oap->oap_ldlm_lock;
2028                 }
2029                 pga[i] = &oap->oap_brw_page;
2030                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2031                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2032                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2033                 i++;
2034                 cl_req_page_add(env, clerq, page);
2035         }
2036
2037         /* always get the data for the obdo for the rpc */
2038         LASSERT(ops != NULL);
2039         crattr.cra_oa = oa;
2040         crattr.cra_capa = NULL;
2041         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2042         if (lock) {
2043                 oa->o_handle = lock->l_remote_handle;
2044                 oa->o_valid |= OBD_MD_FLHANDLE;
2045         }
2046
2047         rc = cl_req_prep(env, clerq);
2048         if (rc != 0) {
2049                 CERROR("cl_req_prep failed: %d\n", rc);
2050                 GOTO(out, req = ERR_PTR(rc));
2051         }
2052
2053         sort_brw_pages(pga, page_count);
2054         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2055                                   pga, &req, crattr.cra_capa);
2056         if (rc != 0) {
2057                 CERROR("prep_req failed: %d\n", rc);
2058                 GOTO(out, req = ERR_PTR(rc));
2059         }
2060
2061         /* Need to update the timestamps after the request is built in case
2062          * we race with setattr (locally or in queue at OST).  If OST gets
2063          * later setattr before earlier BRW (as determined by the request xid),
2064          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2065          * way to do this in a single call.  bug 10150 */
2066         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2067         cl_req_attr_set(env, clerq, &crattr,
2068                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2069
2070         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2071         aa = ptlrpc_req_async_args(req);
2072         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2073         list_splice(rpc_list, &aa->aa_oaps);
2074         CFS_INIT_LIST_HEAD(rpc_list);
2075         aa->aa_clerq = clerq;
2076 out:
2077         capa_put(crattr.cra_capa);
2078         if (IS_ERR(req)) {
2079                 if (oa)
2080                         OBDO_FREE(oa);
2081                 if (pga)
2082                         OBD_FREE(pga, sizeof(*pga) * page_count);
2083                 /* this should happen rarely and is pretty bad, it makes the
2084                  * pending list not follow the dirty order */
2085                 client_obd_list_lock(&cli->cl_loi_list_lock);
2086                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2087                         list_del_init(&oap->oap_rpc_item);
2088
2089                         /* queued sync pages can be torn down while the pages
2090                          * were between the pending list and the rpc */
2091                         if (oap->oap_interrupted) {
2092                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2093                                 osc_ap_completion(env, cli, NULL, oap, 0,
2094                                                   oap->oap_count);
2095                                 continue;
2096                         }
2097                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2098                 }
2099                 if (clerq && !IS_ERR(clerq))
2100                         cl_req_completion(env, clerq, PTR_ERR(req));
2101         }
2102         RETURN(req);
2103 }
2104
2105 /**
2106  * Prepare pages for async I/O and put them on the send queue.
2107  *
2108  * \param cli - client obd state
2109  * \param loi - per-object info
2110  * \param cmd - one of the OBD_BRW_* macros
2111  * \param lop - pending pages
2112  *
2113  * \return zero if the pages were successfully added to the send queue.
2114  * \return nonzero if an error occurred.
2115  */
2116 static int
2117 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2118                  struct lov_oinfo *loi,
2119                  int cmd, struct loi_oap_pages *lop)
2120 {
2121         struct ptlrpc_request *req;
2122         obd_count page_count = 0;
2123         struct osc_async_page *oap = NULL, *tmp;
2124         struct osc_brw_async_args *aa;
2125         const struct obd_async_page_ops *ops;
2126         CFS_LIST_HEAD(rpc_list);
2127         unsigned int ending_offset;
2128         unsigned  starting_offset = 0;
2129         int srvlock = 0;
2130         struct cl_object *clob = NULL;
2131         ENTRY;
2132
2133         /* first we find the pages we're allowed to work with */
2134         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2135                                  oap_pending_item) {
2136                 ops = oap->oap_caller_ops;
2137
2138                 LASSERT(oap->oap_magic == OAP_MAGIC);
2139
2140                 if (clob == NULL) {
2141                         /* pin object in memory, so that completion call-backs
2142                          * can be safely called under client_obd_list lock. */
2143                         clob = osc_oap2cl_page(oap)->cp_obj;
2144                         cl_object_get(clob);
2145                 }
2146
2147                 if (page_count != 0 &&
2148                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2149                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2150                                " oap %p, page %p, srvlock %u\n",
2151                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2152                         break;
2153                 }
2154                 /* in llite being 'ready' equates to the page being locked
2155                  * until completion unlocks it.  commit_write submits a page
2156                  * as not ready because its unlock will happen unconditionally
2157                  * as the call returns.  if we race with commit_write giving
2158                  * us that page we don't want to create a hole in the page
2159                  * stream, so we stop and leave the rpc to be fired by
2160                  * another dirtier or kupdated interval (the not ready page
2161                  * will still be on the dirty list).  we could call in
2162                  * at the end of ll_file_write to process the queue again. */
2163                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2164                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2165                                                     cmd);
2166                         if (rc < 0)
2167                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2168                                                 "instead of ready\n", oap,
2169                                                 oap->oap_page, rc);
2170                         switch (rc) {
2171                         case -EAGAIN:
2172                                 /* llite is telling us that the page is still
2173                                  * in commit_write and that we should try
2174                                  * to put it in an rpc again later.  we
2175                                  * break out of the loop so we don't create
2176                                  * a hole in the sequence of pages in the rpc
2177                                  * stream. */
2178                                 oap = NULL;
2179                                 break;
2180                         case -EINTR:
2181                                 /* the I/O isn't needed; tell the checks
2182                                  * below to complete the rpc with -EINTR */
2183                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2184                                 oap->oap_count = -EINTR;
2185                                 break;
2186                         case 0:
2187                                 oap->oap_async_flags |= ASYNC_READY;
2188                                 break;
2189                         default:
2190                                 LASSERTF(0, "oap %p page %p returned %d "
2191                                             "from make_ready\n", oap,
2192                                             oap->oap_page, rc);
2193                                 break;
2194                         }
2195                 }
2196                 if (oap == NULL)
2197                         break;
2198                 /*
2199                  * Page submitted for IO has to be locked. Either by
2200                  * ->ap_make_ready() or by higher layers.
2201                  */
2202 #if defined(__KERNEL__) && defined(__linux__)
2203                 {
2204                         struct cl_page *page;
2205
2206                         page = osc_oap2cl_page(oap);
2207
2208                         if (page->cp_type == CPT_CACHEABLE &&
2209                             !(PageLocked(oap->oap_page) &&
2210                               (CheckWriteback(oap->oap_page, cmd)))) {
2211                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2212                                        oap->oap_page,
2213                                        (long)oap->oap_page->flags,
2214                                        oap->oap_async_flags);
2215                                 LBUG();
2216                         }
2217                 }
2218 #endif
2219                 /* If there is a gap at the start of this page, it can't merge
2220                  * with any previous page, so we'll hand the network a
2221                  * "fragmented" page array that it can't transfer in 1 RDMA */
2222                 if (page_count != 0 && oap->oap_page_off != 0)
2223                         break;
2224
2225                 /* take the page out of our book-keeping */
2226                 list_del_init(&oap->oap_pending_item);
2227                 lop_update_pending(cli, lop, cmd, -1);
2228                 list_del_init(&oap->oap_urgent_item);
2229
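                /* Remember where this rpc starts inside a PTLRPC_MAX_BRW_SIZE
                 * window; the offset histograms below are tallied from it. */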
2230                 if (page_count == 0)
2231                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2232                                           (PTLRPC_MAX_BRW_SIZE - 1);
2233
2234                 /* ask the caller for the size of the io as the rpc leaves. */
2235                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2236                         oap->oap_count =
2237                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2238                                                       cmd);
2239                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2240                 }
2241                 if (oap->oap_count <= 0) {
2242                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2243                                oap->oap_count);
2244                         osc_ap_completion(env, cli, NULL,
2245                                           oap, 0, oap->oap_count);
2246                         continue;
2247                 }
2248
2249                 /* now put the page back in our accounting */
2250                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2251                 if (page_count == 0)
2252                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2253                 if (++page_count >= cli->cl_max_pages_per_rpc)
2254                         break;
2255
2256                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2257                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2258                  * have the same alignment as the initial writes that allocated
2259                  * extents on the server. */
2260                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2261                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
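                /* ending_offset == 0 means this page run ends exactly on a
                 * PTLRPC_MAX_BRW_SIZE (typically 1MB) boundary, so close the
                 * rpc here. */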
2262                 if (ending_offset == 0)
2263                         break;
2264
2265                 /* If there is a gap at the end of this page, it can't merge
2266                  * with any subsequent pages, so we'll hand the network a
2267                  * "fragmented" page array that it can't transfer in 1 RDMA */
2268                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2269                         break;
2270         }
2271
2272         osc_wake_cache_waiters(cli);
2273
2274         loi_list_maint(cli, loi);
2275
2276         client_obd_list_unlock(&cli->cl_loi_list_lock);
2277
2278         if (clob != NULL)
2279                 cl_object_put(env, clob);
2280
2281         if (page_count == 0) {
2282                 client_obd_list_lock(&cli->cl_loi_list_lock);
2283                 RETURN(0);
2284         }
2285
2286         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2287         if (IS_ERR(req)) {
2288                 LASSERT(list_empty(&rpc_list));
2289                 loi_list_maint(cli, loi);
2290                 RETURN(PTR_ERR(req));
2291         }
2292
2293         aa = ptlrpc_req_async_args(req);
2294
2295         if (cmd == OBD_BRW_READ) {
2296                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2297                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2298                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2299                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2300         } else {
2301                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2302                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2303                                  cli->cl_w_in_flight);
2304                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2305                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2306         }
2307         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2308
2309         client_obd_list_lock(&cli->cl_loi_list_lock);
2310
2311         if (cmd == OBD_BRW_READ)
2312                 cli->cl_r_in_flight++;
2313         else
2314                 cli->cl_w_in_flight++;
2315
2316         /* queued sync pages can be torn down while the pages
2317          * were between the pending list and the rpc */
2318         tmp = NULL;
2319         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2320                 /* only one oap gets a request reference */
2321                 if (tmp == NULL)
2322                         tmp = oap;
2323                 if (oap->oap_interrupted && !req->rq_intr) {
2324                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2325                                oap, req);
2326                         ptlrpc_mark_interrupted(req);
2327                 }
2328         }
2329         if (tmp != NULL)
2330                 tmp->oap_request = ptlrpc_request_addref(req);
2331
2332         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2333                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2334
2335         req->rq_interpret_reply = brw_interpret;
2336         ptlrpcd_add_req(req, PSCOPE_BRW);
2337         RETURN(1);
2338 }
2339
2340 #define LOI_DEBUG(LOI, STR, args...)                                     \
2341         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2342                !list_empty(&(LOI)->loi_cli_item),                        \
2343                (LOI)->loi_write_lop.lop_num_pending,                     \
2344                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2345                (LOI)->loi_read_lop.lop_num_pending,                      \
2346                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2347                args)
2348
2349 /* This is called by osc_check_rpcs() to find which objects have pages that
2350  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2351 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2352 {
2353         ENTRY;
2354         /* first return all objects which we already know to have
2355          * pages ready to be stuffed into rpcs */
2356         if (!list_empty(&cli->cl_loi_ready_list))
2357                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2358                                   struct lov_oinfo, loi_cli_item));
2359
2360         /* then if we have cache waiters, return all objects with queued
2361          * writes.  This is especially important when many small files
2362          * have filled up the cache and not been fired into rpcs because
2363                  * they don't pass the nr_pending/object threshold */
2364         if (!list_empty(&cli->cl_cache_waiters) &&
2365             !list_empty(&cli->cl_loi_write_list))
2366                 RETURN(list_entry(cli->cl_loi_write_list.next,
2367                                   struct lov_oinfo, loi_write_item));
2368
2369         /* then return all queued objects when we have an invalid import
2370          * so that they get flushed */
2371         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2372                 if (!list_empty(&cli->cl_loi_write_list))
2373                         RETURN(list_entry(cli->cl_loi_write_list.next,
2374                                           struct lov_oinfo, loi_write_item));
2375                 if (!list_empty(&cli->cl_loi_read_list))
2376                         RETURN(list_entry(cli->cl_loi_read_list.next,
2377                                           struct lov_oinfo, loi_read_item));
2378         }
2379         RETURN(NULL);
2380 }
2381
2382 /* called with the loi list lock held */
2383 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2384 {
2385         struct lov_oinfo *loi;
2386         int rc = 0, race_counter = 0;
2387         ENTRY;
2388
2389         while ((loi = osc_next_loi(cli)) != NULL) {
2390                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2391
2392                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2393                         break;
2394
2395                 /* attempt some read/write balancing by alternating between
2396                  * reads and writes in an object.  The makes_rpc checks here
2397                  * would be redundant if we were getting read/write work items
2398                  * instead of objects.  we don't want send_oap_rpc to drain a
2399                  * partial read pending queue when we're handed this object
2400                  * for writes while there are cache waiters */
2401                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2402                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2403                                               &loi->loi_write_lop);
2404                         if (rc < 0)
2405                                 break;
2406                         if (rc > 0)
2407                                 race_counter = 0;
2408                         else
2409                                 race_counter++;
2410                 }
2411                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2412                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2413                                               &loi->loi_read_lop);
2414                         if (rc < 0)
2415                                 break;
2416                         if (rc > 0)
2417                                 race_counter = 0;
2418                         else
2419                                 race_counter++;
2420                 }
2421
2422                 /* attempt some inter-object balancing by issuing rpcs
2423                  * for each object in turn */
2424                 if (!list_empty(&loi->loi_cli_item))
2425                         list_del_init(&loi->loi_cli_item);
2426                 if (!list_empty(&loi->loi_write_item))
2427                         list_del_init(&loi->loi_write_item);
2428                 if (!list_empty(&loi->loi_read_item))
2429                         list_del_init(&loi->loi_read_item);
2430
2431                 loi_list_maint(cli, loi);
2432
2433                 /* send_oap_rpc fails with 0 when make_ready tells it to
2434                  * back off.  llite's make_ready does this when it tries
2435                  * to lock a page queued for write that is already locked.
2436                  * we want to try sending rpcs from many objects, but we
2437                  * don't want to spin failing with 0.  */
2438                 if (race_counter == 10)
2439                         break;
2440         }
2441         EXIT;
2442 }
2443
2444 /* we're trying to queue a page in the osc so we're subject to the
2445  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2446  * If the osc's queued pages are already at that limit, then we want to sleep
2447  * until there is space in the osc's queue for us.  We also may be waiting for
2448  * write credits from the OST if there are RPCs in flight that may return some
2449  * before we fall back to sync writes.
2450  *
2451  * We need this to know whether our allocation was granted in the presence of signals */
2452 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2453 {
2454         int rc;
2455         ENTRY;
2456         client_obd_list_lock(&cli->cl_loi_list_lock);
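        /* We were granted if osc_wake_cache_waiters() took us off the list;
         * with no rpcs in flight there is nothing left to wait for, so stop
         * sleeping either way. */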
2457         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2458         client_obd_list_unlock(&cli->cl_loi_list_lock);
2459         RETURN(rc);
2460 }
2461
2462 /**
2463  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2464  * is available.
2465  */
2466 int osc_enter_cache_try(const struct lu_env *env,
2467                         struct client_obd *cli, struct lov_oinfo *loi,
2468                         struct osc_async_page *oap, int transient)
2469 {
2470         int has_grant;
2471
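        /* Consume one page worth of grant if the server has extended us at
         * least that much; transient pages are flagged OBD_BRW_NOCACHE and
         * accounted in cl_dirty_transit/obd_dirty_transit_pages. */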
2472         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2473         if (has_grant) {
2474                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2475                 if (transient) {
2476                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2477                         atomic_inc(&obd_dirty_transit_pages);
2478                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2479                 }
2480         }
2481         return has_grant;
2482 }
2483
2484 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2485  * grant or cache space. */
2486 static int osc_enter_cache(const struct lu_env *env,
2487                            struct client_obd *cli, struct lov_oinfo *loi,
2488                            struct osc_async_page *oap)
2489 {
2490         struct osc_cache_waiter ocw;
2491         struct l_wait_info lwi = { 0 };
2492
2493         ENTRY;
2494
2495         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2496                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2497                cli->cl_dirty_max, obd_max_dirty_pages,
2498                cli->cl_lost_grant, cli->cl_avail_grant);
2499
2500         /* force the caller to try sync io.  this can jump the list
2501          * of queued writes and create a discontiguous rpc stream */
2502         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2503             loi->loi_ar.ar_force_sync)
2504                 RETURN(-EDQUOT);
2505
2506         /* Hopefully normal case - cache space and write credits available */
2507         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2508             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2509             osc_enter_cache_try(env, cli, loi, oap, 0))
2510                 RETURN(0);
2511
2512         /* Make sure that there are write rpcs in flight to wait for.  This
2513          * is a little silly as this object may not have any pending but
2514          * other objects sure might. */
2515         if (cli->cl_w_in_flight) {
2516                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2517                 cfs_waitq_init(&ocw.ocw_waitq);
2518                 ocw.ocw_oap = oap;
2519                 ocw.ocw_rc = 0;
2520
2521                 loi_list_maint(cli, loi);
2522                 osc_check_rpcs(env, cli);
2523                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2524
2525                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2526                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2527
2528                 client_obd_list_lock(&cli->cl_loi_list_lock);
2529                 if (!list_empty(&ocw.ocw_entry)) {
2530                         list_del(&ocw.ocw_entry);
2531                         RETURN(-EINTR);
2532                 }
2533                 RETURN(ocw.ocw_rc);
2534         }
2535
2536         RETURN(-EDQUOT);
2537 }
2538
2539
2540 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2541                         struct lov_oinfo *loi, cfs_page_t *page,
2542                         obd_off offset, const struct obd_async_page_ops *ops,
2543                         void *data, void **res, int nocache,
2544                         struct lustre_handle *lockh)
2545 {
2546         struct osc_async_page *oap;
2547
2548         ENTRY;
2549
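        /* A NULL page makes this a size query: report how much space the
         * caller must reserve for an osc_async_page. */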
2550         if (!page)
2551                 return size_round(sizeof(*oap));
2552
2553         oap = *res;
2554         oap->oap_magic = OAP_MAGIC;
2555         oap->oap_cli = &exp->exp_obd->u.cli;
2556         oap->oap_loi = loi;
2557
2558         oap->oap_caller_ops = ops;
2559         oap->oap_caller_data = data;
2560
2561         oap->oap_page = page;
2562         oap->oap_obj_off = offset;
2563
2564         LASSERT(!(offset & ~CFS_PAGE_MASK));
2565
2566         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2567         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2568         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2569         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2570
2571         spin_lock_init(&oap->oap_lock);
2572         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2573         RETURN(0);
2574 }
2575
2576 struct osc_async_page *oap_from_cookie(void *cookie)
2577 {
2578         struct osc_async_page *oap = cookie;
2579         if (oap->oap_magic != OAP_MAGIC)
2580                 return ERR_PTR(-EINVAL);
2581         return oap;
2582 }
2583
2584 int osc_queue_async_io(const struct lu_env *env,
2585                        struct obd_export *exp, struct lov_stripe_md *lsm,
2586                        struct lov_oinfo *loi, void *cookie,
2587                        int cmd, obd_off off, int count,
2588                        obd_flag brw_flags, enum async_flags async_flags)
2589 {
2590         struct client_obd *cli = &exp->exp_obd->u.cli;
2591         struct osc_async_page *oap;
2592         int rc = 0;
2593         ENTRY;
2594
2595         oap = oap_from_cookie(cookie);
2596         if (IS_ERR(oap))
2597                 RETURN(PTR_ERR(oap));
2598
2599         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2600                 RETURN(-EIO);
2601
2602         if (!list_empty(&oap->oap_pending_item) ||
2603             !list_empty(&oap->oap_urgent_item) ||
2604             !list_empty(&oap->oap_rpc_item))
2605                 RETURN(-EBUSY);
2606
2607         /* check if the file's owner/group is over quota */
2608 #ifdef HAVE_QUOTA_SUPPORT
2609         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2610                 struct cl_object *obj;
2611                 struct cl_attr    attr; /* XXX put attr into thread info */
2612
2613                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2614
2615                 cl_object_attr_lock(obj);
2616                 rc = cl_object_attr_get(env, obj, &attr);
2617                 cl_object_attr_unlock(obj);
2618
2619                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2620                                             attr.cat_gid) == NO_QUOTA)
2621                         rc = -EDQUOT;
2622                 if (rc)
2623                         RETURN(rc);
2624         }
2625 #endif
2626
2627         if (loi == NULL)
2628                 loi = lsm->lsm_oinfo[0];
2629
2630         client_obd_list_lock(&cli->cl_loi_list_lock);
2631
2632         LASSERT(off + count <= CFS_PAGE_SIZE);
2633         oap->oap_cmd = cmd;
2634         oap->oap_page_off = off;
2635         oap->oap_count = count;
2636         oap->oap_brw_flags = brw_flags;
2637         oap->oap_async_flags = async_flags;
2638
2639         if (cmd & OBD_BRW_WRITE) {
2640                 rc = osc_enter_cache(env, cli, loi, oap);
2641                 if (rc) {
2642                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2643                         RETURN(rc);
2644                 }
2645         }
2646
2647         osc_oap_to_pending(oap);
2648         loi_list_maint(cli, loi);
2649
2650         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2651                   cmd);
2652
2653         osc_check_rpcs(env, cli);
2654         client_obd_list_unlock(&cli->cl_loi_list_lock);
2655
2656         RETURN(0);
2657 }
2658
2659 /* aka (~was & now & flag), but this is more clear :) */
2660 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
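/* e.g. SETTING(oap->oap_async_flags, async_flags, ASYNC_READY) is true only
 * on the 0 -> 1 transition of ASYNC_READY. */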
2661
2662 int osc_set_async_flags_base(struct client_obd *cli,
2663                              struct lov_oinfo *loi, struct osc_async_page *oap,
2664                              obd_flag async_flags)
2665 {
2666         struct loi_oap_pages *lop;
2667         ENTRY;
2668
2669         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2670                 RETURN(-EIO);
2671
2672         if (oap->oap_cmd & OBD_BRW_WRITE) {
2673                 lop = &loi->loi_write_lop;
2674         } else {
2675                 lop = &loi->loi_read_lop;
2676         }
2677
2678         if (list_empty(&oap->oap_pending_item))
2679                 RETURN(-EINVAL);
2680
2681         if ((oap->oap_async_flags & async_flags) == async_flags)
2682                 RETURN(0);
2683
2684         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2685                 oap->oap_async_flags |= ASYNC_READY;
2686
2687         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2688                 if (list_empty(&oap->oap_rpc_item)) {
2689                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2690                         loi_list_maint(cli, loi);
2691                 }
2692         }
2693
2694         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2695                         oap->oap_async_flags);
2696         RETURN(0);
2697 }
2698
2699 int osc_teardown_async_page(struct obd_export *exp,
2700                             struct lov_stripe_md *lsm,
2701                             struct lov_oinfo *loi, void *cookie)
2702 {
2703         struct client_obd *cli = &exp->exp_obd->u.cli;
2704         struct loi_oap_pages *lop;
2705         struct osc_async_page *oap;
2706         int rc = 0;
2707         ENTRY;
2708
2709         oap = oap_from_cookie(cookie);
2710         if (IS_ERR(oap))
2711                 RETURN(PTR_ERR(oap));
2712
2713         if (loi == NULL)
2714                 loi = lsm->lsm_oinfo[0];
2715
2716         if (oap->oap_cmd & OBD_BRW_WRITE) {
2717                 lop = &loi->loi_write_lop;
2718         } else {
2719                 lop = &loi->loi_read_lop;
2720         }
2721
2722         client_obd_list_lock(&cli->cl_loi_list_lock);
2723
2724         if (!list_empty(&oap->oap_rpc_item))
2725                 GOTO(out, rc = -EBUSY);
2726
2727         osc_exit_cache(cli, oap, 0);
2728         osc_wake_cache_waiters(cli);
2729
2730         if (!list_empty(&oap->oap_urgent_item)) {
2731                 list_del_init(&oap->oap_urgent_item);
2732                 oap->oap_async_flags &= ~ASYNC_URGENT;
2733         }
2734         if (!list_empty(&oap->oap_pending_item)) {
2735                 list_del_init(&oap->oap_pending_item);
2736                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2737         }
2738         loi_list_maint(cli, loi);
2739         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2740 out:
2741         client_obd_list_unlock(&cli->cl_loi_list_lock);
2742         RETURN(rc);
2743 }
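
/*
 * Teardown contract (a summary of the function above, added for clarity):
 * a page already attached to an RPC in flight cannot be torn down and fails
 * with -EBUSY; otherwise the page is dropped from the urgent and pending
 * lists and the per-loi pending-page accounting is rolled back under
 * cl_loi_list_lock before the caller may free the oap.
 */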
2744
2745 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2746                                          struct ldlm_enqueue_info *einfo,
2747                                          int flags)
2748 {
2749         void *data = einfo->ei_cbdata;
2750
2751         LASSERT(lock != NULL);
2752         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2753         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2754         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2755         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2756
2757         lock_res_and_lock(lock);
2758         spin_lock(&osc_ast_guard);
2759         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2760         lock->l_ast_data = data;
2761         spin_unlock(&osc_ast_guard);
2762         unlock_res_and_lock(lock);
2763 }
2764
2765 static void osc_set_data_with_check(struct lustre_handle *lockh,
2766                                     struct ldlm_enqueue_info *einfo,
2767                                     int flags)
2768 {
2769         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2770
2771         if (lock != NULL) {
2772                 osc_set_lock_data_with_check(lock, einfo, flags);
2773                 LDLM_LOCK_PUT(lock);
2774         } else
2775                 CERROR("lockh %p, data %p - client evicted?\n",
2776                        lockh, einfo->ei_cbdata);
2777 }
2778
2779 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2780                              ldlm_iterator_t replace, void *data)
2781 {
2782         struct ldlm_res_id res_id;
2783         struct obd_device *obd = class_exp2obd(exp);
2784
2785         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2786         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2787         return 0;
2788 }
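
/*
 * Illustrative sketch (not compiled): a minimal ldlm_iterator_t callback of
 * the kind osc_change_cbdata() is given.  The callback name is hypothetical;
 * the signature and LDLM_ITER_CONTINUE follow the ldlm iterator convention.
 */
#if 0
static int example_clear_ast_data(struct ldlm_lock *lock, void *data)
{
        lock->l_ast_data = NULL;        /* detach the client-side cookie */
        return LDLM_ITER_CONTINUE;      /* keep walking the resource     */
}
#endif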
2789
2790 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2791                             obd_enqueue_update_f upcall, void *cookie,
2792                             int *flags, int rc)
2793 {
2794         int intent = *flags & LDLM_FL_HAS_INTENT;
2795         ENTRY;
2796
2797         if (intent) {
2798                 /* The request was created before the ldlm_cli_enqueue() call. */
2799                 if (rc == ELDLM_LOCK_ABORTED) {
2800                         struct ldlm_reply *rep;
2801                         rep = req_capsule_server_get(&req->rq_pill,
2802                                                      &RMF_DLM_REP);
2803
2804                         LASSERT(rep != NULL);
2805                         if (rep->lock_policy_res1)
2806                                 rc = rep->lock_policy_res1;
2807                 }
2808         }
2809
2810         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2811                 *flags |= LDLM_FL_LVB_READY;
2812                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2813                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2814         }
2815
2816         /* Call the update callback. */
2817         rc = (*upcall)(cookie, rc);
2818         RETURN(rc);
2819 }
2820
2821 static int osc_enqueue_interpret(const struct lu_env *env,
2822                                  struct ptlrpc_request *req,
2823                                  struct osc_enqueue_args *aa, int rc)
2824 {
2825         struct ldlm_lock *lock;
2826         struct lustre_handle handle;
2827         __u32 mode;
2828
2829         /* Make a local copy of the lock handle and mode, because aa->oa_*
2830          * might be freed anytime after the lock upcall has been called. */
2831         lustre_handle_copy(&handle, aa->oa_lockh);
2832         mode = aa->oa_ei->ei_mode;
2833
2834         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2835          * be valid. */
2836         lock = ldlm_handle2lock(&handle);
2837
2838         /* Take an additional reference so that a blocking AST that
2839          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2840          * to arrive after the upcall has been executed by
2841          * osc_enqueue_fini(). */
2842         ldlm_lock_addref(&handle, mode);
2843
2844         /* Complete the lock-obtaining procedure. */
2845         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2846                                    mode, aa->oa_flags, aa->oa_lvb,
2847                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2848                                    &handle, rc);
2849         /* Finish OSC-side processing. */
2850         rc = osc_enqueue_fini(req, aa->oa_lvb,
2851                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2852         /* Release the lock for an async request. */
2853         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2854                 /*
2855                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2856                  * not already released by
2857                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2858                  */
2859                 ldlm_lock_decref(&handle, mode);
2860
2861         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2862                  aa->oa_lockh, req, aa);
2863         ldlm_lock_decref(&handle, mode);
2864         LDLM_LOCK_PUT(lock);
2865         return rc;
2866 }
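
/*
 * Reference balance in osc_enqueue_interpret() (added summary): the extra
 * reference taken with ldlm_lock_addref() is always dropped by the final
 * ldlm_lock_decref(); the earlier conditional decref only returns the
 * reference that ldlm_cli_enqueue() acquired for a successfully granted
 * async lock.
 */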
2867
2868 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2869                         struct lov_oinfo *loi, int flags,
2870                         struct ost_lvb *lvb, __u32 mode, int rc)
2871 {
2872         if (rc == ELDLM_OK) {
2873                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2874                 __u64 tmp;
2875
2876                 LASSERT(lock != NULL);
2877                 loi->loi_lvb = *lvb;
2878                 tmp = loi->loi_lvb.lvb_size;
2879                 /* Extend KMS up to the end of this lock and no further.
2880                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
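                /*
                 * Worked example: with a lock on extent [0, 4095] and
                 * lvb_size == 10000, tmp becomes 4095 + 1 == 4096, so kms
                 * is capped at the lock boundary even though the file is
                 * known to be larger.
                 */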
2881                 if (tmp > lock->l_policy_data.l_extent.end)
2882                         tmp = lock->l_policy_data.l_extent.end + 1;
2883                 if (tmp >= loi->loi_kms) {
2884                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2885                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2886                         loi_kms_set(loi, tmp);
2887                 } else {
2888                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2889                                    LPU64"; leaving kms="LPU64", end="LPU64,
2890                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2891                                    lock->l_policy_data.l_extent.end);
2892                 }
2893                 ldlm_lock_allow_match(lock);
2894                 LDLM_LOCK_PUT(lock);
2895         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2896                 loi->loi_lvb = *lvb;
2897                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2898                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2899                 rc = ELDLM_OK;
2900         }
2901 }
2902 EXPORT_SYMBOL(osc_update_enqueue);
2903
2904 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2905
2906 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
2907  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
2908  * with other synchronous requests; however, keeping some locks while trying to
2909  * obtain others may take a considerable amount of time in the case of an OST
2910  * failure, and when a client does not release a lock that other sync requests
2911  * are waiting for, that client is excluded from the cluster -- such scenarios
2912  * make life difficult, so release locks just after they are obtained. */
2913 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2914                      int *flags, ldlm_policy_data_t *policy,
2915                      struct ost_lvb *lvb, int kms_valid,
2916                      obd_enqueue_update_f upcall, void *cookie,
2917                      struct ldlm_enqueue_info *einfo,
2918                      struct lustre_handle *lockh,
2919                      struct ptlrpc_request_set *rqset, int async)
2920 {
2921         struct obd_device *obd = exp->exp_obd;
2922         struct ptlrpc_request *req = NULL;
2923         int intent = *flags & LDLM_FL_HAS_INTENT;
2924         ldlm_mode_t mode;
2925         int rc;
2926         ENTRY;
2927
2928         /* Filesystem lock extents are extended to page boundaries so that
2929          * dealing with the page cache is a little smoother.  */
2930         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2931         policy->l_extent.end |= ~CFS_PAGE_MASK;
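        /*
         * Worked example, assuming CFS_PAGE_SIZE == 4096 (~CFS_PAGE_MASK ==
         * 4095): start 5000 rounds down to 4096 and end 6000 rounds up to
         * 8191, so the extent covers whole pages.
         */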
2932
2933         /*
2934          * kms is not valid when either the object is completely fresh (so
2935          * that no locks are cached), or the object was evicted. In the latter
2936          * case a cached lock cannot be used, because it would prime the inode
2937          * state with a potentially stale LVB.
2938          */
2939         if (!kms_valid)
2940                 goto no_match;
2941
2942         /* Next, search for already existing extent locks that will cover us */
2943         /* If we're trying to read, we also search for an existing PW lock.  The
2944          * VFS and page cache already protect us locally, so lots of readers/
2945          * writers can share a single PW lock.
2946          *
2947          * There are problems with conversion deadlocks, so instead of
2948          * converting a read lock to a write lock, we'll just enqueue a new
2949          * one.
2950          *
2951          * At some point we should cancel the read lock instead of making the
2952          * server send us a blocking callback, but there are problems with
2953          * canceling locks out from under other users right now, too. */
2954         mode = einfo->ei_mode;
2955         if (einfo->ei_mode == LCK_PR)
2956                 mode |= LCK_PW;
2957         mode = ldlm_lock_match(obd->obd_namespace,
2958                                *flags | LDLM_FL_LVB_READY, res_id,
2959                                einfo->ei_type, policy, mode, lockh, 0);
2960         if (mode) {
2961                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2962
2963                 if (matched->l_ast_data == NULL ||
2964                     matched->l_ast_data == einfo->ei_cbdata) {
2965                         /* addref the lock only if this is not an async request
2966                          * and a PW lock was matched whereas we asked for PR. */
2967                         if (!rqset && einfo->ei_mode != mode)
2968                                 ldlm_lock_addref(lockh, LCK_PR);
2969                         osc_set_lock_data_with_check(matched, einfo, *flags);
2970                         if (intent) {
2971                                 /* I would like to be able to ASSERT here that
2972                                  * rss <= kms, but I can't, for reasons which
2973                                  * are explained in lov_enqueue() */
2974                         }
2975
2976                         /* We already have a lock, and it's referenced */
2977                         (*upcall)(cookie, ELDLM_OK);
2978
2979                         /* For async requests, decref the lock. */
2980                         if (einfo->ei_mode != mode)
2981                                 ldlm_lock_decref(lockh, LCK_PW);
2982                         else if (rqset)
2983                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2984                         LDLM_LOCK_PUT(matched);
2985                         RETURN(ELDLM_OK);
2986                 } else
2987                         ldlm_lock_decref(lockh, mode);
2988                 LDLM_LOCK_PUT(matched);
2989         }
2990
2991  no_match:
2992         if (intent) {
2993                 CFS_LIST_HEAD(cancels);
2994                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2995                                            &RQF_LDLM_ENQUEUE_LVB);
2996                 if (req == NULL)
2997                         RETURN(-ENOMEM);
2998
2999                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3000                 if (rc)
3001                         RETURN(rc);
3002
3003                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3004                                      sizeof(*lvb));
3005                 ptlrpc_request_set_replen(req);
3006         }
3007
3008         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3009         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3010
3011         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3012                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3013         if (rqset) {
3014                 if (!rc) {
3015                         struct osc_enqueue_args *aa;
3016                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3017                         aa = ptlrpc_req_async_args(req);
3018                         aa->oa_ei = einfo;
3019                         aa->oa_exp = exp;
3020                         aa->oa_flags  = flags;
3021                         aa->oa_upcall = upcall;
3022                         aa->oa_cookie = cookie;
3023                         aa->oa_lvb    = lvb;
3024                         aa->oa_lockh  = lockh;
3025
3026                         req->rq_interpret_reply =
3027                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3028                         if (rqset == PTLRPCD_SET)
3029                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3030                         else
3031                                 ptlrpc_set_add_req(rqset, req);
3032                 } else if (intent) {
3033                         ptlrpc_req_finished(req);
3034                 }
3035                 RETURN(rc);
3036         }
3037
3038         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3039         if (intent)
3040                 ptlrpc_req_finished(req);
3041
3042         RETURN(rc);
3043 }
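
/*
 * Illustrative sketch (not compiled): the two ways callers drive
 * osc_enqueue_base().  All variables are hypothetical; only the rqset/async
 * contract is taken from the function above and from osc_enqueue() below.
 */
#if 0
        /* Synchronous: no request set, the reply is handled in place. */
        rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
                              upcall, cookie, &einfo, &lockh, NULL, 0);

        /* Asynchronous via ptlrpcd: pass the magic PTLRPCD_SET, and the
         * upcall runs from osc_enqueue_interpret() when the reply arrives. */
        rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
                              upcall, cookie, &einfo, &lockh, PTLRPCD_SET, 1);
#endif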
3044
3045 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3046                        struct ldlm_enqueue_info *einfo,
3047                        struct ptlrpc_request_set *rqset)
3048 {
3049         struct ldlm_res_id res_id;
3050         int rc;
3051         ENTRY;
3052
3053         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3054                            oinfo->oi_md->lsm_object_gr, &res_id);
3055
3056         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3057                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3058                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3059                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3060                               rqset, rqset != NULL);
3061         RETURN(rc);
3062 }
3063
3064 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3065                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3066                    int *flags, void *data, struct lustre_handle *lockh,
3067                    int unref)
3068 {
3069         struct obd_device *obd = exp->exp_obd;
3070         int lflags = *flags;
3071         ldlm_mode_t rc;
3072         ENTRY;
3073
3074         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3075                 RETURN(-EIO);
3076
3077         /* Filesystem lock extents are extended to page boundaries so that
3078          * dealing with the page cache is a little smoother */
3079         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3080         policy->l_extent.end |= ~CFS_PAGE_MASK;
3081
3082         /* Next, search for already existing extent locks that will cover us */
3083         /* If we're trying to read, we also search for an existing PW lock.  The
3084          * VFS and page cache already protect us locally, so lots of readers/
3085          * writers can share a single PW lock. */
3086         rc = mode;
3087         if (mode == LCK_PR)
3088                 rc |= LCK_PW;
3089         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3090                              res_id, type, policy, rc, lockh, unref);
3091         if (rc) {
3092                 if (data != NULL)
3093                         osc_set_data_with_check(lockh, data, lflags);
3094                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3095                         ldlm_lock_addref(lockh, LCK_PR);
3096                         ldlm_lock_decref(lockh, LCK_PW);
3097                 }
3098                 RETURN(rc);
3099         }
3100         RETURN(rc);
3101 }
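
/*
 * Note on the PR|PW trick in osc_match_base(): ldlm modes are distinct bits,
 * so passing (LCK_PR | LCK_PW) to ldlm_lock_match() matches a lock of either
 * mode.  When a PW lock satisfies a PR request, the addref(PR)/decref(PW)
 * pair above converts the match reference into the PR reference the caller
 * will eventually drop.
 */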
3102
3103 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3104 {
3105         ENTRY;
3106
3107         if (unlikely(mode == LCK_GROUP))
3108                 ldlm_lock_decref_and_cancel(lockh, mode);
3109         else
3110                 ldlm_lock_decref(lockh, mode);
3111
3112         RETURN(0);
3113 }
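
/*
 * Group locks take the decref-and-cancel path because they are only released
 * by an explicit cancel rather than through blocking callbacks or the lock
 * LRU (background context, not stated elsewhere in this file).
 */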
3114
3115 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3116                       __u32 mode, struct lustre_handle *lockh)
3117 {
3118         ENTRY;
3119         RETURN(osc_cancel_base(lockh, mode));
3120 }
3121
3122 static int osc_cancel_unused(struct obd_export *exp,
3123                              struct lov_stripe_md *lsm, int flags,
3124                              void *opaque)
3125 {
3126         struct obd_device *obd = class_exp2obd(exp);
3127         struct ldlm_res_id res_id, *resp = NULL;
3128
3129         if (lsm != NULL) {
3130                 resp = osc_build_res_name(lsm->lsm_object_id,
3131                                           lsm->lsm_object_gr, &res_id);
3132         }
3133
3134         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3135 }
3136
3137 static int osc_statfs_interpret(const struct lu_env *env,
3138                                 struct ptlrpc_request *req,
3139                                 struct osc_async_args *aa, int rc)
3140 {
3141         struct obd_statfs *msfs;
3142         ENTRY;
3143
3144         if (rc != 0)
3145                 GOTO(out, rc);
3146
3147         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3148         if (msfs == NULL) {
3149                 GOTO(out, rc = -EPROTO);
3150         }
3151
3152         *aa->aa_oi->oi_osfs = *msfs;
3153 out:
3154         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3155         RETURN(rc);
3156 }
3157
3158 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3159                             __u64 max_age, struct ptlrpc_request_set *rqset)
3160 {
3161         struct ptlrpc_request *req;
3162         struct osc_async_args *aa;
3163         int                    rc;
3164         ENTRY;
3165
3166         /* We could possibly pass max_age in the request (as an absolute
3167          * timestamp or a&