b=22755 Don't consume grant twice on recoverable resend
lustre/osc/osc_request.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102         }
103
104         RETURN(lmm_size);
105 }
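/*
 * Usage sketch: osc_packmd() overloads one entry point for size query,
 * allocate-and-pack, and free.  A hypothetical caller (going through the
 * obd_packmd() wrapper) would use it roughly as follows:
 *
 *      struct lov_mds_md *lmm = NULL;
 *      int lmm_size;
 *
 *      lmm_size = obd_packmd(exp, NULL, lsm);  // query wire size only
 *      lmm_size = obd_packmd(exp, &lmm, lsm);  // alloc (if needed) and pack
 *      ...                                     // lmm holds lmm_size LE bytes
 *      obd_packmd(exp, &lmm, NULL);            // free and NULL out lmm
 */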
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
362 out:
363         rc = sa->sa_upcall(sa->sa_cookie, rc);
364         RETURN(rc);
365 }
366
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368                            struct obd_trans_info *oti,
369                            obd_enqueue_update_f upcall, void *cookie,
370                            struct ptlrpc_request_set *rqset)
371 {
372         struct ptlrpc_request   *req;
373         struct osc_setattr_args *sa;
374         int                      rc;
375         ENTRY;
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
 395         /* do MDS-to-OST setattr asynchronously */
396         if (!rqset) {
397                 /* Do not wait for response. */
398                 ptlrpcd_add_req(req, PSCOPE_OTHER);
399         } else {
400                 req->rq_interpret_reply =
401                         (ptlrpc_interpterer_t)osc_setattr_interpret;
402
403                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404                 sa = ptlrpc_req_async_args(req);
405                 sa->sa_oa = oinfo->oi_oa;
406                 sa->sa_upcall = upcall;
407                 sa->sa_cookie = cookie;
408
409                 if (rqset == PTLRPCD_SET)
410                         ptlrpcd_add_req(req, PSCOPE_OTHER);
411                 else
412                         ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
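/*
 * The "rqset" argument selects one of three dispatch modes; a hypothetical
 * caller would use them as follows:
 *
 *      // fire-and-forget: queued to ptlrpcd, no interpret callback
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, NULL);
 *      // queued to ptlrpcd; osc_setattr_interpret() then upcall(cookie, rc)
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, PTLRPCD_SET);
 *      // added to the caller's own set, typically driven by ptlrpc_set_wait()
 *      osc_setattr_async_base(exp, oinfo, oti, upcall, cookie, rqset);
 */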
417
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419                              struct obd_trans_info *oti,
420                              struct ptlrpc_request_set *rqset)
421 {
422         return osc_setattr_async_base(exp, oinfo, oti,
423                                       oinfo->oi_cb_up, oinfo, rqset);
424 }
425
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
428 {
429         struct ptlrpc_request *req;
430         struct ost_body       *body;
431         struct lov_stripe_md  *lsm;
432         int                    rc;
433         ENTRY;
434
435         LASSERT(oa);
436         LASSERT(ea);
437
438         lsm = *ea;
439         if (!lsm) {
440                 rc = obd_alloc_memmd(exp, &lsm);
441                 if (rc < 0)
442                         RETURN(rc);
443         }
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
446         if (req == NULL)
447                 GOTO(out, rc = -ENOMEM);
448
449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 GOTO(out, rc);
453         }
454
455         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
456         LASSERT(body);
457         lustre_set_wire_obdo(&body->oa, oa);
458
459         ptlrpc_request_set_replen(req);
460
461         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462             oa->o_flags == OBD_FL_DELORPHAN) {
463                 DEBUG_REQ(D_HA, req,
464                           "delorphan from OST integration");
465                 /* Don't resend the delorphan req */
466                 req->rq_no_resend = req->rq_no_delay = 1;
467         }
468
469         rc = ptlrpc_queue_wait(req);
470         if (rc)
471                 GOTO(out_req, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL)
475                 GOTO(out_req, rc = -EPROTO);
476
477         lustre_get_wire_obdo(oa, &body->oa);
478
479         /* This should really be sent by the OST */
480         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481         oa->o_valid |= OBD_MD_FLBLKSZ;
482
483         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484          * have valid lsm_oinfo data structs, so don't go touching that.
485          * This needs to be fixed in a big way.
486          */
487         lsm->lsm_object_id = oa->o_id;
488         lsm->lsm_object_seq = oa->o_seq;
489         *ea = lsm;
490
491         if (oti != NULL) {
492                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
493
494                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495                         if (!oti->oti_logcookies)
496                                 oti_alloc_cookies(oti, 1);
497                         *oti->oti_logcookies = oa->o_lcookie;
498                 }
499         }
500
501         CDEBUG(D_HA, "transno: "LPD64"\n",
502                lustre_msg_get_transno(req->rq_repmsg));
503 out_req:
504         ptlrpc_req_finished(req);
505 out:
506         if (rc && !*ea)
507                 obd_free_memmd(exp, &lsm);
508         RETURN(rc);
509 }
510
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512                    obd_enqueue_update_f upcall, void *cookie,
513                    struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request   *req;
516         struct osc_setattr_args *sa;
517         struct ost_body         *body;
518         int                      rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
522         if (req == NULL)
523                 RETURN(-ENOMEM);
524
525         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532         ptlrpc_at_set_req_timeout(req);
533
534         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
535         LASSERT(body);
536         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537         osc_pack_capa(req, body, oinfo->oi_capa);
538
539         ptlrpc_request_set_replen(req);
 540
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PSCOPE_OTHER);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557                      struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync(struct obd_export *exp, struct obdo *oa,
568                     struct lov_stripe_md *md, obd_size start, obd_size end,
569                     void *capa)
570 {
571         struct ptlrpc_request *req;
572         struct ost_body       *body;
573         int                    rc;
574         ENTRY;
575
576         if (!oa) {
577                 CDEBUG(D_INFO, "oa NULL\n");
578                 RETURN(-EINVAL);
579         }
580
581         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
582         if (req == NULL)
583                 RETURN(-ENOMEM);
584
585         osc_set_capa_size(req, &RMF_CAPA1, capa);
586         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
587         if (rc) {
588                 ptlrpc_request_free(req);
589                 RETURN(rc);
590         }
591
592         /* overload the size and blocks fields in the oa with start/end */
593         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
594         LASSERT(body);
595         lustre_set_wire_obdo(&body->oa, oa);
596         body->oa.o_size = start;
597         body->oa.o_blocks = end;
598         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
599         osc_pack_capa(req, body, capa);
600
601         ptlrpc_request_set_replen(req);
602
603         rc = ptlrpc_queue_wait(req);
604         if (rc)
605                 GOTO(out, rc);
606
607         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
608         if (body == NULL)
609                 GOTO(out, rc = -EPROTO);
610
611         lustre_get_wire_obdo(oa, &body->oa);
612
613         EXIT;
614  out:
615         ptlrpc_req_finished(req);
616         return rc;
617 }
618
 619 /* Find and locally cancel locks matching @mode in the resource named by
 620  * @oa. Found locks are added to the @cancels list. Returns the number of
 621  * locks added to the @cancels list. */
622 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
623                                    cfs_list_t *cancels,
624                                    ldlm_mode_t mode, int lock_flags)
625 {
626         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
627         struct ldlm_res_id res_id;
628         struct ldlm_resource *res;
629         int count;
630         ENTRY;
631
632         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
633         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
634         if (res == NULL)
635                 RETURN(0);
636
637         LDLM_RESOURCE_ADDREF(res);
638         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
639                                            lock_flags, 0, NULL);
640         LDLM_RESOURCE_DELREF(res);
641         ldlm_resource_putref(res);
642         RETURN(count);
643 }
644
645 static int osc_destroy_interpret(const struct lu_env *env,
646                                  struct ptlrpc_request *req, void *data,
647                                  int rc)
648 {
649         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
650
651         cfs_atomic_dec(&cli->cl_destroy_in_flight);
652         cfs_waitq_signal(&cli->cl_destroy_waitq);
653         return 0;
654 }
655
656 static int osc_can_send_destroy(struct client_obd *cli)
657 {
658         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
659             cli->cl_max_rpcs_in_flight) {
660                 /* The destroy request can be sent */
661                 return 1;
662         }
663         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
664             cli->cl_max_rpcs_in_flight) {
665                 /*
666                  * The counter has been modified between the two atomic
667                  * operations.
668                  */
669                 cfs_waitq_signal(&cli->cl_destroy_waitq);
670         }
671         return 0;
672 }
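/*
 * The throttle above is lock-free: cfs_atomic_inc_return() optimistically
 * claims a destroy slot; if over the limit, cfs_atomic_dec_return() gives it
 * back, and when the counter dropped below the limit between the two atomics
 * (another destroy completed meanwhile), we signal the waitq ourselves so
 * that wakeup is not lost.  osc_destroy() below blocks on it with:
 *
 *      l_wait_event_exclusive(cli->cl_destroy_waitq,
 *                             osc_can_send_destroy(cli), &lwi);
 */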
673
 674 /* Destroy requests can always be async on the client, and we don't really
 675  * care about the return code since the client cannot do anything at all about
 676  * a destroy failure.
 677  * When the MDS is unlinking a filename, it saves the file objects into a
 678  * recovery llog, and these object records are cancelled when the OST reports
 679  * they were destroyed and sync'd to disk (i.e. transaction committed).
 680  * If the client dies, or the OST is down when the object should be destroyed,
 681  * the records are not cancelled, and when the OST next reconnects to the MDS,
 682  * it will retrieve the llog unlink logs and then send the log cancellation
 683  * cookies to the MDS after committing the destroy transactions. */
684 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
685                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
686                        struct obd_export *md_export, void *capa)
687 {
688         struct client_obd     *cli = &exp->exp_obd->u.cli;
689         struct ptlrpc_request *req;
690         struct ost_body       *body;
691         CFS_LIST_HEAD(cancels);
692         int rc, count;
693         ENTRY;
694
695         if (!oa) {
696                 CDEBUG(D_INFO, "oa NULL\n");
697                 RETURN(-EINVAL);
698         }
699
700         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
701                                         LDLM_FL_DISCARD_DATA);
702
703         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
704         if (req == NULL) {
705                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
706                 RETURN(-ENOMEM);
707         }
708
709         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
710         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
711                                0, &cancels, count);
712         if (rc) {
713                 ptlrpc_request_free(req);
714                 RETURN(rc);
715         }
716
717         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
718         ptlrpc_at_set_req_timeout(req);
719
720         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
721                 oa->o_lcookie = *oti->oti_logcookies;
722         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
723         LASSERT(body);
724         lustre_set_wire_obdo(&body->oa, oa);
725
726         osc_pack_capa(req, body, (struct obd_capa *)capa);
727         ptlrpc_request_set_replen(req);
728
729         /* don't throttle destroy RPCs for the MDT */
730         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
731                 req->rq_interpret_reply = osc_destroy_interpret;
732                 if (!osc_can_send_destroy(cli)) {
733                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
734                                                           NULL);
735
736                         /*
737                          * Wait until the number of on-going destroy RPCs drops
 738                          * below cl_max_rpcs_in_flight.
739                          */
740                         l_wait_event_exclusive(cli->cl_destroy_waitq,
741                                                osc_can_send_destroy(cli), &lwi);
742                 }
743         }
744
745         /* Do not wait for response */
746         ptlrpcd_add_req(req, PSCOPE_OTHER);
747         RETURN(0);
748 }
749
750 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
751                                 long writing_bytes)
752 {
753         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
754
755         LASSERT(!(oa->o_valid & bits));
756
757         oa->o_valid |= bits;
758         client_obd_list_lock(&cli->cl_loi_list_lock);
759         oa->o_dirty = cli->cl_dirty;
760         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
761                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
762                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
763                 oa->o_undirty = 0;
764         } else if (cfs_atomic_read(&obd_dirty_pages) -
765                    cfs_atomic_read(&obd_dirty_transit_pages) >
766                    obd_max_dirty_pages + 1){
 767                 /* The cfs_atomic_read() and the cfs_atomic_inc() are not
 768                  * covered by a lock, so they may safely race and trip
 769                  * this CERROR() unless we add in a small fudge factor (+1). */
770                 CERROR("dirty %d - %d > system dirty_max %d\n",
771                        cfs_atomic_read(&obd_dirty_pages),
772                        cfs_atomic_read(&obd_dirty_transit_pages),
773                        obd_max_dirty_pages);
774                 oa->o_undirty = 0;
775         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
776                 CERROR("dirty %lu - dirty_max %lu too big???\n",
777                        cli->cl_dirty, cli->cl_dirty_max);
778                 oa->o_undirty = 0;
779         } else {
780                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
781                                 (cli->cl_max_rpcs_in_flight + 1);
782                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
783         }
784         oa->o_grant = cli->cl_avail_grant;
785         oa->o_dropped = cli->cl_lost_grant;
786         cli->cl_lost_grant = 0;
787         client_obd_list_unlock(&cli->cl_loi_list_lock);
788         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
789                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
790
791 }
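/*
 * For example, assuming CFS_PAGE_SIZE = 4096, cl_max_pages_per_rpc = 256 and
 * cl_max_rpcs_in_flight = 8, the common (no-overflow) branch above yields:
 *
 *      max_in_flight = (256 << 12) * (8 + 1) = 9 MiB
 *      oa->o_undirty = max(cl_dirty_max, 9 MiB)
 *
 * i.e. ask for enough grant headroom for a full pipeline of RPCs plus one,
 * or the dirty cache limit, whichever is larger.
 */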
792
793 static void osc_update_next_shrink(struct client_obd *cli)
794 {
795         cli->cl_next_shrink_grant =
796                 cfs_time_shift(cli->cl_grant_shrink_interval);
 797         CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
798                cli->cl_next_shrink_grant);
799 }
800
801 /* caller must hold loi_list_lock */
802 static void osc_consume_write_grant(struct client_obd *cli,
803                                     struct brw_page *pga)
804 {
805         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
806         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
807         cfs_atomic_inc(&obd_dirty_pages);
808         cli->cl_dirty += CFS_PAGE_SIZE;
809         cli->cl_avail_grant -= CFS_PAGE_SIZE;
810         pga->flag |= OBD_BRW_FROM_GRANT;
811         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
812                CFS_PAGE_SIZE, pga, pga->pg);
813         LASSERT(cli->cl_avail_grant >= 0);
814         osc_update_next_shrink(cli);
815 }
816
 817 /* The companion to osc_consume_write_grant, called when a brw has completed.
 818  * Must be called with the loi list lock held. */
819 static void osc_release_write_grant(struct client_obd *cli,
820                                     struct brw_page *pga, int sent)
821 {
822         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
823         ENTRY;
824
825         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
826         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
827                 EXIT;
828                 return;
829         }
830
831         pga->flag &= ~OBD_BRW_FROM_GRANT;
832         cfs_atomic_dec(&obd_dirty_pages);
833         cli->cl_dirty -= CFS_PAGE_SIZE;
834         if (pga->flag & OBD_BRW_NOCACHE) {
835                 pga->flag &= ~OBD_BRW_NOCACHE;
836                 cfs_atomic_dec(&obd_dirty_transit_pages);
837                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
838         }
839         if (!sent) {
840                 cli->cl_lost_grant += CFS_PAGE_SIZE;
841                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
842                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
843         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
844                 /* For short writes we shouldn't count parts of pages that
845                  * span a whole block on the OST side, or our accounting goes
846                  * wrong.  Should match the code in filter_grant_check. */
847                 int offset = pga->off & ~CFS_PAGE_MASK;
848                 int count = pga->count + (offset & (blocksize - 1));
849                 int end = (offset + pga->count) & (blocksize - 1);
850                 if (end)
851                         count += blocksize - end;
852
853                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
854                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
855                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
856                        cli->cl_avail_grant, cli->cl_dirty);
857         }
858
859         EXIT;
860 }
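/*
 * Worked example for the short-write rounding above, assuming
 * CFS_PAGE_SIZE = 4096 and an OST blocksize of 1024, for a write of
 * count = 1000 starting at in-page offset 100:
 *
 *      count = 1000 + (100 & 1023) = 1100
 *      end   = (100 + 1000) & 1023 = 76      (non-zero, so round up)
 *      count += 1024 - 76              =>  count = 2048
 *
 * The write touches OST blocks 0 and 1, so two whole blocks (2048 bytes)
 * remain accounted and cl_lost_grant grows by 4096 - 2048 = 2048.
 */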
861
862 static unsigned long rpcs_in_flight(struct client_obd *cli)
863 {
864         return cli->cl_r_in_flight + cli->cl_w_in_flight;
865 }
866
867 int osc_wake_sync_fs(struct client_obd *cli)
868 {
869         ENTRY;
870         if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
871             cli->cl_sf_wait.started) {
872                 cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
873                 cli->cl_sf_wait.started = 0;
874         }
875         RETURN(0);
876 }
877
878 /* caller must hold loi_list_lock */
879 void osc_wake_cache_waiters(struct client_obd *cli)
880 {
881         cfs_list_t *l, *tmp;
882         struct osc_cache_waiter *ocw;
883
884         ENTRY;
885         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
886                 /* if we can't dirty more, we must wait until some is written */
887                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
888                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
889                     obd_max_dirty_pages)) {
890                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
891                                "osc max %ld, sys max %d\n", cli->cl_dirty,
892                                cli->cl_dirty_max, obd_max_dirty_pages);
893                         return;
894                 }
895
 896                 /* if there is still dirty cache but no grant, wait for pending
 897                  * RPCs that may yet return us some grant before doing sync writes */
898                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
899                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
900                                cli->cl_w_in_flight);
901                         return;
902                 }
903
904                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
905                 cfs_list_del_init(&ocw->ocw_entry);
906                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
907                         /* no more RPCs in flight to return grant, do sync IO */
908                         ocw->ocw_rc = -EDQUOT;
909                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
910                 } else {
911                         osc_consume_write_grant(cli,
912                                                 &ocw->ocw_oap->oap_brw_page);
913                 }
914
915                 cfs_waitq_signal(&ocw->ocw_waitq);
916         }
917
918         EXIT;
919 }
920
921 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
922 {
923         client_obd_list_lock(&cli->cl_loi_list_lock);
924         cli->cl_avail_grant += grant;
925         client_obd_list_unlock(&cli->cl_loi_list_lock);
926 }
927
928 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
929 {
930         if (body->oa.o_valid & OBD_MD_FLGRANT) {
931                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
932                 __osc_update_grant(cli, body->oa.o_grant);
933         }
934 }
935
936 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
937                               void *key, obd_count vallen, void *val,
938                               struct ptlrpc_request_set *set);
939
940 static int osc_shrink_grant_interpret(const struct lu_env *env,
941                                       struct ptlrpc_request *req,
942                                       void *aa, int rc)
943 {
944         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
945         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
946         struct ost_body *body;
947
948         if (rc != 0) {
949                 __osc_update_grant(cli, oa->o_grant);
950                 GOTO(out, rc);
951         }
952
953         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
954         LASSERT(body);
955         osc_update_grant(cli, body);
956 out:
957         OBDO_FREE(oa);
958         return rc;
959 }
960
961 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
962 {
963         client_obd_list_lock(&cli->cl_loi_list_lock);
964         oa->o_grant = cli->cl_avail_grant / 4;
965         cli->cl_avail_grant -= oa->o_grant;
966         client_obd_list_unlock(&cli->cl_loi_list_lock);
967         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
968                 oa->o_valid |= OBD_MD_FLFLAGS;
969                 oa->o_flags = 0;
970         }
971         oa->o_flags |= OBD_FL_SHRINK_GRANT;
972         osc_update_next_shrink(cli);
973 }
974
975 /* Shrink the current grant, either from some large amount to enough for a
976  * full set of in-flight RPCs, or if we have already shrunk to that limit
977  * then to enough for a single RPC.  This avoids keeping more grant than
978  * needed, and avoids shrinking the grant piecemeal. */
979 static int osc_shrink_grant(struct client_obd *cli)
980 {
981         long target = (cli->cl_max_rpcs_in_flight + 1) *
 982                       (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT);
983
984         client_obd_list_lock(&cli->cl_loi_list_lock);
985         if (cli->cl_avail_grant <= target)
 986                 target = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
987         client_obd_list_unlock(&cli->cl_loi_list_lock);
988
989         return osc_shrink_grant_to_target(cli, target);
990 }
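/*
 * For example, assuming CFS_PAGE_SIZE = 4096, cl_max_pages_per_rpc = 256 and
 * cl_max_rpcs_in_flight = 8 (targets in bytes, matching cl_avail_grant):
 *
 *      first pass:  target = (8 + 1) * (256 << 12) = 9 MiB
 *      if already <= 9 MiB: target = 256 << 12 = 1 MiB (one full RPC)
 *
 * so an idle client holding, say, 64 MiB of grant shrinks to 9 MiB, then on
 * a later pass to 1 MiB, instead of trickling grant back piecemeal.
 */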
991
992 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
993 {
994         int    rc = 0;
995         struct ost_body     *body;
996         ENTRY;
997
998         client_obd_list_lock(&cli->cl_loi_list_lock);
 999         /* Don't shrink if we are already above or below the desired limit.
 1000          * We don't want to shrink below a single RPC, as that will negatively
 1001          * impact block allocation and long-term performance. */
 1002         if (target < (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT))
 1003                 target = cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
1004
1005         if (target >= cli->cl_avail_grant) {
1006                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1007                 RETURN(0);
1008         }
1009         client_obd_list_unlock(&cli->cl_loi_list_lock);
1010
1011         OBD_ALLOC_PTR(body);
1012         if (!body)
1013                 RETURN(-ENOMEM);
1014
1015         osc_announce_cached(cli, &body->oa, 0);
1016
1017         client_obd_list_lock(&cli->cl_loi_list_lock);
1018         body->oa.o_grant = cli->cl_avail_grant - target;
1019         cli->cl_avail_grant = target;
1020         client_obd_list_unlock(&cli->cl_loi_list_lock);
1021         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1022                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1023                 body->oa.o_flags = 0;
1024         }
1025         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1026         osc_update_next_shrink(cli);
1027
1028         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1029                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1030                                 sizeof(*body), body, NULL);
1031         if (rc != 0)
1032                 __osc_update_grant(cli, body->oa.o_grant);
1033         OBD_FREE_PTR(body);
1034         RETURN(rc);
1035 }
1036
1037 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1038 static int osc_should_shrink_grant(struct client_obd *client)
1039 {
1040         cfs_time_t time = cfs_time_current();
1041         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1042
1043         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1044              OBD_CONNECT_GRANT_SHRINK) == 0)
1045                 return 0;
1046
1047         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1048                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1049                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1050                         return 1;
1051                 else
1052                         osc_update_next_shrink(client);
1053         }
1054         return 0;
1055 }
1056
1057 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1058 {
1059         struct client_obd *client;
1060
1061         cfs_list_for_each_entry(client, &item->ti_obd_list,
1062                                 cl_grant_shrink_list) {
1063                 if (osc_should_shrink_grant(client))
1064                         osc_shrink_grant(client);
1065         }
1066         return 0;
1067 }
1068
1069 static int osc_add_shrink_grant(struct client_obd *client)
1070 {
1071         int rc;
1072
1073         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1074                                        TIMEOUT_GRANT,
1075                                        osc_grant_shrink_grant_cb, NULL,
1076                                        &client->cl_grant_shrink_list);
1077         if (rc) {
1078                 CERROR("add grant client %s error %d\n",
1079                         client->cl_import->imp_obd->obd_name, rc);
1080                 return rc;
1081         }
 1082         CDEBUG(D_CACHE, "add grant client %s\n",
1083                client->cl_import->imp_obd->obd_name);
1084         osc_update_next_shrink(client);
1085         return 0;
1086 }
1087
1088 static int osc_del_shrink_grant(struct client_obd *client)
1089 {
1090         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1091                                          TIMEOUT_GRANT);
1092 }
1093
1094 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1095 {
1096         /*
 1097          * ocd_grant is the total grant amount we expect to hold: if we have
 1098          * been evicted, it's the new avail_grant amount, and cl_dirty will
 1099          * drop to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
 1100          *
 1101          * The race is tolerable here: if we're evicted but imp_state has
 1102          * already left the EVICTED state, then cl_dirty must be 0 already.
1103          */
1104         client_obd_list_lock(&cli->cl_loi_list_lock);
1105         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1106                 cli->cl_avail_grant = ocd->ocd_grant;
1107         else
1108                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1109
1110         if (cli->cl_avail_grant < 0) {
1111                 CWARN("%s: available grant < 0, the OSS is probably not running"
 1112                       " with patch from bug20278 (%ld)\n",
1113                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
 1114                 /* workaround for 1.6 servers which do not have
1115                  * the patch from bug20278 */
1116                 cli->cl_avail_grant = ocd->ocd_grant;
1117         }
1118
1119         client_obd_list_unlock(&cli->cl_loi_list_lock);
1120
 1121         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
1122                cli->cl_import->imp_obd->obd_name,
1123                cli->cl_avail_grant, cli->cl_lost_grant);
1124
1125         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1126             cfs_list_empty(&cli->cl_grant_shrink_list))
1127                 osc_add_shrink_grant(cli);
1128 }
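/*
 * For example, if the server reports ocd_grant = 2 MiB on reconnect while
 * the client still counts cl_dirty = 512 KiB:
 *
 *      normal reconnect: cl_avail_grant = 2 MiB - 512 KiB = 1536 KiB
 *      after eviction:   cl_avail_grant = 2 MiB (cl_dirty falls to 0 by
 *                        itself as the in-flight RPCs fail out)
 *
 * A negative result means the server granted less than the client holds
 * dirty, which the bug 20278 workaround above papers over.
 */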
1129
 1130 /* We assume that the reason this OSC got a short read is that it read
 1131  * beyond the end of a stripe object; i.e. Lustre is reading a sparse file
 1132  * via the LOV, and it _knows_ it's reading inside the file, it's just that
 1133  * this stripe never got written at or beyond this stripe offset yet. */
1134 static void handle_short_read(int nob_read, obd_count page_count,
1135                               struct brw_page **pga)
1136 {
1137         char *ptr;
1138         int i = 0;
1139
1140         /* skip bytes read OK */
1141         while (nob_read > 0) {
1142                 LASSERT (page_count > 0);
1143
1144                 if (pga[i]->count > nob_read) {
1145                         /* EOF inside this page */
1146                         ptr = cfs_kmap(pga[i]->pg) +
1147                                 (pga[i]->off & ~CFS_PAGE_MASK);
1148                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1149                         cfs_kunmap(pga[i]->pg);
1150                         page_count--;
1151                         i++;
1152                         break;
1153                 }
1154
1155                 nob_read -= pga[i]->count;
1156                 page_count--;
1157                 i++;
1158         }
1159
1160         /* zero remaining pages */
1161         while (page_count-- > 0) {
1162                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1163                 memset(ptr, 0, pga[i]->count);
1164                 cfs_kunmap(pga[i]->pg);
1165                 i++;
1166         }
1167 }
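/*
 * For example, with CFS_PAGE_SIZE = 4096, page_count = 3 full pages
 * requested but only nob_read = 5000 bytes returned:
 *
 *      page 0: count 4096 <= 5000, fully read; nob_read becomes 904
 *      page 1: count 4096 >  904, EOF inside page; zero bytes 904..4095
 *      page 2: not covered by the reply; zeroed entirely
 *
 * so the caller always sees fully initialized pages even when the read
 * fell off the end of this stripe object.
 */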
1168
1169 static int check_write_rcs(struct ptlrpc_request *req,
1170                            int requested_nob, int niocount,
1171                            obd_count page_count, struct brw_page **pga)
1172 {
1173         int     i;
1174         __u32   *remote_rcs;
1175
1176         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1177                                                   sizeof(*remote_rcs) *
1178                                                   niocount);
1179         if (remote_rcs == NULL) {
1180                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1181                 return(-EPROTO);
1182         }
1183
1184         /* return error if any niobuf was in error */
1185         for (i = 0; i < niocount; i++) {
1186                 if (remote_rcs[i] < 0)
1187                         return(remote_rcs[i]);
1188
1189                 if (remote_rcs[i] != 0) {
1190                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1191                                 i, remote_rcs[i], req);
1192                         return(-EPROTO);
1193                 }
1194         }
1195
1196         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1197                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1198                        req->rq_bulk->bd_nob_transferred, requested_nob);
1199                 return(-EPROTO);
1200         }
1201
1202         return (0);
1203 }
1204
1205 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1206 {
1207         if (p1->flag != p2->flag) {
1208                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1209                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1210
1211                 /* warn if we try to combine flags that we don't know to be
1212                  * safe to combine */
1213                 if ((p1->flag & mask) != (p2->flag & mask))
1214                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1215                                "same brw?\n", p1->flag, p2->flag);
1216                 return 0;
1217         }
1218
1219         return (p1->off + p1->count == p2->off);
1220 }
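/*
 * For example, p1->off = 0, p1->count = 4096 and p2->off = 4096 with equal
 * flags merge into a single niobuf_remote in osc_brw_prep_request() below;
 * a gap between the pages or a flag mismatch starts a new niobuf instead.
 */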
1221
1222 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1223                                    struct brw_page **pga, int opc,
1224                                    cksum_type_t cksum_type)
1225 {
1226         __u32 cksum;
1227         int i = 0;
1228
1229         LASSERT (pg_count > 0);
1230         cksum = init_checksum(cksum_type);
1231         while (nob > 0 && pg_count > 0) {
1232                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1233                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1234                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1235
1236                 /* corrupt the data before we compute the checksum, to
1237                  * simulate an OST->client data error */
1238                 if (i == 0 && opc == OST_READ &&
1239                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1240                         memcpy(ptr + off, "bad1", min(4, nob));
1241                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1242                 cfs_kunmap(pga[i]->pg);
1243                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1244                                off, cksum);
1245
1246                 nob -= pga[i]->count;
1247                 pg_count--;
1248                 i++;
1249         }
 1250         /* For sends we only compute a wrong checksum instead of
 1251          * corrupting the data, so it is still correct on a redo */
1252         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1253                 cksum++;
1254
1255         return cksum;
1256 }
1257
1258 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1259                                 struct lov_stripe_md *lsm, obd_count page_count,
1260                                 struct brw_page **pga,
1261                                 struct ptlrpc_request **reqp,
1262                                 struct obd_capa *ocapa, int reserve,
1263                                 int resend)
1264 {
1265         struct ptlrpc_request   *req;
1266         struct ptlrpc_bulk_desc *desc;
1267         struct ost_body         *body;
1268         struct obd_ioobj        *ioobj;
1269         struct niobuf_remote    *niobuf;
1270         int niocount, i, requested_nob, opc, rc;
1271         struct osc_brw_async_args *aa;
1272         struct req_capsule      *pill;
1273         struct brw_page *pg_prev;
1274
1275         ENTRY;
1276         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1277                 RETURN(-ENOMEM); /* Recoverable */
1278         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1279                 RETURN(-EINVAL); /* Fatal */
1280
1281         if ((cmd & OBD_BRW_WRITE) != 0) {
1282                 opc = OST_WRITE;
1283                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1284                                                 cli->cl_import->imp_rq_pool,
1285                                                 &RQF_OST_BRW_WRITE);
1286         } else {
1287                 opc = OST_READ;
1288                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1289         }
1290         if (req == NULL)
1291                 RETURN(-ENOMEM);
1292
1293         for (niocount = i = 1; i < page_count; i++) {
1294                 if (!can_merge_pages(pga[i - 1], pga[i]))
1295                         niocount++;
1296         }
1297
1298         pill = &req->rq_pill;
1299         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1300                              sizeof(*ioobj));
1301         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1302                              niocount * sizeof(*niobuf));
1303         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1304
1305         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1306         if (rc) {
1307                 ptlrpc_request_free(req);
1308                 RETURN(rc);
1309         }
1310         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1311         ptlrpc_at_set_req_timeout(req);
1312
1313         if (opc == OST_WRITE)
1314                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1315                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1316         else
1317                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1318                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1319
1320         if (desc == NULL)
1321                 GOTO(out, rc = -ENOMEM);
1322         /* NB request now owns desc and will free it when it gets freed */
1323
1324         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1325         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1326         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1327         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1328
1329         lustre_set_wire_obdo(&body->oa, oa);
1330
1331         obdo_to_ioobj(oa, ioobj);
1332         ioobj->ioo_bufcnt = niocount;
1333         osc_pack_capa(req, body, ocapa);
1334         LASSERT (page_count > 0);
1335         pg_prev = pga[0];
1336         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1337                 struct brw_page *pg = pga[i];
1338
1339                 LASSERT(pg->count > 0);
1340                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1341                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1342                          pg->off, pg->count);
1343 #ifdef __linux__
1344                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1345                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1346                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1347                          i, page_count,
1348                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1349                          pg_prev->pg, page_private(pg_prev->pg),
1350                          pg_prev->pg->index, pg_prev->off);
1351 #else
1352                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1353                          "i %d p_c %u\n", i, page_count);
1354 #endif
1355                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1356                         (pg->flag & OBD_BRW_SRVLOCK));
1357
1358                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1359                                       pg->count);
1360                 requested_nob += pg->count;
1361
1362                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1363                         niobuf--;
1364                         niobuf->len += pg->count;
1365                 } else {
1366                         niobuf->offset = pg->off;
1367                         niobuf->len    = pg->count;
1368                         niobuf->flags  = pg->flag;
1369                 }
1370                 pg_prev = pg;
1371         }
1372
1373         LASSERTF((void *)(niobuf - niocount) ==
1374                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1375                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1376                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1377
1378         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1379         if (resend) {
1380                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1381                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1382                         body->oa.o_flags = 0;
1383                 }
1384                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1385         }
1386
1387         if (osc_should_shrink_grant(cli))
1388                 osc_shrink_grant_local(cli, &body->oa);
1389
1390         /* size[REQ_REC_OFF] still sizeof (*body) */
1391         if (opc == OST_WRITE) {
1392                 if (unlikely(cli->cl_checksum) &&
1393                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394                         /* store cl_cksum_type in a local variable since
1395                          * it can be changed via lprocfs */
1396                         cksum_type_t cksum_type = cli->cl_cksum_type;
1397
1398                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1399                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1400                                 body->oa.o_flags = 0;
1401                         }
1402                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1403                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1404                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1405                                                              page_count, pga,
1406                                                              OST_WRITE,
1407                                                              cksum_type);
1408                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1409                                body->oa.o_cksum);
1410                         /* save this in 'oa', too, for later checking */
1411                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1412                         oa->o_flags |= cksum_type_pack(cksum_type);
1413                 } else {
1414                         /* clear out the checksum flag, in case this is a
1415                          * resend but cl_checksum is no longer set. b=11238 */
1416                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1417                 }
1418                 oa->o_cksum = body->oa.o_cksum;
1419                 /* 1 RC per niobuf */
1420                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1421                                      sizeof(__u32) * niocount);
1422         } else {
1423                 if (unlikely(cli->cl_checksum) &&
1424                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1425                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1426                                 body->oa.o_flags = 0;
1427                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1428                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1429                 }
1430         }
1431         ptlrpc_request_set_replen(req);
1432
1433         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1434         aa = ptlrpc_req_async_args(req);
1435         aa->aa_oa = oa;
1436         aa->aa_requested_nob = requested_nob;
1437         aa->aa_nio_count = niocount;
1438         aa->aa_page_count = page_count;
1439         aa->aa_resends = 0;
1440         aa->aa_ppga = pga;
1441         aa->aa_cli = cli;
1442         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1443         if (ocapa && reserve)
1444                 aa->aa_ocapa = capa_get(ocapa);
1445
1446         *reqp = req;
1447         RETURN(0);
1448
1449  out:
1450         ptlrpc_req_finished(req);
1451         RETURN(rc);
1452 }
1453
1454 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1455                                 __u32 client_cksum, __u32 server_cksum, int nob,
1456                                 obd_count page_count, struct brw_page **pga,
1457                                 cksum_type_t client_cksum_type)
1458 {
1459         __u32 new_cksum;
1460         char *msg;
1461         cksum_type_t cksum_type;
1462
1463         if (server_cksum == client_cksum) {
1464                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1465                 return 0;
1466         }
1467
1468         /* If this is an mmapped file, it can be changed at any time */
1469         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1470                 return 1;
1471
1472         if (oa->o_valid & OBD_MD_FLFLAGS)
1473                 cksum_type = cksum_type_unpack(oa->o_flags);
1474         else
1475                 cksum_type = OBD_CKSUM_CRC32;
1476
1477         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1478                                       cksum_type);
1479
1480         if (cksum_type != client_cksum_type)
1481                 msg = "the server did not use the checksum type specified in "
1482                       "the original request - likely a protocol problem";
1483         else if (new_cksum == server_cksum)
1484                 msg = "changed on the client after we checksummed it - "
1485                       "likely false positive due to mmap IO (bug 11742)";
1486         else if (new_cksum == client_cksum)
1487                 msg = "changed in transit before arrival at OST";
1488         else
1489                 msg = "changed in transit AND doesn't match the original - "
1490                       "likely false positive due to mmap IO (bug 11742)";
1491
1492         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1493                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1494                            msg, libcfs_nid2str(peer->nid),
1495                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1496                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1497                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1498                            oa->o_id,
1499                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1500                            pga[0]->off,
1501                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1502         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1503                "client csum now %x\n", client_cksum, client_cksum_type,
1504                server_cksum, cksum_type, new_cksum);
1505         return 1;
1506 }
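
/*
 * Illustrative sketch (not part of the driver): the bulk checksum logic
 * above hashes only each brw_page's valid [off, off+count) slice, in array
 * order, so client and server agree on exactly which bytes were covered.
 * A minimal userspace analogue, with zlib's crc32() standing in for the
 * kernel checksum routines and struct demo_page standing in for
 * struct brw_page:
 */
#if 0   /* illustrative only, never compiled into the OSC */
#include <stddef.h>
#include <zlib.h>

struct demo_page {
        const unsigned char *buf;       /* page contents */
        size_t               off;       /* start of valid data in the page */
        size_t               count;     /* number of valid bytes */
};

static unsigned long demo_checksum_bulk(struct demo_page *pga, int page_count)
{
        unsigned long cksum = crc32(0L, Z_NULL, 0);
        int i;

        for (i = 0; i < page_count; i++)
                cksum = crc32(cksum, pga[i].buf + pga[i].off,
                              (unsigned int)pga[i].count);
        return cksum;
}
#endif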
1507
1508 /* Note rc enters this function as number of bytes transferred */
1509 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1510 {
1511         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1512         const lnet_process_id_t *peer =
1513                         &req->rq_import->imp_connection->c_peer;
1514         struct client_obd *cli = aa->aa_cli;
1515         struct ost_body *body;
1516         __u32 client_cksum = 0;
1517         ENTRY;
1518
1519         if (rc < 0 && rc != -EDQUOT) {
1520                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1521                 RETURN(rc);
1522         }
1523
1524         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1525         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1526         if (body == NULL) {
1527                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1528                 RETURN(-EPROTO);
1529         }
1530
1531 #ifdef HAVE_QUOTA_SUPPORT
1532         /* set/clear over quota flag for a uid/gid */
1533         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1534             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1535                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1536
1537                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1538                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1539                        body->oa.o_flags);
1540                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1541                              body->oa.o_flags);
1542         }
1543 #endif
1544
1545         osc_update_grant(cli, body);
1546
1547         if (rc < 0)
1548                 RETURN(rc);
1549
1550         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1551                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1552
1553         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1554                 if (rc > 0) {
1555                         CERROR("Unexpected +ve rc %d\n", rc);
1556                         RETURN(-EPROTO);
1557                 }
1558                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1559
1560                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1561                         RETURN(-EAGAIN);
1562
1563                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1564                     check_write_checksum(&body->oa, peer, client_cksum,
1565                                          body->oa.o_cksum, aa->aa_requested_nob,
1566                                          aa->aa_page_count, aa->aa_ppga,
1567                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1568                         RETURN(-EAGAIN);
1569
1570                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1571                                      aa->aa_page_count, aa->aa_ppga);
1572                 GOTO(out, rc);
1573         }
1574
1575         /* The rest of this function executes only for OST_READs */
1576
1577         /* if unwrap_bulk failed, return -EAGAIN to retry */
1578         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1579         if (rc < 0)
1580                 GOTO(out, rc = -EAGAIN);
1581
1582         if (rc > aa->aa_requested_nob) {
1583                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1584                        aa->aa_requested_nob);
1585                 RETURN(-EPROTO);
1586         }
1587
1588         if (rc != req->rq_bulk->bd_nob_transferred) {
1589                 CERROR("Unexpected rc %d (%d transferred)\n",
1590                        rc, req->rq_bulk->bd_nob_transferred);
1591                 RETURN(-EPROTO);
1592         }
1593
1594         if (rc < aa->aa_requested_nob)
1595                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1596
1597         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1598                 static int cksum_counter;
1599                 __u32      server_cksum = body->oa.o_cksum;
1600                 char      *via;
1601                 char      *router;
1602                 cksum_type_t cksum_type;
1603
1604                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1605                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1606                 else
1607                         cksum_type = OBD_CKSUM_CRC32;
1608                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1609                                                  aa->aa_ppga, OST_READ,
1610                                                  cksum_type);
1611
1612                 if (peer->nid == req->rq_bulk->bd_sender) {
1613                         via = router = "";
1614                 } else {
1615                         via = " via ";
1616                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1617                 }
1618
1619                 if (server_cksum == ~0 && rc > 0) {
1620                         CERROR("Protocol error: server %s set the 'checksum' "
1621                                "bit, but didn't send a checksum.  Not fatal, "
1622                                "but please notify on http://bugzilla.lustre.org/\n",
1623                                libcfs_nid2str(peer->nid));
1624                 } else if (server_cksum != client_cksum) {
1625                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1626                                            "%s%s%s inode "DFID" object "
1627                                            LPU64"/"LPU64" extent "
1628                                            "["LPU64"-"LPU64"]\n",
1629                                            req->rq_import->imp_obd->obd_name,
1630                                            libcfs_nid2str(peer->nid),
1631                                            via, router,
1632                                            body->oa.o_valid & OBD_MD_FLFID ?
1633                                                 body->oa.o_parent_seq : (__u64)0,
1634                                            body->oa.o_valid & OBD_MD_FLFID ?
1635                                                 body->oa.o_parent_oid : 0,
1636                                            body->oa.o_valid & OBD_MD_FLFID ?
1637                                                 body->oa.o_parent_ver : 0,
1638                                            body->oa.o_id,
1639                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1640                                                 body->oa.o_seq : (__u64)0,
1641                                            aa->aa_ppga[0]->off,
1642                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1643                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1644                                                                         1);
1645                         CERROR("client %x, server %x, cksum_type %x\n",
1646                                client_cksum, server_cksum, cksum_type);
1647                         cksum_counter = 0;
1648                         aa->aa_oa->o_cksum = client_cksum;
1649                         rc = -EAGAIN;
1650                 } else {
1651                         cksum_counter++;
1652                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1653                         rc = 0;
1654                 }
1655         } else if (unlikely(client_cksum)) {
1656                 static int cksum_missed;
1657
1658                 cksum_missed++;
1659                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1660                         CERROR("Checksum %u requested from %s but not sent\n",
1661                                cksum_missed, libcfs_nid2str(peer->nid));
1662         } else {
1663                 rc = 0;
1664         }
1665 out:
1666         if (rc >= 0)
1667                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1668
1669         RETURN(rc);
1670 }
1671
1672 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1673                             struct lov_stripe_md *lsm,
1674                             obd_count page_count, struct brw_page **pga,
1675                             struct obd_capa *ocapa)
1676 {
1677         struct ptlrpc_request *req;
1678         int                    rc;
1679         cfs_waitq_t            waitq;
1680         int                    resends = 0;
1681         struct l_wait_info     lwi;
1682
1683         ENTRY;
1684
1685         cfs_waitq_init(&waitq);
1686
1687 restart_bulk:
1688         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1689                                   page_count, pga, &req, ocapa, 0, resends);
1690         if (rc != 0)
1691                 RETURN(rc);
1692
1693         rc = ptlrpc_queue_wait(req);
1694
1695         if (rc == -ETIMEDOUT && req->rq_resend) {
1696                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1697                 ptlrpc_req_finished(req);
1698                 goto restart_bulk;
1699         }
1700
1701         rc = osc_brw_fini_request(req, rc);
1702
1703         ptlrpc_req_finished(req);
1704         if (osc_recoverable_error(rc)) {
1705                 resends++;
1706                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1707                         CERROR("too many resend retries, returning error\n");
1708                         RETURN(-EIO);
1709                 }
1710
1711                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1712                 l_wait_event(waitq, 0, &lwi);
1713
1714                 goto restart_bulk;
1715         }
1716
1717         RETURN(rc);
1718 }
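
/*
 * Illustrative sketch: the synchronous path above retries recoverable
 * errors with a linear backoff -- it sleeps 'resends' seconds before each
 * retry, giving up once osc_should_resend() says no.  Stripped of the RPC
 * machinery (demo_send(), demo_recoverable() and DEMO_RESEND_LIMIT are
 * assumed stand-ins, not real symbols):
 */
#if 0   /* illustrative only */
#include <errno.h>
#include <unistd.h>

#define DEMO_RESEND_LIMIT 10            /* stands in for osc_should_resend() */

static int demo_send(void);
static int demo_recoverable(int rc);

static int demo_brw_with_resend(void)
{
        int resends = 0;
        int rc;

        for (;;) {
                rc = demo_send();
                if (!demo_recoverable(rc))
                        return rc;      /* success or a fatal error */
                if (++resends > DEMO_RESEND_LIMIT)
                        return -EIO;
                sleep(resends);         /* back off a little more each time */
        }
}
#endif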
1719
1720 int osc_brw_redo_request(struct ptlrpc_request *request,
1721                          struct osc_brw_async_args *aa)
1722 {
1723         struct ptlrpc_request *new_req;
1724         struct ptlrpc_request_set *set = request->rq_set;
1725         struct osc_brw_async_args *new_aa;
1726         struct osc_async_page *oap;
1727         int rc = 0;
1728         ENTRY;
1729
1730         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1731                 CERROR("too many resent retries, returning error\n");
1732                 RETURN(-EIO);
1733         }
1734
1735         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1736
1737         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1738                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1739                                   aa->aa_cli, aa->aa_oa,
1740                                   NULL /* lsm unused by osc currently */,
1741                                   aa->aa_page_count, aa->aa_ppga,
1742                                   &new_req, aa->aa_ocapa, 0, 1);
1743         if (rc)
1744                 RETURN(rc);
1745
1746         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1747
1748         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1749                 if (oap->oap_request != NULL) {
1750                         LASSERTF(request == oap->oap_request,
1751                                  "request %p != oap_request %p\n",
1752                                  request, oap->oap_request);
1753                         if (oap->oap_interrupted) {
1754                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1755                                 ptlrpc_req_finished(new_req);
1756                                 RETURN(-EINTR);
1757                         }
1758                 }
1759         }
1760         /* New request takes over pga and oaps from old request.
1761          * Note that copying a list_head doesn't work, need to move it... */
1762         aa->aa_resends++;
1763         new_req->rq_interpret_reply = request->rq_interpret_reply;
1764         new_req->rq_async_args = request->rq_async_args;
1765         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1766
1767         new_aa = ptlrpc_req_async_args(new_req);
1768
1769         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1770         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1771         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1772
1773         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1774                 if (oap->oap_request) {
1775                         ptlrpc_req_finished(oap->oap_request);
1776                         oap->oap_request = ptlrpc_request_addref(new_req);
1777                 }
1778         }
1779
1780         new_aa->aa_ocapa = aa->aa_ocapa;
1781         aa->aa_ocapa = NULL;
1782
1783         /* using ptlrpc_set_add_req() here is safe because the interpret
1784          * functions run in check_set context.  The only path by which
1785          * another thread can reach this request is the -EINTR case above,
1786          * and that path is protected by cl_loi_list_lock */
1787         ptlrpc_set_add_req(set, new_req);
1788
1789         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1790
1791         DEBUG_REQ(D_INFO, new_req, "new request");
1792         RETURN(0);
1793 }
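
/*
 * A note on the list hand-over above: the struct assignment of
 * rq_async_args copies the aa_oaps list head, but the first and last oaps
 * would still point back at the *old* head, so both heads would alias one
 * corrupt list.  Hence the splice-then-reinit dance.  The pattern in
 * isolation (demo_take_over is an illustrative name):
 */
#if 0   /* illustrative only */
static void demo_take_over(cfs_list_t *old_head, cfs_list_t *new_head)
{
        /* wrong: "*new_head = *old_head;" leaves members linked to old_head */
        cfs_list_splice(old_head, new_head);    /* members move wholesale */
        CFS_INIT_LIST_HEAD(old_head);           /* old head: valid and empty */
}
#endif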
1794
1795 /*
1796  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1797  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1798  * fine for our small page arrays and doesn't require allocation.  it's an
1799  * insertion sort that swaps elements that are strides apart, shrinking the
1800  * stride down until it's 1 and the array is sorted.
1801  */
1802 static void sort_brw_pages(struct brw_page **array, int num)
1803 {
1804         int stride, i, j;
1805         struct brw_page *tmp;
1806
1807         if (num == 1)
1808                 return;
1809         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1810                 ;
1811
1812         do {
1813                 stride /= 3;
1814                 for (i = stride ; i < num ; i++) {
1815                         tmp = array[i];
1816                         j = i;
1817                         while (j >= stride && array[j - stride]->off > tmp->off) {
1818                                 array[j] = array[j - stride];
1819                                 j -= stride;
1820                         }
1821                         array[j] = tmp;
1822                 }
1823         } while (stride > 1);
1824 }
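
/*
 * Illustrative sketch: the same shellsort, 3h+1 gap sequence and all, on a
 * plain int array, runnable in userspace.  demo_shellsort() mirrors
 * sort_brw_pages() with the brw_page indirection removed:
 */
#if 0   /* illustrative only */
#include <stdio.h>

static void demo_shellsort(int *a, int num)
{
        int stride, i, j, tmp;

        for (stride = 1; stride < num; stride = stride * 3 + 1)
                ;                       /* gaps grow 1, 4, 13, 40, ... */
        do {
                stride /= 3;
                for (i = stride; i < num; i++) {
                        tmp = a[i];
                        for (j = i; j >= stride && a[j - stride] > tmp;
                             j -= stride)
                                a[j] = a[j - stride];
                        a[j] = tmp;
                }
        } while (stride > 1);
}

int main(void)
{
        int a[] = { 40, 13, 1, 4, 121, 0 };
        int i;

        demo_shellsort(a, 6);
        for (i = 0; i < 6; i++)
                printf("%d ", a[i]);    /* prints: 0 1 4 13 40 121 */
        printf("\n");
        return 0;
}
#endif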
1825
1826 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1827 {
1828         int count = 1;
1829         int offset;
1830         int i = 0;
1831
1832         LASSERT(pages > 0);
1833         offset = pg[i]->off & ~CFS_PAGE_MASK;
1834
1835         for (;;) {
1836                 pages--;
1837                 if (pages == 0)         /* that's all */
1838                         return count;
1839
1840                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1841                         return count;   /* doesn't end on page boundary */
1842
1843                 i++;
1844                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1845                 if (offset != 0)        /* doesn't start on page boundary */
1846                         return count;
1847
1848                 count++;
1849         }
1850 }
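
/*
 * A worked example for max_unfragmented_pages(), assuming 4096-byte pages:
 * every page but the last must end on a page boundary, and every page but
 * the first must start on one, or the run is cut short.
 */
#if 0   /* illustrative only */
#include <assert.h>

static void demo_fragmentation_cut(void)
{
        struct brw_page pg0 = { .off = 0,    .count = 4096 };   /* full page  */
        struct brw_page pg1 = { .off = 4096, .count = 2048 };   /* short tail */
        struct brw_page pg2 = { .off = 8192, .count = 4096 };
        struct brw_page *pga[] = { &pg0, &pg1, &pg2 };

        /* pg1 ends 2048 bytes shy of a page boundary, so pg2 cannot be
         * merged behind it: only the first two pages go out together */
        assert(max_unfragmented_pages(pga, 3) == 2);
}
#endif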
1851
1852 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1853 {
1854         struct brw_page **ppga;
1855         int i;
1856
1857         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1858         if (ppga == NULL)
1859                 return NULL;
1860
1861         for (i = 0; i < count; i++)
1862                 ppga[i] = pga + i;
1863         return ppga;
1864 }
1865
1866 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1867 {
1868         LASSERT(ppga != NULL);
1869         OBD_FREE(ppga, sizeof(*ppga) * count);
1870 }
1871
1872 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1873                    obd_count page_count, struct brw_page *pga,
1874                    struct obd_trans_info *oti)
1875 {
1876         struct obdo *saved_oa = NULL;
1877         struct brw_page **ppga, **orig;
1878         struct obd_import *imp = class_exp2cliimp(exp);
1879         struct client_obd *cli;
1880         int rc, page_count_orig;
1881         ENTRY;
1882
1883         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1884         cli = &imp->imp_obd->u.cli;
1885
1886         if (cmd & OBD_BRW_CHECK) {
1887                 /* The caller just wants to know if there's a chance that this
1888                  * I/O can succeed */
1889
1890                 if (imp->imp_invalid)
1891                         RETURN(-EIO);
1892                 RETURN(0);
1893         }
1894
1895         /* test_brw with a failed create can trip this, maybe others. */
1896         LASSERT(cli->cl_max_pages_per_rpc);
1897
1898         rc = 0;
1899
1900         orig = ppga = osc_build_ppga(pga, page_count);
1901         if (ppga == NULL)
1902                 RETURN(-ENOMEM);
1903         page_count_orig = page_count;
1904
1905         sort_brw_pages(ppga, page_count);
1906         while (page_count) {
1907                 obd_count pages_per_brw;
1908
1909                 if (page_count > cli->cl_max_pages_per_rpc)
1910                         pages_per_brw = cli->cl_max_pages_per_rpc;
1911                 else
1912                         pages_per_brw = page_count;
1913
1914                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1915
1916                 if (saved_oa != NULL) {
1917                         /* restore previously saved oa */
1918                         *oinfo->oi_oa = *saved_oa;
1919                 } else if (page_count > pages_per_brw) {
1920                         /* save a copy of oa (brw will clobber it) */
1921                         OBDO_ALLOC(saved_oa);
1922                         if (saved_oa == NULL)
1923                                 GOTO(out, rc = -ENOMEM);
1924                         *saved_oa = *oinfo->oi_oa;
1925                 }
1926
1927                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1928                                       pages_per_brw, ppga, oinfo->oi_capa);
1929
1930                 if (rc != 0)
1931                         break;
1932
1933                 page_count -= pages_per_brw;
1934                 ppga += pages_per_brw;
1935         }
1936
1937 out:
1938         osc_release_ppga(orig, page_count_orig);
1939
1940         if (saved_oa != NULL)
1941                 OBDO_FREE(saved_oa);
1942
1943         RETURN(rc);
1944 }
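
/*
 * Illustrative sketch of the chunking loop above with the oa save/restore
 * removed: each RPC takes at most cl_max_pages_per_rpc pages, further
 * trimmed to the unfragmented prefix.  demo_send_one_rpc() is an assumed
 * stand-in for osc_brw_internal():
 */
#if 0   /* illustrative only */
static int demo_send_one_rpc(struct brw_page **ppga, obd_count count);

static int demo_brw_in_chunks(struct brw_page **ppga, obd_count page_count,
                              obd_count max_pages_per_rpc)
{
        int rc = 0;

        while (page_count > 0) {
                obd_count chunk = page_count < max_pages_per_rpc ?
                                  page_count : max_pages_per_rpc;

                /* never let a single RPC span a fragmentation point */
                chunk = max_unfragmented_pages(ppga, chunk);

                rc = demo_send_one_rpc(ppga, chunk);
                if (rc != 0)
                        break;

                page_count -= chunk;
                ppga += chunk;          /* advance past what was just sent */
        }
        return rc;
}
#endif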
1945
1946 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1947  * the dirty accounting.  Writeback completes or truncate happens before
1948  * writing starts.  Must be called with the loi lock held. */
1949 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1950                            int sent)
1951 {
1952         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1953 }
1954
1955 static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
1956 {
1957         struct osc_async_page *oap;
1958         ENTRY;
1959
1960         if (cfs_list_empty(&lop->lop_urgent))
1961                 RETURN(0);
1962
1963         oap = cfs_list_entry(lop->lop_urgent.next,
1964                              struct osc_async_page, oap_urgent_item);
1965
1966         if (oap->oap_async_flags & ASYNC_SYNCFS) {
1967                 CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
1968                 RETURN(1);
1969         }
1970
1971         RETURN(0);
1972 }
1973
1974 /* This maintains the lists of pending pages to read/write for a given object
1975  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1976  * to quickly find objects that are ready to send an RPC. */
1977 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1978                          int cmd)
1979 {
1980         int optimal;
1981         ENTRY;
1982
1983         if (lop->lop_num_pending == 0)
1984                 RETURN(0);
1985
1986         /* if we have an invalid import we want to drain the queued pages
1987          * by forcing them through rpcs that immediately fail and complete
1988          * the pages.  recovery relies on this to empty the queued pages
1989          * before canceling the locks and evicting the llite pages */
1990         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1991                 RETURN(1);
1992
1993         /* stream rpcs in queue order as long as there is an urgent page
1994          * queued.  this is our cheap solution for good batching in the case
1995          * where writepage marks some random page in the middle of the file
1996          * as urgent because of, say, memory pressure */
1997         if (!cfs_list_empty(&lop->lop_urgent)) {
1998                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1999                 RETURN(1);
2000         }
2001         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2002         optimal = cli->cl_max_pages_per_rpc;
2003         if (cmd & OBD_BRW_WRITE) {
2004                 /* trigger a write rpc stream as long as there are dirtiers
2005                  * waiting for space.  as they're waiting, they're not going to
2006          * create more pages to coalesce with what's waiting. */
2007                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2008                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2009                         RETURN(1);
2010                 }
2011                 /* +16 to avoid triggering rpcs that would want to include pages
2012                  * that are being queued but which can't be made ready until
2013                  * the queuer finishes with the page. this is a wart for
2014                  * llite::commit_write() */
2015                 optimal += 16;
2016         }
2017         if (lop->lop_num_pending >= optimal)
2018                 RETURN(1);
2019
2020         RETURN(0);
2021 }
2022
2023 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2024 {
2025         struct osc_async_page *oap;
2026         ENTRY;
2027
2028         if (cfs_list_empty(&lop->lop_urgent))
2029                 RETURN(0);
2030
2031         oap = cfs_list_entry(lop->lop_urgent.next,
2032                              struct osc_async_page, oap_urgent_item);
2033
2034         if (oap->oap_async_flags & ASYNC_HP) {
2035                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2036                 RETURN(1);
2037         }
2038
2039         RETURN(0);
2040 }
2041
2042 static void on_list(cfs_list_t *item, cfs_list_t *list,
2043                     int should_be_on)
2044 {
2045         if (cfs_list_empty(item) && should_be_on)
2046                 cfs_list_add_tail(item, list);
2047         else if (!cfs_list_empty(item) && !should_be_on)
2048                 cfs_list_del_init(item);
2049 }
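
/*
 * A note on on_list(): it makes list membership an idempotent function of
 * a predicate, so callers like loi_list_maint() below simply restate each
 * invariant as a boolean instead of tracking membership by hand.  The
 * pattern in isolation (demo_obj and demo_dirty_list are assumed names):
 */
#if 0   /* illustrative only */
static CFS_LIST_HEAD(demo_dirty_list);

struct demo_obj {
        cfs_list_t d_dirty_item;
        int        d_ndirty;
};

static void demo_maintain(struct demo_obj *obj)
{
        /* safe to call no matter whether obj is currently on the list */
        on_list(&obj->d_dirty_item, &demo_dirty_list, obj->d_ndirty > 0);
}
#endif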
2050
2051 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2052  * can find pages to build into rpcs quickly */
2053 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2054 {
2055         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2056             lop_makes_hprpc(&loi->loi_read_lop)) {
2057                 /* HP rpc */
2058                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2059                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2060         } else {
2061                 if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
2062                         on_list(&loi->loi_sync_fs_item,
2063                                 &cli->cl_loi_sync_fs_list,
2064                                 loi->loi_write_lop.lop_num_pending);
2065                 } else {
2066                         on_list(&loi->loi_hp_ready_item,
2067                                 &cli->cl_loi_hp_ready_list, 0);
2068                         on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2069                                 lop_makes_rpc(cli, &loi->loi_write_lop,
2070                                               OBD_BRW_WRITE)||
2071                                 lop_makes_rpc(cli, &loi->loi_read_lop,
2072                                               OBD_BRW_READ));
2073                 }
2074         }
2075
2076         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2077                 loi->loi_write_lop.lop_num_pending);
2078
2079         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2080                 loi->loi_read_lop.lop_num_pending);
2081 }
2082
2083 static void lop_update_pending(struct client_obd *cli,
2084                                struct loi_oap_pages *lop, int cmd, int delta)
2085 {
2086         lop->lop_num_pending += delta;
2087         if (cmd & OBD_BRW_WRITE)
2088                 cli->cl_pending_w_pages += delta;
2089         else
2090                 cli->cl_pending_r_pages += delta;
2091 }
2092
2093 /**
2094  * this is called when a sync waiter receives an interruption.  Its job is to
2095  * get the caller woken as soon as possible.  If its page hasn't been put in an
2096  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2097  * desiring interruption which will forcefully complete the rpc once the rpc
2098  * has timed out.
2099  */
2100 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2101 {
2102         struct loi_oap_pages *lop;
2103         struct lov_oinfo *loi;
2104         int rc = -EBUSY;
2105         ENTRY;
2106
2107         LASSERT(!oap->oap_interrupted);
2108         oap->oap_interrupted = 1;
2109
2110         /* ok, it's been put in an rpc. only one oap gets a request reference */
2111         if (oap->oap_request != NULL) {
2112                 ptlrpc_mark_interrupted(oap->oap_request);
2113                 ptlrpcd_wake(oap->oap_request);
2114                 ptlrpc_req_finished(oap->oap_request);
2115                 oap->oap_request = NULL;
2116         }
2117
2118         /*
2119          * page completion may be called only if ->cpo_prep() method was
2120          * executed by osc_io_submit(), which also adds the page to the pending list
2121          */
2122         if (!cfs_list_empty(&oap->oap_pending_item)) {
2123                 cfs_list_del_init(&oap->oap_pending_item);
2124                 cfs_list_del_init(&oap->oap_urgent_item);
2125
2126                 loi = oap->oap_loi;
2127                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2128                         &loi->loi_write_lop : &loi->loi_read_lop;
2129                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2130                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2131                 rc = oap->oap_caller_ops->ap_completion(env,
2132                                           oap->oap_caller_data,
2133                                           oap->oap_cmd, NULL, -EINTR);
2134         }
2135
2136         RETURN(rc);
2137 }
2138
2139 /* this is trying to propagate async writeback errors back up to the
2140  * application.  When an async write fails we record the error code for later if
2141  * the app does an fsync.  As long as errors persist we force future rpcs to be
2142  * sync so that the app can get a sync error and break the cycle of queueing
2143  * pages for which writeback will fail. */
2144 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2145                            int rc)
2146 {
2147         if (rc) {
2148                 if (!ar->ar_rc)
2149                         ar->ar_rc = rc;
2150
2151                 ar->ar_force_sync = 1;
2152                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2153                 return;
2154
2155         }
2156
2157         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2158                 ar->ar_force_sync = 0;
2159 }
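
/*
 * An illustration of the xid watermark above.  Suppose async writes with
 * xids 100..103 are in flight and xid 101 fails:
 *
 *   xid 101 fails  ->  ar_rc set, ar_force_sync = 1,
 *                      ar_min_xid = 104 (the next xid to be issued)
 *   xid 102 OK     ->  102 < 104: still forcing sync; this request was
 *                      already in flight when the error hit, so its
 *                      success says nothing about current health
 *   xid 104 OK     ->  104 >= 104: ar_force_sync cleared
 *
 * i.e. sync is forced until a write issued *after* the failure succeeds.
 */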
2160
2161 static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
2162                                  struct osc_async_page *oap,
2163                                  obd_flag async_flags)
2164 {
2165
2166         /* non-empty means the oap is already on some lop_urgent list */
2167         if (!cfs_list_empty(&oap->oap_urgent_item)) {
2168                 CWARN("Request to add duplicate oap_urgent for flag = %d\n",
2169                        oap->oap_async_flags);
2170                 return 1;
2171         }
2172
2173         /* item from sync_fs; to avoid duplicates, check the existing flags */
2174         if (async_flags & ASYNC_SYNCFS) {
2175                 cfs_list_add_tail(&oap->oap_urgent_item,
2176                                   &lop->lop_urgent);
2177                 return 0;
2178         }
2179
2180         if (oap->oap_async_flags & ASYNC_HP)
2181                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2182         else if (oap->oap_async_flags & ASYNC_URGENT ||
2183                  async_flags & ASYNC_URGENT)
2184                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2185
2186         return 0;
2187 }
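
/*
 * A worked example of the resulting lop_urgent ordering for a mixed
 * arrival sequence:
 *
 *   add U1 (ASYNC_URGENT)  ->  [U1]
 *   add S1 (ASYNC_SYNCFS)  ->  [U1, S1]
 *   add H1 (ASYNC_HP)      ->  [H1, U1, S1]   (HP jumps the queue)
 *   add H1 again           ->  [H1, U1, S1]   (duplicate: CWARN, return 1)
 *
 * lop_makes_hprpc() and lop_makes_syncfs_rpc() above inspect only the
 * first entry, which is why HP pages must be added at the head.
 */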
2188
2189 void osc_oap_to_pending(struct osc_async_page *oap)
2190 {
2191         struct loi_oap_pages *lop;
2192
2193         if (oap->oap_cmd & OBD_BRW_WRITE)
2194                 lop = &oap->oap_loi->loi_write_lop;
2195         else
2196                 lop = &oap->oap_loi->loi_read_lop;
2197
2198         osc_add_to_lop_urgent(lop, oap, 0);
2199         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2200         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2201 }
2202
2203 /* this must be called holding the loi list lock to give coverage to exit_cache,
2204  * async_flag maintenance, and oap_request */
2205 static void osc_ap_completion(const struct lu_env *env,
2206                               struct client_obd *cli, struct obdo *oa,
2207                               struct osc_async_page *oap, int sent, int rc)
2208 {
2209         __u64 xid = 0;
2210
2211         ENTRY;
2212         if (oap->oap_request != NULL) {
2213                 xid = ptlrpc_req_xid(oap->oap_request);
2214                 ptlrpc_req_finished(oap->oap_request);
2215                 oap->oap_request = NULL;
2216         }
2217
2218         cfs_spin_lock(&oap->oap_lock);
2219         oap->oap_async_flags = 0;
2220         cfs_spin_unlock(&oap->oap_lock);
2221         oap->oap_interrupted = 0;
2222
2223         if (oap->oap_cmd & OBD_BRW_WRITE) {
2224                 osc_process_ar(&cli->cl_ar, xid, rc);
2225                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2226         }
2227
2228         if (rc == 0 && oa != NULL) {
2229                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2230                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2231                 if (oa->o_valid & OBD_MD_FLMTIME)
2232                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2233                 if (oa->o_valid & OBD_MD_FLATIME)
2234                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2235                 if (oa->o_valid & OBD_MD_FLCTIME)
2236                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2237         }
2238
2239         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2240                                                 oap->oap_cmd, oa, rc);
2241
2242         /* ll_ap_completion (from llite) drops PG_locked, so a new
2243          * I/O on the page could start, but OSC calls it under lock
2244          * and thus we can add oap back to pending safely */
2245         if (rc)
2246                 /* upper layer wants to leave the page on pending queue */
2247                 osc_oap_to_pending(oap);
2248         else
2249                 osc_exit_cache(cli, oap, sent);
2250         EXIT;
2251 }
2252
2253 static int brw_interpret(const struct lu_env *env,
2254                          struct ptlrpc_request *req, void *data, int rc)
2255 {
2256         struct osc_brw_async_args *aa = data;
2257         struct client_obd *cli;
2258         int async;
2259         ENTRY;
2260
2261         rc = osc_brw_fini_request(req, rc);
2262         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2263         if (osc_recoverable_error(rc)) {
2264                 /* Only retry once for mmapped files since the mmapped page
2265                  * might be modified at any time. We have to retry at least
2266                  * once in case there WAS really a corruption of the page
2267                  * on the network, that was not caused by mmap() modifying
2268                  * the page. Bug11742 */
2269                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2270                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2271                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2272                         rc = 0;
2273                 } else {
2274                         rc = osc_brw_redo_request(req, aa);
2275                         if (rc == 0)
2276                                 RETURN(0);
2277                 }
2278         }
2279
2280         if (aa->aa_ocapa) {
2281                 capa_put(aa->aa_ocapa);
2282                 aa->aa_ocapa = NULL;
2283         }
2284
2285         cli = aa->aa_cli;
2286
2287         client_obd_list_lock(&cli->cl_loi_list_lock);
2288
2289         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2290          * is called so we know whether to go to sync BRWs or wait for more
2291          * RPCs to complete */
2292         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2293                 cli->cl_w_in_flight--;
2294         else
2295                 cli->cl_r_in_flight--;
2296
2297         async = cfs_list_empty(&aa->aa_oaps);
2298         if (!async) { /* from osc_send_oap_rpc() */
2299                 struct osc_async_page *oap, *tmp;
2300                 /* the caller may re-use the oap after the completion call so
2301                  * we need to clean it up a little */
2302                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2303                                              oap_rpc_item) {
2304                         cfs_list_del_init(&oap->oap_rpc_item);
2305                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2306                 }
2307                 OBDO_FREE(aa->aa_oa);
2308         } else { /* from async_internal() */
2309                 obd_count i;
2310                 for (i = 0; i < aa->aa_page_count; i++)
2311                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2312         }
2313         osc_wake_cache_waiters(cli);
2314         osc_wake_sync_fs(cli);
2315         osc_check_rpcs(env, cli);
2316         client_obd_list_unlock(&cli->cl_loi_list_lock);
2317         if (!async)
2318                 cl_req_completion(env, aa->aa_clerq, rc);
2319         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2320
2321         RETURN(rc);
2322 }
2323
2324 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2325                                             struct client_obd *cli,
2326                                             cfs_list_t *rpc_list,
2327                                             int page_count, int cmd)
2328 {
2329         struct ptlrpc_request *req;
2330         struct brw_page **pga = NULL;
2331         struct osc_brw_async_args *aa;
2332         struct obdo *oa = NULL;
2333         const struct obd_async_page_ops *ops = NULL;
2334         void *caller_data = NULL;
2335         struct osc_async_page *oap;
2336         struct osc_async_page *tmp;
2337         struct ost_body *body;
2338         struct cl_req *clerq = NULL;
2339         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2340         struct ldlm_lock *lock = NULL;
2341         struct cl_req_attr crattr;
2342         int i, rc, mpflag = 0;
2343
2344         ENTRY;
2345         LASSERT(!cfs_list_empty(rpc_list));
2346
2347         if (cmd & OBD_BRW_MEMALLOC)
2348                 mpflag = cfs_memory_pressure_get_and_set();
2349
2350         memset(&crattr, 0, sizeof crattr);
2351         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2352         if (pga == NULL)
2353                 GOTO(out, req = ERR_PTR(-ENOMEM));
2354
2355         OBDO_ALLOC(oa);
2356         if (oa == NULL)
2357                 GOTO(out, req = ERR_PTR(-ENOMEM));
2358
2359         i = 0;
2360         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2361                 struct cl_page *page = osc_oap2cl_page(oap);
2362                 if (ops == NULL) {
2363                         ops = oap->oap_caller_ops;
2364                         caller_data = oap->oap_caller_data;
2365
2366                         clerq = cl_req_alloc(env, page, crt,
2367                                              1 /* only 1-object rpcs for
2368                                                 * now */);
2369                         if (IS_ERR(clerq))
2370                                 GOTO(out, req = (void *)clerq);
2371                         lock = oap->oap_ldlm_lock;
2372                 }
2373                 pga[i] = &oap->oap_brw_page;
2374                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2375                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2376                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2377                 i++;
2378                 cl_req_page_add(env, clerq, page);
2379         }
2380
2381         /* always get the data for the obdo for the rpc */
2382         LASSERT(ops != NULL);
2383         crattr.cra_oa = oa;
2384         crattr.cra_capa = NULL;
2385         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2386         if (lock) {
2387                 oa->o_handle = lock->l_remote_handle;
2388                 oa->o_valid |= OBD_MD_FLHANDLE;
2389         }
2390
2391         rc = cl_req_prep(env, clerq);
2392         if (rc != 0) {
2393                 CERROR("cl_req_prep failed: %d\n", rc);
2394                 GOTO(out, req = ERR_PTR(rc));
2395         }
2396
2397         sort_brw_pages(pga, page_count);
2398         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2399                                   pga, &req, crattr.cra_capa, 1, 0);
2400         if (rc != 0) {
2401                 CERROR("prep_req failed: %d\n", rc);
2402                 GOTO(out, req = ERR_PTR(rc));
2403         }
2404
2405         if (cmd & OBD_BRW_MEMALLOC)
2406                 req->rq_memalloc = 1;
2407
2408         /* Need to update the timestamps after the request is built in case
2409          * we race with setattr (locally or in queue at OST).  If OST gets
2410          * later setattr before earlier BRW (as determined by the request xid),
2411          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2412          * way to do this in a single call.  bug 10150 */
2413         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2414         cl_req_attr_set(env, clerq, &crattr,
2415                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2416
2417         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2418         aa = ptlrpc_req_async_args(req);
2419         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2420         cfs_list_splice(rpc_list, &aa->aa_oaps);
2421         CFS_INIT_LIST_HEAD(rpc_list);
2422         aa->aa_clerq = clerq;
2423 out:
2424         if (cmd & OBD_BRW_MEMALLOC)
2425                 cfs_memory_pressure_restore(mpflag);
2426
2427         capa_put(crattr.cra_capa);
2428         if (IS_ERR(req)) {
2429                 if (oa)
2430                         OBDO_FREE(oa);
2431                 if (pga)
2432                         OBD_FREE(pga, sizeof(*pga) * page_count);
2433                 /* this should happen rarely and is pretty bad; it makes the
2434                  * pending list not follow the dirty order */
2435                 client_obd_list_lock(&cli->cl_loi_list_lock);
2436                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2437                         cfs_list_del_init(&oap->oap_rpc_item);
2438
2439                         /* queued sync pages can be torn down while the pages
2440                          * were between the pending list and the rpc */
2441                         if (oap->oap_interrupted) {
2442                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2443                                 osc_ap_completion(env, cli, NULL, oap, 0,
2444                                                   oap->oap_count);
2445                                 continue;
2446                         }
2447                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2448                 }
2449                 if (clerq && !IS_ERR(clerq))
2450                         cl_req_completion(env, clerq, PTR_ERR(req));
2451         }
2452         RETURN(req);
2453 }
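
/*
 * An illustration of the bug 10150 race noted above, as I read the
 * comment: timestamps are applied only after the BRW request exists
 * (and so has its xid).
 *
 *   client:  builds BRW as xid 500, then a setattr (mtime = T2) goes out
 *            as xid 501; the BRW's timestamps (T1) are filled in last
 *   OST:     may execute xid 501 first; when xid 500 then arrives carrying
 *            the older T1, the OST sees 500 < 501 and ignores the BRW's
 *            timestamps rather than rolling mtime back
 *
 * Sampling the timestamps before the xid was assigned would break that
 * ordering argument.
 */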
2454
2455 /**
2456  * prepare pages for ASYNC io and put pages in send queue.
2457  *
2458  * \param cmd OBD_BRW_* macros
2459  * \param lop pending pages
2460  *
2461  * \return zero if no page added to send queue.
2462  * \return 1 if pages successfully added to send queue.
2463  * \return negative on errors.
2464  */
2465 static int
2466 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2467                  struct lov_oinfo *loi,
2468                  int cmd, struct loi_oap_pages *lop)
2469 {
2470         struct ptlrpc_request *req;
2471         obd_count page_count = 0;
2472         struct osc_async_page *oap = NULL, *tmp;
2473         struct osc_brw_async_args *aa;
2474         const struct obd_async_page_ops *ops;
2475         CFS_LIST_HEAD(rpc_list);
2476         CFS_LIST_HEAD(tmp_list);
2477         unsigned int ending_offset;
2478         unsigned  starting_offset = 0;
2479         int srvlock = 0, mem_tight = 0;
2480         struct cl_object *clob = NULL;
2481         ENTRY;
2482
2483         /* ASYNC_HP pages first. At present, when the lock covering the pages
2484          * is to be canceled, the pages covered by the lock will be sent out
2485          * with ASYNC_HP. We have to send them out as soon as possible. */
2486         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2487                 if (oap->oap_async_flags & ASYNC_HP)
2488                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2489                 else
2490                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2491                 if (++page_count >= cli->cl_max_pages_per_rpc)
2492                         break;
2493         }
2494
2495         cfs_list_splice(&tmp_list, &lop->lop_pending);
2496         page_count = 0;
2497
2498         /* first we find the pages we're allowed to work with */
2499         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2500                                      oap_pending_item) {
2501                 ops = oap->oap_caller_ops;
2502
2503                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2504                          "magic 0x%x\n", oap, oap->oap_magic);
2505
2506                 if (clob == NULL) {
2507                         /* pin object in memory, so that completion call-backs
2508                          * can be safely called under client_obd_list lock. */
2509                         clob = osc_oap2cl_page(oap)->cp_obj;
2510                         cl_object_get(clob);
2511                 }
2512
2513                 if (page_count != 0 &&
2514                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2515                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2516                                " oap %p, page %p, srvlock %u\n",
2517                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2518                         break;
2519                 }
2520
2521                 /* If there is a gap at the start of this page, it can't merge
2522                  * with any previous page, so we'll hand the network a
2523                  * "fragmented" page array that it can't transfer in 1 RDMA */
2524                 if (page_count != 0 && oap->oap_page_off != 0)
2525                         break;
2526
2527                 /* in llite being 'ready' equates to the page being locked
2528                  * until completion unlocks it.  commit_write submits a page
2529                  * as not ready because its unlock will happen unconditionally
2530                  * as the call returns.  if we race with commit_write giving
2531                  * us that page we don't want to create a hole in the page
2532                  * stream, so we stop and leave the rpc to be fired by
2533                  * another dirtier or kupdated interval (the not ready page
2534                  * will still be on the dirty list).  we could call in
2535                  * at the end of ll_file_write to process the queue again. */
2536                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2537                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2538                                                     cmd);
2539                         if (rc < 0)
2540                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2541                                                 "instead of ready\n", oap,
2542                                                 oap->oap_page, rc);
2543                         switch (rc) {
2544                         case -EAGAIN:
2545                                 /* llite is telling us that the page is still
2546                                  * in commit_write and that we should try
2547                                  * and put it in an rpc again later.  we
2548                                  * break out of the loop so we don't create
2549                                  * a hole in the sequence of pages in the rpc
2550                                  * stream. */
2551                                 oap = NULL;
2552                                 break;
2553                         case -EINTR:
2554                                 /* the io isn't needed.  tell the checks
2555                                  * below to complete the rpc with EINTR */
2556                                 cfs_spin_lock(&oap->oap_lock);
2557                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2558                                 cfs_spin_unlock(&oap->oap_lock);
2559                                 oap->oap_count = -EINTR;
2560                                 break;
2561                         case 0:
2562                                 cfs_spin_lock(&oap->oap_lock);
2563                                 oap->oap_async_flags |= ASYNC_READY;
2564                                 cfs_spin_unlock(&oap->oap_lock);
2565                                 break;
2566                         default:
2567                                 LASSERTF(0, "oap %p page %p returned %d "
2568                                             "from make_ready\n", oap,
2569                                             oap->oap_page, rc);
2570                                 break;
2571                         }
2572                 }
2573                 if (oap == NULL)
2574                         break;
2575                 /*
2576                  * Page submitted for IO has to be locked. Either by
2577                  * ->ap_make_ready() or by higher layers.
2578                  */
2579 #if defined(__KERNEL__) && defined(__linux__)
2580                 {
2581                         struct cl_page *page;
2582
2583                         page = osc_oap2cl_page(oap);
2584
2585                         if (page->cp_type == CPT_CACHEABLE &&
2586                             !(PageLocked(oap->oap_page) &&
2587                               (CheckWriteback(oap->oap_page, cmd)))) {
2588                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2589                                        oap->oap_page,
2590                                        (long)oap->oap_page->flags,
2591                                        oap->oap_async_flags);
2592                                 LBUG();
2593                         }
2594                 }
2595 #endif
2596
2597                 /* take the page out of our book-keeping */
2598                 cfs_list_del_init(&oap->oap_pending_item);
2599                 lop_update_pending(cli, lop, cmd, -1);
2600                 cfs_list_del_init(&oap->oap_urgent_item);
2601
2602                 if (page_count == 0)
2603                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2604                                           (PTLRPC_MAX_BRW_SIZE - 1);
2605
2606                 /* ask the caller for the size of the io as the rpc leaves. */
2607                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2608                         oap->oap_count =
2609                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2610                                                       cmd);
2611                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2612                 }
2613                 if (oap->oap_count <= 0) {
2614                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2615                                oap->oap_count);
2616                         osc_ap_completion(env, cli, NULL,
2617                                           oap, 0, oap->oap_count);
2618                         continue;
2619                 }
2620
2621                 /* now put the page back in our accounting */
2622                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2623                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2624                         mem_tight = 1;
2625                 if (page_count == 0)
2626                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2627                 if (++page_count >= cli->cl_max_pages_per_rpc)
2628                         break;
2629
2630                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2631                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2632                  * have the same alignment as the initial writes that allocated
2633                  * extents on the server. */
2634                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2635                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2636                 if (ending_offset == 0)
2637                         break;
2638
2639                 /* If there is a gap at the end of this page, it can't merge
2640                  * with any subsequent pages, so we'll hand the network a
2641                  * "fragmented" page array that it can't transfer in 1 RDMA */
2642                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2643                         break;
2644         }
2645
2646         osc_wake_cache_waiters(cli);
2647         osc_wake_sync_fs(cli);
2648         loi_list_maint(cli, loi);
2649
2650         client_obd_list_unlock(&cli->cl_loi_list_lock);
2651
2652         if (clob != NULL)
2653                 cl_object_put(env, clob);
2654
2655         if (page_count == 0) {
2656                 client_obd_list_lock(&cli->cl_loi_list_lock);
2657                 RETURN(0);
2658         }
2659
2660         req = osc_build_req(env, cli, &rpc_list, page_count,
2661                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2662         if (IS_ERR(req)) {
2663                 LASSERT(cfs_list_empty(&rpc_list));
2664                 loi_list_maint(cli, loi);
2665                 RETURN(PTR_ERR(req));
2666         }
2667
2668         aa = ptlrpc_req_async_args(req);
2669
2670         if (cmd == OBD_BRW_READ) {
2671                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2672                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2673                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2674                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2675         } else {
2676                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2677                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2678                                  cli->cl_w_in_flight);
2679                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2680                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2681         }
2682         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2683
2684         client_obd_list_lock(&cli->cl_loi_list_lock);
2685
2686         if (cmd == OBD_BRW_READ)
2687                 cli->cl_r_in_flight++;
2688         else
2689                 cli->cl_w_in_flight++;
2690
2691         /* queued sync pages can be torn down while the pages
2692          * are between the pending list and the rpc */
2693         tmp = NULL;
2694         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2695                 /* only one oap gets a request reference */
2696                 if (tmp == NULL)
2697                         tmp = oap;
2698                 if (oap->oap_interrupted && !req->rq_intr) {
2699                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2700                                oap, req);
2701                         ptlrpc_mark_interrupted(req);
2702                 }
2703         }
2704         if (tmp != NULL)
2705                 tmp->oap_request = ptlrpc_request_addref(req);
2706
2707         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2708                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2709
2710         req->rq_interpret_reply = brw_interpret;
2711         ptlrpcd_add_req(req, PSCOPE_BRW);
2712         RETURN(1);
2713 }
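
/* Illustrative sketch (assuming, for the example only, a 1MB
 * PTLRPC_MAX_BRW_SIZE): the power-of-two mask above spots BRW-size
 * boundaries, so page collection stops exactly when an rpc ends on one:
 *
 *      0x100000 & (0x100000 - 1) == 0          -> break, rpc is aligned
 *      0x100800 & (0x100000 - 1) == 0x800      -> keep collecting pages
 */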
2714
2715 #define LOI_DEBUG(LOI, STR, args...)                                     \
2716         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2717                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2718                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2719                (LOI)->loi_write_lop.lop_num_pending,                     \
2720                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2721                (LOI)->loi_read_lop.lop_num_pending,                      \
2722                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2723                args)
2724
2725 /* This is called by osc_check_rpcs() to find which objects have pages that
2726  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2727 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2728 {
2729         ENTRY;
2730
2731         /* First return objects that have blocked locks so that they
2732          * will be flushed quickly and other clients can get the lock,
2733          * then objects which have pages ready to be stuffed into RPCs */
2734         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2735                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2736                                       struct lov_oinfo, loi_hp_ready_item));
2737         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2738                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2739                                       struct lov_oinfo, loi_ready_item));
2740         if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
2741                 RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
2742                                       struct lov_oinfo, loi_sync_fs_item));
2743
2744         /* then if we have cache waiters, return all objects with queued
2745          * writes.  This is especially important when many small files
2746          * have filled up the cache and not been fired into rpcs because
2747          * they don't pass the nr_pending/object threshold */
2748         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2749             !cfs_list_empty(&cli->cl_loi_write_list))
2750                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2751                                       struct lov_oinfo, loi_write_item));
2752
2753         /* then return all queued objects when we have an invalid import
2754          * so that they get flushed */
2755         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2756                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2757                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2758                                               struct lov_oinfo,
2759                                               loi_write_item));
2760                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2761                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2762                                               struct lov_oinfo, loi_read_item));
2763         }
2764         RETURN(NULL);
2765 }
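
/* Illustrative usage: osc_next_loi() is consumed in a loop with
 * cl_loi_list_lock held, as osc_check_rpcs() below does:
 *
 *      while ((loi = osc_next_loi(cli)) != NULL) {
 *              if (osc_max_rpc_in_flight(cli, loi))
 *                      break;
 *              ... send read/write rpcs for this object ...
 *              loi_list_maint(cli, loi);
 *      }
 */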
2766
2767 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2768 {
2769         struct osc_async_page *oap;
2770         int hprpc = 0;
2771
2772         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2773                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2774                                      struct osc_async_page, oap_urgent_item);
2775                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2776         }
2777
2778         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2779                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2780                                      struct osc_async_page, oap_urgent_item);
2781                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2782         }
2783
2784         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2785 }
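
/* For example, with cl_max_rpcs_in_flight == 8, a plain object is throttled
 * once rpcs_in_flight(cli) reaches 8, but an object whose first urgent page
 * carries ASYNC_HP raises the bound to 8 + 1, so io needed to release a
 * blocked lock can still start one more rpc under the normal cap. */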
2786
2787 /* called with the loi list lock held */
2788 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2789 {
2790         struct lov_oinfo *loi;
2791         int rc = 0, race_counter = 0;
2792         ENTRY;
2793
2794         while ((loi = osc_next_loi(cli)) != NULL) {
2795                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2796
2797                 if (osc_max_rpc_in_flight(cli, loi))
2798                         break;
2799
2800                 /* Attempt some read/write balancing by alternating between
2801                  * reads and writes on an object.  The lop_makes_rpc() checks
2802                  * here would be redundant if we were handed read/write work
2803                  * items instead of objects.  We don't want osc_send_oap_rpc()
2804                  * to drain a partial read pending queue when we're given this
2805                  * object to do write io on while there are cache waiters. */
2806                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2807                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2808                                               &loi->loi_write_lop);
2809                         if (rc < 0) {
2810                                 CERROR("Write request failed with %d\n", rc);
2811
2812                                 /* osc_send_oap_rpc failed, mostly because of
2813                                  * memory pressure.
2814                                  *
2815                                  * We can't break out here, because if:
2816                                  *  - a page was submitted by osc_io_submit,
2817                                  *    so that page is locked;
2818                                  *  - no request is in flight; and
2819                                  *  - no subsequent request will be sent,
2820                                  * then the system live-locks, because
2821                                  * nothing will ever call osc_io_unplug()
2822                                  * or osc_check_rpcs() again.  pdflush
2823                                  * can't help in this case, because it
2824                                  * might block grabbing the page lock as
2825                                  * mentioned above.
2826                                  *
2827                                  * Anyway, continue to drain pages. */
2828                                 /* break; */
2829                         }
2830
2831                         if (rc > 0)
2832                                 race_counter = 0;
2833                         else
2834                                 race_counter++;
2835                 }
2836                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2837                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2838                                               &loi->loi_read_lop);
2839                         if (rc < 0)
2840                                 CERROR("Read request failed with %d\n", rc);
2841
2842                         if (rc > 0)
2843                                 race_counter = 0;
2844                         else
2845                                 race_counter++;
2846                 }
2847
2848                 /* attempt some inter-object balancing by issuing rpcs
2849                  * for each object in turn */
2850                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2851                         cfs_list_del_init(&loi->loi_hp_ready_item);
2852                 if (!cfs_list_empty(&loi->loi_ready_item))
2853                         cfs_list_del_init(&loi->loi_ready_item);
2854                 if (!cfs_list_empty(&loi->loi_write_item))
2855                         cfs_list_del_init(&loi->loi_write_item);
2856                 if (!cfs_list_empty(&loi->loi_read_item))
2857                         cfs_list_del_init(&loi->loi_read_item);
2858                 if (!cfs_list_empty(&loi->loi_sync_fs_item))
2859                         cfs_list_del_init(&loi->loi_sync_fs_item);
2860
2861                 loi_list_maint(cli, loi);
2862
2863                 /* osc_send_oap_rpc() returns 0 when make_ready tells it
2864                  * to back off.  llite's make_ready does this when it tries
2865                  * to lock a page queued for write that is already locked.
2866                  * We want to try sending rpcs from many objects, but we
2867                  * don't want to spin returning 0. */
2868                 if (race_counter == 10)
2869                         break;
2870         }
2871         EXIT;
2872 }
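
/* Return convention relied on by the loop above: osc_send_oap_rpc() returns
 * 1 when an rpc was sent, 0 when make_ready asked it to back off, and a
 * negative errno on failure, so the bookkeeping reduces to:
 *
 *      rc = osc_send_oap_rpc(env, cli, loi, cmd, lop);
 *      if (rc > 0)
 *              race_counter = 0;       // made progress
 *      else
 *              race_counter++;         // backed off (0) or failed (< 0)
 */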
2873
2874 /* we're trying to queue a page in the osc so we're subject to the
2875  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2876  * If the osc's queued pages are already at that limit, then we want to sleep
2877  * until there is space in the osc's queue for us.  We also may be waiting for
2878  * write credits from the OST if there are RPCs in flight that may return some
2879  * before we fall back to sync writes.
2880  *
2881  * We need this to know whether our allocation was granted in the presence of signals */
2882 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2883 {
2884         int rc;
2885         ENTRY;
2886         client_obd_list_lock(&cli->cl_loi_list_lock);
2887         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2888         client_obd_list_unlock(&cli->cl_loi_list_lock);
2889         RETURN(rc);
2890 }
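
/* Sketch of the waiter handshake used above: osc_wake_cache_waiters() (called
 * as rpcs complete; its body is elsewhere in this file) presumably removes a
 * granted waiter from cl_cache_waiters, fills in ocw_rc and wakes ocw_waitq,
 * so once l_wait_event() returns, list membership tells the outcomes apart:
 *
 *      if (!cfs_list_empty(&ocw.ocw_entry)) { // never granted: interrupted,
 *              cfs_list_del(&ocw.ocw_entry);  //  or no rpcs left in flight
 *              rc = -EINTR;
 *      } else {
 *              rc = ocw.ocw_rc;               // verdict set by the waker
 *      }
 */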
2891
2892 /**
2893  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2894  * is available.
2895  */
2896 int osc_enter_cache_try(const struct lu_env *env,
2897                         struct client_obd *cli, struct lov_oinfo *loi,
2898                         struct osc_async_page *oap, int transient)
2899 {
2900         int has_grant;
2901
2902         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2903         if (has_grant) {
2904                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2905                 if (transient) {
2906                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2907                         cfs_atomic_inc(&obd_dirty_transit_pages);
2908                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2909                 }
2910         }
2911         return has_grant;
2912 }
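
/* Illustrative caller pattern (cf. osc_enter_cache() below): the caller holds
 * cl_loi_list_lock, checks both dirty limits, and only then consumes grant:
 *
 *      if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
 *          cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
 *          osc_enter_cache_try(env, cli, loi, oap, 0))
 *              RETURN(0);      // grant consumed, page may be cached
 *      // otherwise wait for grant to return or fall back to sync io
 */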
2913
2914 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2915  * grant or cache space. */
2916 static int osc_enter_cache(const struct lu_env *env,
2917                            struct client_obd *cli, struct lov_oinfo *loi,
2918                            struct osc_async_page *oap)
2919 {
2920         struct osc_cache_waiter ocw;
2921         struct l_wait_info lwi = { 0 };
2922
2923         ENTRY;
2924
2925         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2926                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2927                cli->cl_dirty_max, obd_max_dirty_pages,
2928                cli->cl_lost_grant, cli->cl_avail_grant);
2929
2930         /* force the caller to try sync io.  this can jump the list
2931          * of queued writes and create a discontiguous rpc stream */
2932         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2933             loi->loi_ar.ar_force_sync)
2934                 RETURN(-EDQUOT);
2935
2936         /* Hopefully normal case - cache space and write credits available */
2937         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2938             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2939             osc_enter_cache_try(env, cli, loi, oap, 0))
2940                 RETURN(0);
2941
2942         /* It is safe to block as a cache waiter as long as there is grant
2943          * space available or the hope of additional grant being returned
2944          * when an in flight write completes.  Using the write back cache
2945          * if possible is preferable to sending the data synchronously
2946          * because write pages can then be merged into large requests.
2947          * The addition of this cache waiter will cause pending write
2948          * pages to be sent immediately. */
2949         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2950                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2951                 cfs_waitq_init(&ocw.ocw_waitq);
2952                 ocw.ocw_oap = oap;
2953                 ocw.ocw_rc = 0;
2954
2955                 loi_list_maint(cli, loi);
2956                 osc_check_rpcs(env, cli);
2957                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2958
2959                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2960                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2961
2962                 client_obd_list_lock(&cli->cl_loi_list_lock);
2963                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2964                         cfs_list_del(&ocw.ocw_entry);
2965                         RETURN(-EINTR);
2966                 }
2967                 RETURN(ocw.ocw_rc);
2968         }
2969
2970         RETURN(-EDQUOT);
2971 }
2972
2973
2974 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2975                         struct lov_oinfo *loi, cfs_page_t *page,
2976                         obd_off offset, const struct obd_async_page_ops *ops,
2977                         void *data, void **res, int nocache,
2978                         struct lustre_handle *lockh)
2979 {
2980         struct osc_async_page *oap;
2981
2982         ENTRY;
2983
2984         if (!page)
2985                 return cfs_size_round(sizeof(*oap));
2986
2987         oap = *res;
2988         oap->oap_magic = OAP_MAGIC;
2989         oap->oap_cli = &exp->exp_obd->u.cli;
2990         oap->oap_loi = loi;
2991
2992         oap->oap_caller_ops = ops;
2993         oap->oap_caller_data = data;
2994
2995         oap->oap_page = page;
2996         oap->oap_obj_off = offset;
2997         if (!client_is_remote(exp) &&
2998             cfs_capable(CFS_CAP_SYS_RESOURCE))
2999                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
3000
3001         LASSERT(!(offset & ~CFS_PAGE_MASK));
3002
3003         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
3004         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
3005         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
3006         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
3007
3008         cfs_spin_lock_init(&oap->oap_lock);
3009         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
3010         RETURN(0);
3011 }
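
/* Illustrative only: the two-call protocol implied by the NULL-page branch
 * above.  A hypothetical caller first asks for the oap footprint, reserves
 * that many bytes in its own per-page state, then initializes it in place:
 *
 *      int size = osc_prep_async_page(exp, lsm, loi, NULL, 0,
 *                                     NULL, NULL, NULL, 0, NULL);
 *      ... reserve "size" bytes and point res at them ...
 *      rc = osc_prep_async_page(exp, lsm, loi, page, offset,
 *                               ops, data, &res, nocache, lockh);
 */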
3012
3013 struct osc_async_page *oap_from_cookie(void *cookie)
3014 {
3015         struct osc_async_page *oap = cookie;
3016         if (oap->oap_magic != OAP_MAGIC)
3017                 return ERR_PTR(-EINVAL);
3018         return oap;
3019 }
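
/* Illustrative usage (mirrored by osc_queue_async_io() below): callers
 * validate the opaque cookie and propagate -EINVAL on a bad magic:
 *
 *      oap = oap_from_cookie(cookie);
 *      if (IS_ERR(oap))
 *              RETURN(PTR_ERR(oap));
 */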
3020
3021 int osc_queue_async_io(const struct lu_env *env,
3022                        struct obd_export *exp, struct lov_stripe_md *lsm,
3023                        struct lov_oinfo *loi, void *cookie,
3024                        int cmd, obd_off off, int count,
3025                        obd_flag brw_flags, enum async_flags async_flags)
3026 {
3027         struct client_obd *cli = &exp->exp_obd->u.cli;
3028         struct osc_async_page *oap;
3029         int rc = 0;
3030         ENTRY;
3031
3032         oap = oap_from_cookie(cookie);
3033         if (IS_ERR(oap))
3034                 RETURN(PTR_ERR(oap));
3035
3036         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3037                 RETURN(-EIO);
3038
3039         if (!cfs_list_empty(&oap->oap_pending_item) ||
3040             !cfs_list_empty(&oap->oap_urgent_item) ||
3041             !cfs_list_empty(&oap->oap_rpc_item))
3042                 RETURN(-EBUSY);
3043
3044         /* check if the file's owner/group is over quota */
3045         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3046                 struct cl_object *obj;
3047                 struct cl_attr    attr; /* XXX put attr into thread info */
3048                 unsigned int qid[MAXQUOTAS];
3049
3050                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3051
3052                 cl_object_attr_lock(obj);
3053                 rc = cl_object_attr_get(env, obj, &attr);
3054                 cl_object_attr_unlock(obj);
3055
3056                 qid[USRQUOTA] = attr.cat_uid;
3057                 qid[GRPQUOTA] = attr.cat_gid;
3058                 if (rc == 0 &&
3059                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3060                         rc = -EDQUOT;
3061                 if (rc)
3062                         RETURN(rc);
3063         }
3064
3065         if (loi == NULL)
3066                 loi = lsm->lsm_oinfo[0];
3067
3068         client_obd_list_lock(&cli->cl_loi_list_lock);
3069
3070         LASSERT(off + count <= CFS_PAGE_SIZE);
3071         oap->oap_cmd = cmd;
3072         oap->oap_page_off = off;
3073         oap->oap_count = count;
3074         oap->oap_brw_flags = brw_flags;
3075         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3076         if (cfs_memory_pressure_get())
3077                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3078         cfs_spin_lock(&oap->oap_lock);
3079         oap->oap_async_flags = async_flags;
3080         cfs_spin_unlock(&oap->oap_lock);
3081
3082         if (cmd & OBD_BRW_WRITE) {
3083                 rc = osc_enter_cache(env, cli, loi, oap);
3084                 if (rc) {
3085                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3086                         RETURN(rc);
3087                 }
3088         }
3089
3090         osc_oap_to_pending(oap);
3091         loi_list_maint(cli, loi);
3092
3093         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3094                   cmd);
3095
3096         osc_check_rpcs(env, cli);
3097         client_obd_list_unlock(&cli->cl_loi_list_lock);
3098
3099         RETURN(0);
3100 }
3101
3102 /* aka (~was & now & flag), but this is more clear :) */
3103 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
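
/* For example:
 *      SETTING(0,            ASYNC_URGENT,               ASYNC_URGENT) -> true
 *      SETTING(ASYNC_URGENT, ASYNC_URGENT | ASYNC_READY, ASYNC_URGENT) -> false
 * i.e. true only for a flag newly set by this call. */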
3104
3105 int osc_set_async_flags_base(struct client_obd *cli,
3106                              struct lov_oinfo *loi, struct osc_async_page *oap,
3107                              obd_flag async_flags)
3108 {
3109         struct loi_oap_pages *lop;
3110         int flags = 0;
3111         ENTRY;
3112
3113         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3114
3115         if (oap->oap_cmd & OBD_BRW_WRITE) {
3116                 lop = &loi->loi_write_lop;
3117         } else {
3118                 lop = &loi->loi_read_lop;
3119         }
3120
3121         if ((oap->oap_async_flags & async_flags) == async_flags)
3122                 RETURN(0);
3123
3124         /* XXX: This introduces a tiny, insignificant race for the case
3125          * where this loi already had other urgent items.
3126          */
3127         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
3128             cfs_list_empty(&oap->oap_rpc_item) &&
3129             cfs_list_empty(&oap->oap_urgent_item)) {
3130                 osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
3131                 flags |= ASYNC_SYNCFS;
3132                 cfs_spin_lock(&oap->oap_lock);
3133                 oap->oap_async_flags |= flags;
3134                 cfs_spin_unlock(&oap->oap_lock);
3135                 loi_list_maint(cli, loi);
3136                 RETURN(0);
3137         }
3138
3139         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3140                 flags |= ASYNC_READY;
3141
3142         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3143             cfs_list_empty(&oap->oap_rpc_item)) {
3144                 osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
3145                 flags |= ASYNC_URGENT;
3146                 loi_list_maint(cli, loi);
3147         }
3148         cfs_spin_lock(&oap->oap_lock);
3149         oap->oap_async_flags |= flags;
3150         cfs_spin_unlock(&oap->oap_lock);
3151
3152         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3153                         oap->oap_async_flags);
3154         RETURN(0);
3155 }
3156
3157 int osc_teardown_async_page(struct obd_export *exp,
3158                             struct lov_stripe_md *lsm,
3159                             struct lov_oinfo *loi, void *cookie)
3160 {
3161         struct client_obd *cli = &exp->exp_obd->u.cli;
3162