Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         RETURN(-ENOMEM);
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         }
142
143         if (lmm != NULL) {
144                 /* XXX zero *lsmp? */
145                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147                 LASSERT((*lsmp)->lsm_object_id);
148                 LASSERT((*lsmp)->lsm_object_gr);
149         }
150
151         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
152
153         RETURN(lsm_size);
154 }
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
178         body->oa = *oinfo->oi_oa;
179         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
180 }
181
182 static int osc_getattr_interpret(struct ptlrpc_request *req,
183                                  struct osc_async_args *aa, int rc)
184 {
185         struct ost_body *body;
186         ENTRY;
187
188         if (rc != 0)
189                 GOTO(out, rc);
190
191         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
192                                   lustre_swab_ost_body);
193         if (body) {
194                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
195                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
196
197                 /* This should really be sent by the OST */
198                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
199                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
200         } else {
201                 CERROR("can't unpack ost_body\n");
202                 rc = -EPROTO;
203                 aa->aa_oi->oi_oa->o_valid = 0;
204         }
205 out:
206         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
207         RETURN(rc);
208 }
209
210 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
211                              struct ptlrpc_request_set *set)
212 {
213         struct ptlrpc_request *req;
214         struct ost_body *body;
215         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
216         struct osc_async_args *aa;
217         ENTRY;
218
219         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
220         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
221                               OST_GETATTR, 3, size,NULL);
222         if (!req)
223                 RETURN(-ENOMEM);
224
225         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
226
227         ptlrpc_req_set_repsize(req, 2, size);
228         req->rq_interpret_reply = osc_getattr_interpret;
229
230         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
231         aa = (struct osc_async_args *)&req->rq_async_args;
232         aa->aa_oi = oinfo;
233
234         ptlrpc_set_add_req(set, req);
235         RETURN (0);
236 }
237
238 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
239 {
240         struct ptlrpc_request *req;
241         struct ost_body *body;
242         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
243         ENTRY;
244
245         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
246         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
247                               OST_GETATTR, 3, size, NULL);
248         if (!req)
249                 RETURN(-ENOMEM);
250
251         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
252
253         ptlrpc_req_set_repsize(req, 2, size);
254
255         rc = ptlrpc_queue_wait(req);
256         if (rc) {
257                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
258                 GOTO(out, rc);
259         }
260
261         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
262                                   lustre_swab_ost_body);
263         if (body == NULL) {
264                 CERROR ("can't unpack ost_body\n");
265                 GOTO (out, rc = -EPROTO);
266         }
267
268         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
269         *oinfo->oi_oa = body->oa;
270
271         /* This should really be sent by the OST */
272         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
273         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
274
275         EXIT;
276  out:
277         ptlrpc_req_finished(req);
278         return rc;
279 }
280
281 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
282                        struct obd_trans_info *oti)
283 {
284         struct ptlrpc_request *req;
285         struct ost_body *body;
286         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
287         ENTRY;
288
289         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
290                                         oinfo->oi_oa->o_gr > 0);
291         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
292         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
293                               OST_SETATTR, 3, size, NULL);
294         if (!req)
295                 RETURN(-ENOMEM);
296
297         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
298
299         ptlrpc_req_set_repsize(req, 2, size);
300
301         rc = ptlrpc_queue_wait(req);
302         if (rc)
303                 GOTO(out, rc);
304
305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
306                                   lustre_swab_ost_body);
307         if (body == NULL)
308                 GOTO(out, rc = -EPROTO);
309
310         *oinfo->oi_oa = body->oa;
311
312         EXIT;
313 out:
314         ptlrpc_req_finished(req);
315         RETURN(rc);
316 }
317
318 static int osc_setattr_interpret(struct ptlrpc_request *req,
319                                  struct osc_async_args *aa, int rc)
320 {
321         struct ost_body *body;
322         ENTRY;
323
324         if (rc != 0)
325                 GOTO(out, rc);
326
327         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
328                                   lustre_swab_ost_body);
329         if (body == NULL) {
330                 CERROR("can't unpack ost_body\n");
331                 GOTO(out, rc = -EPROTO);
332         }
333
334         *aa->aa_oi->oi_oa = body->oa;
335 out:
336         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
337         RETURN(rc);
338 }
339
340 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
341                              struct obd_trans_info *oti,
342                              struct ptlrpc_request_set *rqset)
343 {
344         struct ptlrpc_request *req;
345         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
346         struct osc_async_args *aa;
347         ENTRY;
348
349         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
350         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
351                               OST_SETATTR, 3, size, NULL);
352         if (!req)
353                 RETURN(-ENOMEM);
354
355         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
356         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
357                 LASSERT(oti);
358                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
359         }
360
361         ptlrpc_req_set_repsize(req, 2, size);
362         /* do mds to ost setattr asynchronouly */
363         if (!rqset) {
364                 /* Do not wait for response. */
365                 ptlrpcd_add_req(req);
366         } else {
367                 req->rq_interpret_reply = osc_setattr_interpret;
368
369                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
370                 aa = (struct osc_async_args *)&req->rq_async_args;
371                 aa->aa_oi = oinfo;
372
373                 ptlrpc_set_add_req(rqset, req);
374         }
375
376         RETURN(0);
377 }
378
/* Create an object on the OST synchronously.  If *ea is NULL a
 * single-stripe md is allocated here (and freed again on failure).
 * On success the new object id/group are stored in the lsm and, if
 * @oti is given, the transno and unlink llog cookie are saved there. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller supplied no md: build a local single-stripe one */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* FLINLINE here marks an orphan-deletion create from
                 * MDS/OST integration, not inline data */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* save the unlink llog cookie for the caller */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the md if we allocated it locally (i.e. *ea unset) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
460
461 static int osc_punch_interpret(struct ptlrpc_request *req,
462                                struct osc_async_args *aa, int rc)
463 {
464         struct ost_body *body;
465         ENTRY;
466
467         if (rc != 0)
468                 GOTO(out, rc);
469
470         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
471                                   lustre_swab_ost_body);
472         if (body == NULL) {
473                 CERROR ("can't unpack ost_body\n");
474                 GOTO(out, rc = -EPROTO);
475         }
476
477         *aa->aa_oi->oi_oa = body->oa;
478 out:
479         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
480         RETURN(rc);
481 }
482
483 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
484                      struct obd_trans_info *oti,
485                      struct ptlrpc_request_set *rqset)
486 {
487         struct ptlrpc_request *req;
488         struct osc_async_args *aa;
489         struct ost_body *body;
490         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
491         ENTRY;
492
493         if (!oinfo->oi_oa) {
494                 CERROR("oa NULL\n");
495                 RETURN(-EINVAL);
496         }
497
498         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
499         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
500                               OST_PUNCH, 3, size, NULL);
501         if (!req)
502                 RETURN(-ENOMEM);
503
504         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
505
506         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
507         /* overload the size and blocks fields in the oa with start/end */
508         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
509         body->oa.o_size = oinfo->oi_policy.l_extent.start;
510         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
511         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
512
513         ptlrpc_req_set_repsize(req, 2, size);
514
515         req->rq_interpret_reply = osc_punch_interpret;
516         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
517         aa = (struct osc_async_args *)&req->rq_async_args;
518         aa->aa_oi = oinfo;
519         ptlrpc_set_add_req(rqset, req);
520
521         RETURN(0);
522 }
523
524 static int osc_sync(struct obd_export *exp, struct obdo *oa,
525                     struct lov_stripe_md *md, obd_size start, obd_size end,
526                     void *capa)
527 {
528         struct ptlrpc_request *req;
529         struct ost_body *body;
530         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
531         ENTRY;
532
533         if (!oa) {
534                 CERROR("oa NULL\n");
535                 RETURN(-EINVAL);
536         }
537
538         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
539
540         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
541                               OST_SYNC, 3, size, NULL);
542         if (!req)
543                 RETURN(-ENOMEM);
544
545         /* overload the size and blocks fields in the oa with start/end */
546         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
547         body->oa = *oa;
548         body->oa.o_size = start;
549         body->oa.o_blocks = end;
550         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
551
552         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
553
554         ptlrpc_req_set_repsize(req, 2, size);
555
556         rc = ptlrpc_queue_wait(req);
557         if (rc)
558                 GOTO(out, rc);
559
560         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
561                                   lustre_swab_ost_body);
562         if (body == NULL) {
563                 CERROR ("can't unpack ost_body\n");
564                 GOTO (out, rc = -EPROTO);
565         }
566
567         *oa = body->oa;
568
569         EXIT;
570  out:
571         ptlrpc_req_finished(req);
572         return rc;
573 }
574
575 /* Find and cancel locally locks matched by @mode in the resource found by
576  * @objid. Found locks are added into @cancel list. Returns the amount of
577  * locks added to @cancels list. */
578 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
579                                    struct list_head *cancels, ldlm_mode_t mode,
580                                    int lock_flags)
581 {
582         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
583         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
584         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
585         int count;
586         ENTRY;
587
588         if (res == NULL)
589                 RETURN(0);
590
591         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
592                                            lock_flags, 0, NULL);
593         ldlm_resource_putref(res);
594         RETURN(count);
595 }
596
597 /* Destroy requests can be async always on the client, and we don't even really
598  * care about the return code since the client cannot do anything at all about
599  * a destroy failure.
600  * When the MDS is unlinking a filename, it saves the file objects into a
601  * recovery llog, and these object records are cancelled when the OST reports
602  * they were destroyed and sync'd to disk (i.e. transaction committed).
603  * If the client dies, or the OST is down when the object should be destroyed,
604  * the records are not cancelled, and when the OST reconnects to the MDS next,
 *  it will retrieve the llog unlink logs and then send the log cancellation
606  * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel our local PW locks on the object up front; if the
         * server supports early cancels their handles piggy-back on
         * the destroy request in an extra buffer. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
                                                             OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        /* the collected lock handles must be consumed either way */
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        /* carry the MDS unlink llog cookie so the OST can cancel it */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        /* fire-and-forget via ptlrpcd; see the comment above this
         * function for why the reply is not interesting */
        ptlrpcd_add_req(req);
        RETURN(0);
}
653
/* Fill in the dirty-cache and grant accounting fields of @oa so the
 * server learns this client's cache state.  @writing_bytes is unused
 * here.  Takes cl_loi_list_lock internally, so the caller must not
 * hold it. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* the caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* how much more we could still dirty: at least one full
                 * RPC window's worth */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported to the server once, then reset */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
688
689 /* caller must hold loi_list_lock */
690 static void osc_consume_write_grant(struct client_obd *cli,
691                                     struct brw_page *pga)
692 {
693         atomic_inc(&obd_dirty_pages);
694         cli->cl_dirty += CFS_PAGE_SIZE;
695         cli->cl_avail_grant -= CFS_PAGE_SIZE;
696         pga->flag |= OBD_BRW_FROM_GRANT;
697         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
698                CFS_PAGE_SIZE, pga, pga->pg);
699         LASSERT(cli->cl_avail_grant >= 0);
700 }
701
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* server block size; fall back to 4096 when os_bsize is unset */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* nothing to release if this page never consumed grant */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* the write never went out: the whole page's grant is lost */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                /* round the written byte range out to block boundaries */
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                /* only the unwritten remainder of the page is lost grant */
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
740
741 static unsigned long rpcs_in_flight(struct client_obd *cli)
742 {
743         return cli->cl_r_in_flight + cli->cl_w_in_flight;
744 }
745
/* Wake cache waiters as long as dirty-cache room and grant allow;
 * waiters that cannot get grant are woken with ocw_rc = -EDQUOT so
 * they fall back to sync IO.  Caller must hold loi_list_lock. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                /* this waiter can proceed: remove it and decide sync vs grant */
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
787
788 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
789 {
790         client_obd_list_lock(&cli->cl_loi_list_lock);
791         cli->cl_avail_grant = ocd->ocd_grant;
792         client_obd_list_unlock(&cli->cl_loi_list_lock);
793
794         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
795                cli->cl_avail_grant, cli->cl_lost_grant);
796         LASSERT(cli->cl_avail_grant >= 0);
797 }
798
799 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
800 {
801         client_obd_list_lock(&cli->cl_loi_list_lock);
802         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
803         if (body->oa.o_valid & OBD_MD_FLGRANT)
804                 cli->cl_avail_grant += body->oa.o_grant;
805         /* waiters are woken in brw_interpret_oap */
806         client_obd_list_unlock(&cli->cl_loi_list_lock);
807 }
808
809 /* We assume that the reason this OSC got a short read is because it read
810  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
811  * via the LOV, and it _knows_ it's reading inside the file, it's just that
812  * this stripe never got written at or beyond this stripe offset yet. */
813 static void handle_short_read(int nob_read, obd_count page_count,
814                               struct brw_page **pga)
815 {
816         char *ptr;
817         int i = 0;
818
819         /* skip bytes read OK */
820         while (nob_read > 0) {
821                 LASSERT (page_count > 0);
822
823                 if (pga[i]->count > nob_read) {
824                         /* EOF inside this page */
825                         ptr = cfs_kmap(pga[i]->pg) +
826                                 (pga[i]->off & ~CFS_PAGE_MASK);
827                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
828                         cfs_kunmap(pga[i]->pg);
829                         page_count--;
830                         i++;
831                         break;
832                 }
833
834                 nob_read -= pga[i]->count;
835                 page_count--;
836                 i++;
837         }
838
839         /* zero remaining pages */
840         while (page_count-- > 0) {
841                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
842                 memset(ptr, 0, pga[i]->count);
843                 cfs_kunmap(pga[i]->pg);
844                 i++;
845         }
846 }
847
848 static int check_write_rcs(struct ptlrpc_request *req,
849                            int requested_nob, int niocount,
850                            obd_count page_count, struct brw_page **pga)
851 {
852         int    *remote_rcs, i;
853
854         /* return error if any niobuf was in error */
855         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
856                                         sizeof(*remote_rcs) * niocount, NULL);
857         if (remote_rcs == NULL) {
858                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
859                 return(-EPROTO);
860         }
861         if (lustre_msg_swabbed(req->rq_repmsg))
862                 for (i = 0; i < niocount; i++)
863                         __swab32s(&remote_rcs[i]);
864
865         for (i = 0; i < niocount; i++) {
866                 if (remote_rcs[i] < 0)
867                         return(remote_rcs[i]);
868
869                 if (remote_rcs[i] != 0) {
870                         CERROR("rc[%d] invalid (%d) req %p\n",
871                                 i, remote_rcs[i], req);
872                         return(-EPROTO);
873                 }
874         }
875
876         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
877                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
878                        requested_nob, req->rq_bulk->bd_nob_transferred);
879                 return(-EPROTO);
880         }
881
882         return (0);
883 }
884
885 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
886 {
887         if (p1->flag != p2->flag) {
888                 unsigned mask = ~OBD_BRW_FROM_GRANT;
889
890                 /* warn if we try to combine flags that we don't know to be
891                  * safe to combine */
892                 if ((p1->flag & mask) != (p2->flag & mask))
893                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
894                                "same brw?\n", p1->flag, p2->flag);
895                 return 0;
896         }
897
898         return (p1->off + p1->count == p2->off);
899 }
900
/* Compute the CRC32 checksum over up to @nob bytes spread across
 * @pg_count brw pages.  Also hosts the checksum fault-injection hooks:
 * for reads the received data itself is corrupted, while for writes
 * only the computed checksum is perturbed so the data stays correct if
 * the request is replayed. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NOTE(review): nob is decremented by the full page count
                 * even when only 'count' bytes were checksummed; harmless
                 * since the loop then exits on nob <= 0 */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
934
/* Build (but do not send) a BRW read or write RPC covering @page_count
 * pages.  Packs the ost_body/ioobj/niobuf request buffers, registers
 * every page with a bulk descriptor and, for writes with cl_checksum
 * enabled, attaches a client-side checksum.  On success *reqp holds
 * the prepared request (its completion state stashed as
 * osc_brw_async_args in rq_async_args) and 0 is returned; on failure
 * a negative errno is returned and no request is left behind. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, 
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        /* writes draw from the pre-allocated request pool so they can
         * make progress under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        /* contiguous pages with compatible flags share one remote niobuf */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* each brw page must fit entirely within one memory page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf rather than starting
                         * a new one (niocount above counted the same way) */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* every niobuf slot we reserved must have been consumed exactly */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash completion state for osc_brw_fini_request()/brw_interpret() */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1095
1096 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1097                                 __u32 client_cksum, __u32 server_cksum,
1098                                 int nob, obd_count page_count,
1099                                 struct brw_page **pga)
1100 {
1101         __u32 new_cksum;
1102         char *msg;
1103
1104         if (server_cksum == client_cksum) {
1105                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1106                 return 0;
1107         }
1108
1109         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
1110
1111         if (new_cksum == server_cksum)
1112                 msg = "changed on the client after we checksummed it - "
1113                       "likely false positive due to mmap IO (bug 11742)";
1114         else if (new_cksum == client_cksum)
1115                 msg = "changed in transit before arrival at OST";
1116         else
1117                 msg = "changed in transit AND doesn't match the original - "
1118                       "likely false positive due to mmap IO (bug 11742)";
1119
1120         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1121                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1122                            "["LPU64"-"LPU64"]\n",
1123                            msg, libcfs_nid2str(peer->nid),
1124                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1125                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1126                                                         (__u64)0,
1127                            oa->o_id,
1128                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1129                            pga[0]->off,
1130                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1131         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1132                client_cksum, server_cksum, new_cksum);
1133         return 1;        
1134 }
1135
/* Complete a BRW RPC.  Note rc enters this function as the number of
 * bytes transferred; on return it is 0, a negative errno, or -EAGAIN
 * when the caller should resend the whole request (checksum mismatch
 * or bulk unwrap failure).  Unpacks and validates the reply, updates
 * per-uid/gid quota flags and the client grant, verifies per-niobuf
 * RCs (writes) or transfer size and checksum (reads), and on success
 * copies the reply's obdo back to the caller. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT still carries a reply whose quota flags we must
         * process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        /* writes: verify per-niobuf RCs and, when both sides
         * checksummed, the write checksum */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR ("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                /* NOTE(review): plain return here skips the EXIT debug
                 * mark that RETURN() would emit */
                return (-EPROTO);
        }

        /* a short read means we hit EOF on a sparse stripe: zero the
         * unread tail */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ);

                /* identify any routing hop for the error message */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only on power-of-two occurrences to avoid spamming */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* propagate the reply's obdo (updated size/blocks/etc.) back */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1283
1284 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1285                             struct lov_stripe_md *lsm,
1286                             obd_count page_count, struct brw_page **pga,
1287                             struct obd_capa *ocapa)
1288 {
1289         struct ptlrpc_request *req;
1290         int                    rc;
1291         cfs_waitq_t            waitq;
1292         int                    resends = 0;
1293         struct l_wait_info     lwi;
1294
1295         ENTRY;
1296
1297         cfs_waitq_init(&waitq);
1298
1299 restart_bulk:
1300         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1301                                   page_count, pga, &req, ocapa);
1302         if (rc != 0)
1303                 return (rc);
1304
1305         rc = ptlrpc_queue_wait(req);
1306
1307         if (rc == -ETIMEDOUT && req->rq_resend) {
1308                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1309                 ptlrpc_req_finished(req);
1310                 goto restart_bulk;
1311         }
1312
1313         rc = osc_brw_fini_request(req, rc);
1314
1315         ptlrpc_req_finished(req);
1316         if (osc_recoverable_error(rc)) {
1317                 resends++;
1318                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1319                         CERROR("too many resend retries, returning error\n");
1320                         RETURN(-EIO);
1321                 }
1322
1323                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1324                 l_wait_event(waitq, 0, &lwi);
1325
1326                 goto restart_bulk;
1327         }
1328         
1329         RETURN (rc);
1330 }
1331
/* Resend a BRW request that failed with a recoverable error: build a
 * fresh request over the same pages, move the async args and the list
 * of osc_async_pages from the old request to the new one, and queue
 * the new request on the same request set.  Returns 0 when the resend
 * was queued; -EIO after too many retries; -EINTR if any oap was
 * interrupted while we held the list lock. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }
        
        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, 
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
   
        /* bail out (dropping the new request) if any page's IO was
         * interrupted while the old request was in flight */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);                        
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend a little more with each retry */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap at the new request (dropping its reference
         * on the old one) */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1404
/* Request-set completion callback for async BRW RPCs: finish the
 * request, resend it on recoverable errors, then update the in-flight
 * counters and release the write grant and page array taken when the
 * request was built. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        int                        nob = rc; /* bytes transferred; saved before
                                              * rc is turned into an errno */
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                /* on a successful redo the new request owns our state;
                 * do not release anything here */
                if (rc == 0)
                        RETURN(0);
        }
        /* accumulate transferred bytes for callers that track totals */
        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1434
1435 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1436                           struct lov_stripe_md *lsm, obd_count page_count,
1437                           struct brw_page **pga, struct ptlrpc_request_set *set,
1438                           struct obd_capa *ocapa)
1439 {
1440         struct ptlrpc_request     *req;
1441         struct client_obd         *cli = &exp->exp_obd->u.cli;
1442         int                        rc, i;
1443         struct osc_brw_async_args *aa;
1444         ENTRY;
1445
1446         /* Consume write credits even if doing a sync write -
1447          * otherwise we may run out of space on OST due to grant. */
1448         if (cmd == OBD_BRW_WRITE) {
1449                 spin_lock(&cli->cl_loi_list_lock);
1450                 for (i = 0; i < page_count; i++) {
1451                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1452                                 osc_consume_write_grant(cli, pga[i]);
1453                 }
1454                 spin_unlock(&cli->cl_loi_list_lock);
1455         }
1456
1457         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1458                                   &req, ocapa);
1459
1460         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1461         if (cmd == OBD_BRW_READ) {
1462                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1463                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1464                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1465         } else {
1466                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1467                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1468                                  cli->cl_w_in_flight);
1469                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1470         }
1471
1472         if (rc == 0) {
1473                 req->rq_interpret_reply = brw_interpret;
1474                 ptlrpc_set_add_req(set, req);
1475                 client_obd_list_lock(&cli->cl_loi_list_lock);
1476                 if (cmd == OBD_BRW_READ)
1477                         cli->cl_r_in_flight++;
1478                 else
1479                         cli->cl_w_in_flight++;
1480                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1481         } else if (cmd == OBD_BRW_WRITE) {
1482                 client_obd_list_lock(&cli->cl_loi_list_lock);
1483                 for (i = 0; i < page_count; i++)
1484                         osc_release_write_grant(cli, pga[i], 0);
1485                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1486         }
1487         RETURN (rc);
1488 }
1489
1490 /*
1491  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1492  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1493  * fine for our small page arrays and doesn't require allocation.  its an
1494  * insertion sort that swaps elements that are strides apart, shrinking the
1495  * stride down until its '1' and the array is sorted.
1496  */
1497 static void sort_brw_pages(struct brw_page **array, int num)
1498 {
1499         int stride, i, j;
1500         struct brw_page *tmp;
1501
1502         if (num == 1)
1503                 return;
1504         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1505                 ;
1506
1507         do {
1508                 stride /= 3;
1509                 for (i = stride ; i < num ; i++) {
1510                         tmp = array[i];
1511                         j = i;
1512                         while (j >= stride && array[j - stride]->off > tmp->off) {
1513                                 array[j] = array[j - stride];
1514                                 j -= stride;
1515                         }
1516                         array[j] = tmp;
1517                 }
1518         } while (stride > 1);
1519 }
1520
1521 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1522 {
1523         int count = 1;
1524         int offset;
1525         int i = 0;
1526
1527         LASSERT (pages > 0);
1528         offset = pg[i]->off & ~CFS_PAGE_MASK;
1529
1530         for (;;) {
1531                 pages--;
1532                 if (pages == 0)         /* that's all */
1533                         return count;
1534
1535                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1536                         return count;   /* doesn't end on page boundary */
1537
1538                 i++;
1539                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1540                 if (offset != 0)        /* doesn't start on page boundary */
1541                         return count;
1542
1543                 count++;
1544         }
1545 }
1546
1547 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1548 {
1549         struct brw_page **ppga;
1550         int i;
1551
1552         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1553         if (ppga == NULL)
1554                 return NULL;
1555
1556         for (i = 0; i < count; i++)
1557                 ppga[i] = pga + i;
1558         return ppga;
1559 }
1560
/* Free a pointer array built by osc_build_ppga().  Only the array is
 * released; the brw_page structs it points at belong to the caller. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1566
1567 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1568                    obd_count page_count, struct brw_page *pga,
1569                    struct obd_trans_info *oti)
1570 {
1571         struct obdo *saved_oa = NULL;
1572         struct brw_page **ppga, **orig;
1573         struct obd_import *imp = class_exp2cliimp(exp);
1574         struct client_obd *cli = &imp->imp_obd->u.cli;
1575         int rc, page_count_orig;
1576         ENTRY;
1577
1578         if (cmd & OBD_BRW_CHECK) {
1579                 /* The caller just wants to know if there's a chance that this
1580                  * I/O can succeed */
1581
1582                 if (imp == NULL || imp->imp_invalid)
1583                         RETURN(-EIO);
1584                 RETURN(0);
1585         }
1586
1587         /* test_brw with a failed create can trip this, maybe others. */
1588         LASSERT(cli->cl_max_pages_per_rpc);
1589
1590         rc = 0;
1591
1592         orig = ppga = osc_build_ppga(pga, page_count);
1593         if (ppga == NULL)
1594                 RETURN(-ENOMEM);
1595         page_count_orig = page_count;
1596
1597         sort_brw_pages(ppga, page_count);
1598         while (page_count) {
1599                 obd_count pages_per_brw;
1600
1601                 if (page_count > cli->cl_max_pages_per_rpc)
1602                         pages_per_brw = cli->cl_max_pages_per_rpc;
1603                 else
1604                         pages_per_brw = page_count;
1605
1606                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1607
1608                 if (saved_oa != NULL) {
1609                         /* restore previously saved oa */
1610                         *oinfo->oi_oa = *saved_oa;
1611                 } else if (page_count > pages_per_brw) {
1612                         /* save a copy of oa (brw will clobber it) */
1613                         OBDO_ALLOC(saved_oa);
1614                         if (saved_oa == NULL)
1615                                 GOTO(out, rc = -ENOMEM);
1616                         *saved_oa = *oinfo->oi_oa;
1617                 }
1618
1619                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1620                                       pages_per_brw, ppga, oinfo->oi_capa);
1621
1622                 if (rc != 0)
1623                         break;
1624
1625                 page_count -= pages_per_brw;
1626                 ppga += pages_per_brw;
1627         }
1628
1629 out:
1630         osc_release_ppga(orig, page_count_orig);
1631
1632         if (saved_oa != NULL)
1633                 OBDO_FREE(saved_oa);
1634
1635         RETURN(rc);
1636 }
1637
/* Queue an asynchronous bulk read/write on @set.  Like osc_brw() the
 * pages are sorted and split into unfragmented chunks of at most
 * cl_max_pages_per_rpc, but each chunk is handed to async_internal()
 * which takes ownership of the page-pointer array it receives; the
 * @orig/@copy bookkeeping below tracks exactly which arrays we still
 * own and must free. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        /* multiple RPCs: give each one its own copy of its
                         * slice of the array */
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* async_internal() failed, so it never took
                         * ownership of @copy; free it here (unless it was
                         * @ppga, which is released via @orig below) */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1705
1706 static void osc_check_rpcs(struct client_obd *cli);
1707
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* @sent tells the grant accounting whether the page made it to
         * the wire or was dropped before writing */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1716
1717
1718 /* This maintains the lists of pending pages to read/write for a given object
1719  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1720  * to quickly find objects that are ready to send an RPC. */
1721 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1722                          int cmd)
1723 {
1724         int optimal;
1725         ENTRY;
1726
1727         if (lop->lop_num_pending == 0)
1728                 RETURN(0);
1729
1730         /* if we have an invalid import we want to drain the queued pages
1731          * by forcing them through rpcs that immediately fail and complete
1732          * the pages.  recovery relies on this to empty the queued pages
1733          * before canceling the locks and evicting down the llite pages */
1734         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1735                 RETURN(1);
1736
1737         /* stream rpcs in queue order as long as as there is an urgent page
1738          * queued.  this is our cheap solution for good batching in the case
1739          * where writepage marks some random page in the middle of the file
1740          * as urgent because of, say, memory pressure */
1741         if (!list_empty(&lop->lop_urgent)) {
1742                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1743                 RETURN(1);
1744         }
1745         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1746         optimal = cli->cl_max_pages_per_rpc;
1747         if (cmd & OBD_BRW_WRITE) {
1748                 /* trigger a write rpc stream as long as there are dirtiers
1749                  * waiting for space.  as they're waiting, they're not going to
1750                  * create more pages to coallesce with what's waiting.. */
1751                 if (!list_empty(&cli->cl_cache_waiters)) {
1752                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1753                         RETURN(1);
1754                 }
1755                 /* +16 to avoid triggering rpcs that would want to include pages
1756                  * that are being queued but which can't be made ready until
1757                  * the queuer finishes with the page. this is a wart for
1758                  * llite::commit_write() */
1759                 optimal += 16;
1760         }
1761         if (lop->lop_num_pending >= optimal)
1762                 RETURN(1);
1763
1764         RETURN(0);
1765 }
1766
/* Reconcile @item's membership of @list with @should_be_on: append it
 * when it should be listed but isn't, unlink it when it shouldn't be
 * but is, and do nothing when membership already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on_now = !list_empty(item);

        if (should_be_on && !on_now)
                list_add_tail(item, list);
        else if (!should_be_on && on_now)
                list_del_init(item);
}
1775
1776 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1777  * can find pages to build into rpcs quickly */
1778 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1779 {
1780         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1781                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1782                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1783
1784         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1785                 loi->loi_write_lop.lop_num_pending);
1786
1787         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1788                 loi->loi_read_lop.lop_num_pending);
1789 }
1790
/* Apply @delta (positive or negative) to the pending-page accounting:
 * both the per-object lop count and the client-wide pending read or
 * write page counter, selected by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1800
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: unlink the page from the pending and
                 * urgent queues, fix the accounting and list membership,
                 * and complete it with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1846
1847 /* this is trying to propogate async writeback errors back up to the
1848  * application.  As an async write fails we record the error code for later if
1849  * the app does an fsync.  As long as errors persist we force future rpcs to be
1850  * sync so that the app can get a sync error and break the cycle of queueing
1851  * pages for which writeback will fail. */
1852 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1853                            int rc)
1854 {
1855         if (rc) {
1856                 if (!ar->ar_rc)
1857                         ar->ar_rc = rc;
1858
1859                 ar->ar_force_sync = 1;
1860                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1861                 return;
1862
1863         }
1864
1865         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1866                 ar->ar_force_sync = 0;
1867 }
1868
1869 static void osc_oap_to_pending(struct osc_async_page *oap)
1870 {
1871         struct loi_oap_pages *lop;
1872
1873         if (oap->oap_cmd & OBD_BRW_WRITE)
1874                 lop = &oap->oap_loi->loi_write_lop;
1875         else
1876                 lop = &oap->oap_loi->loi_read_lop;
1877
1878         if (oap->oap_async_flags & ASYNC_URGENT)
1879                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1880         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1881         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1882 }
1883
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop the request reference this oap may hold, remembering the
         * xid for the async-error bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write success/failure both client-wide and
                 * per-object so a later fsync can report it */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, copy whichever attributes the reply marked valid
         * into the object's cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group-io pages complete through the oig instead of the
         * caller's ap_completion hook */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1938
/* Interpret callback for a brw rpc built from cached async pages:
 * resends on recoverable errors, updates the in-flight rpc counters and
 * completes every oap that rode in this request, then frees the obdo
 * and page array owned by the request. */
static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_async_page *oap, *tmp;
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* if the resend was queued successfully the new request will be
         * interpreted later, so don't complete the pages here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        /* an rpc slot just opened up: wake throttled dirtiers and see if
         * more queued pages can be sent */
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1983
/* Build a brw rpc from the oaps queued on @rpc_list.  On success the
 * oaps are spliced onto the request's aa_oaps list (leaving *rpc_list
 * empty) and the obdo/pga become the request's to free (see
 * brw_interpret_oap()).  On failure an ERR_PTR is returned, local
 * allocations are released and the oaps stay on @rpc_list for the
 * caller to clean up. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oap list into a brw_page pointer array; the caller
         * ops/data are taken from the first oap (all are assumed to share
         * them -- TODO confirm) */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* error path: free what we allocated here; on success ownership
         * has moved to the request */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2059
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
/* Harvest ready pages from @lop, build one brw rpc from them and hand it
 * to ptlrpcd.  Returns 1 if an rpc was sent, 0 if no pages were ready,
 * or a negative errno if building the request failed. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the rpc starts for the lproc histograms */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                /* a zero or negative count means there is no io to do;
                 * complete the page immediately instead of sending it */
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi lock while building the request; the pages we
         * harvested are safely on our private rpc_list */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        aa = (struct osc_brw_async_args *)&req->rq_async_args;

        /* record rpc size/offset/concurrency stats for lprocfs */
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2260
/* Dump an lov_oinfo's rpc-readiness state: whether it is on the ready
 * list, plus the pending count and urgent-list state for each of its
 * write and read queues. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending.  These lists are maintained by lop_makes_rpc().
 *
 * Selection priority, highest first:
 *   1. objects already known to have rpc-ready pages,
 *   2. any object with queued writes when there are cache waiters,
 *   3. any queued object at all when the import is invalid, so queued
 *      pages get flushed.
 * Returns NULL when no object needs service.  Caller holds
 * cl_loi_list_lock (see osc_check_rpcs()). */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        ENTRY;
        /* first return all objects which we already know to have
         * pages ready to be stuffed into rpcs */
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_cli_item));

        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}
2302
/* called with the loi list lock held.
 *
 * Pull objects off the ready lists via osc_next_loi() and fire read/write
 * rpcs for them until we run out of candidates, fill the
 * cl_max_rpcs_in_flight pipeline, or spin too often on pages that are not
 * ready (race_counter, see below). */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the rpc pipeline to this target is full */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2364
2365 /* we're trying to queue a page in the osc so we're subject to the
2366  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2367  * If the osc's queued pages are already at that limit, then we want to sleep
2368  * until there is space in the osc's queue for us.  We also may be waiting for
2369  * write credits from the OST if there are RPCs in flight that may return some
2370  * before we fall back to sync writes.
2371  *
 * We need this to know our allocation was granted in the presence of signals */
2373 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2374 {
2375         int rc;
2376         ENTRY;
2377         client_obd_list_lock(&cli->cl_loi_list_lock);
2378         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2379         client_obd_list_unlock(&cli->cl_loi_list_lock);
2380         RETURN(rc);
2381 };
2382
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 once cache space and a page of write grant have been consumed
 * for "oap", -EDQUOT to tell the caller to fall back to sync i/o, -EINTR
 * if our wait was cut short, or the waker's ocw_rc verdict. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick the rpc engine so completions can eventually free
                 * up grant, then drop the lock for the sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (!list_empty(&ocw.ocw_entry)) {
                        /* still on the waiter list: nobody granted us
                         * anything; we were woken some other way */
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* a waker dequeued us; ocw_rc carries its verdict */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2439
2440 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2441                         struct lov_oinfo *loi, cfs_page_t *page,
2442                         obd_off offset, struct obd_async_page_ops *ops,
2443                         void *data, void **res)
2444 {
2445         struct osc_async_page *oap;
2446         ENTRY;
2447
2448         if (!page)
2449                 return size_round(sizeof(*oap));
2450
2451         oap = *res;
2452         oap->oap_magic = OAP_MAGIC;
2453         oap->oap_cli = &exp->exp_obd->u.cli;
2454         oap->oap_loi = loi;
2455
2456         oap->oap_caller_ops = ops;
2457         oap->oap_caller_data = data;
2458
2459         oap->oap_page = page;
2460         oap->oap_obj_off = offset;
2461
2462         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2463         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2464         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2465
2466         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2467
2468         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2469         RETURN(0);
2470 }
2471
2472 struct osc_async_page *oap_from_cookie(void *cookie)
2473 {
2474         struct osc_async_page *oap = cookie;
2475         if (oap->oap_magic != OAP_MAGIC)
2476                 return ERR_PTR(-EINVAL);
2477         return oap;
2478 };
2479
/* Queue a previously prepared async page for read or write i/o.  Writes
 * are checked against the file owner's quota and must reserve cache
 * space/grant (osc_enter_cache(), which may sleep); the page then joins
 * the object's pending lists and the rpc engine is kicked.
 * Returns 0 on success, -EBUSY if the oap is already queued, -EIO on an
 * invalid import, or a quota/cache error (e.g. -EDQUOT). */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* already queued or in an rpc: caller must wait */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* ask the caller for the page's uid/gid, then consult the
                 * local quota state */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        /* default to the first stripe's object when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake the list lock while sleeping */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2554
/* aka (~was & now & flag), but this is more clear :) -- true iff "flag"
 * is being turned on in the transition from "was" to "now".  Arguments
 * are fully parenthesized so that expression arguments (e.g. "A | B")
 * are not misparsed by &'s precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2557
/* Raise async flags on an already-pending oap.  Only the setting (never
 * the clearing) of ASYNC_READY and ASYNC_URGENT is handled; a page newly
 * marked urgent is put on its lop urgent list unless it is already part
 * of an rpc.  Kicks osc_check_rpcs() before returning.
 * Returns 0 on success, -EINVAL if the oap is not pending, -EIO on an
 * invalid import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* default to the first stripe's object when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set: done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already in an rpc are left alone */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2620
/* Queue a page on an obd_io_group rather than the normal pending lists.
 * The oap sits on its lop's lop_pending_group until osc_trigger_group_io()
 * promotes it; for ASYNC_GROUP_SYNC pages the group is told about the
 * member via oig_add_one().  Returns 0, -EBUSY if the oap is already
 * queued, -EIO on an invalid import, or oig_add_one()'s error. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* already queued or in an rpc */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* default to the first stripe's object when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2675
2676 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2677                                  struct loi_oap_pages *lop, int cmd)
2678 {
2679         struct list_head *pos, *tmp;
2680         struct osc_async_page *oap;
2681
2682         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2683                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2684                 list_del(&oap->oap_pending_item);
2685                 osc_oap_to_pending(oap);
2686         }
2687         loi_list_maint(cli, loi);
2688 }
2689
2690 static int osc_trigger_group_io(struct obd_export *exp,
2691                                 struct lov_stripe_md *lsm,
2692                                 struct lov_oinfo *loi,
2693                                 struct obd_io_group *oig)
2694 {
2695         struct client_obd *cli = &exp->exp_obd->u.cli;
2696         ENTRY;
2697
2698         if (loi == NULL)
2699                 loi = lsm->lsm_oinfo[0];
2700
2701         client_obd_list_lock(&cli->cl_loi_list_lock);
2702
2703         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2704         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2705
2706         osc_check_rpcs(cli);
2707         client_obd_list_unlock(&cli->cl_loi_list_lock);
2708
2709         RETURN(0);
2710 }
2711
/* Undo the queueing of an async page before it has made it into an rpc:
 * release its cache/grant reservation, wake anyone waiting on that grant,
 * and unlink it from the urgent and pending lists.  Fails with -EBUSY
 * once the page is part of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* default to the first stripe's object when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* already claimed by an rpc: too late to tear down */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2758
/* Attach "data" (on Linux, the inode) as the ast data of the lock named
 * by lockh, sanity-checking on Linux that any previous ast data referred
 * to the same -- or a dying -- inode.  "flags" may carry LDLM_FL_NO_LRU,
 * which is copied into the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__LINUX__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* an old inode that is being freed may legitimately still
                 * be attached; anything else is an inconsistency */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2789
2790 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2791                              ldlm_iterator_t replace, void *data)
2792 {
2793         struct ldlm_res_id res_id = { .name = {0} };
2794         struct obd_device *obd = class_exp2obd(exp);
2795
2796         res_id.name[0] = lsm->lsm_object_id;
2797         res_id.name[2] = lsm->lsm_object_gr;
2798
2799         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2800         return 0;
2801 }
2802
2803 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2804                             int intent, int rc)
2805 {
2806         ENTRY;
2807
2808         if (intent) {
2809                 /* The request was created before ldlm_cli_enqueue call. */
2810                 if (rc == ELDLM_LOCK_ABORTED) {
2811                         struct ldlm_reply *rep;
2812
2813                         /* swabbed by ldlm_cli_enqueue() */
2814                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
2815                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2816                                              sizeof(*rep));
2817                         LASSERT(rep != NULL);
2818                         if (rep->lock_policy_res1)
2819                                 rc = rep->lock_policy_res1;
2820                 }
2821         }
2822
2823         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2824                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2825                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2826                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2827                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2828         }
2829
2830         /* Call the update callback. */
2831         rc = oinfo->oi_cb_up(oinfo, rc);
2832         RETURN(rc);
2833 }
2834
/* Completion callback for an asynchronous lock enqueue: finish the ldlm
 * side of the enqueue (swabbing the lvb into the stripe's loi), run
 * osc_enqueue_fini() to propagate the result to the caller's callback,
 * then drop the lock reference that was held for the async request. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2867
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained.
 *
 * Try to match an existing lock first; otherwise enqueue a new one, either
 * synchronously or (when rqset is given) asynchronously with
 * osc_enqueue_interpret() as the completion handler. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {0} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* resource name: object id in slot 0, group in slot 2 */
        res_id.name[0] = oinfo->oi_md->lsm_object_id;
        res_id.name[2] = oinfo->oi_md->lsm_object_gr;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* without a valid kms there is nothing cached worth matching */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* intent enqueue: build the request up front so a reply
                 * buffer for the lock reply and lvb is available */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* async: park completion state in the request and
                         * let osc_enqueue_interpret() finish up */
                        struct osc_enqueue_args *aa;
                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2993
2994 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2995                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2996                      int *flags, void *data, struct lustre_handle *lockh)
2997 {
2998         struct ldlm_res_id res_id = { .name = {0} };
2999         struct obd_device *obd = exp->exp_obd;
3000         int lflags = *flags;
3001         ldlm_mode_t rc;
3002         ENTRY;
3003
3004         res_id.name[0] = lsm->lsm_object_id;
3005         res_id.name[2] = lsm->lsm_object_gr;
3006
3007         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3008
3009         /* Filesystem lock extents are extended to page boundaries so that
3010          * dealing with the page cache is a little smoother */
3011         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3012         policy->l_extent.end |= ~CFS_PAGE_MASK;
3013
3014         /* Next, search for already existing extent locks that will cover us */
3015         /* If we're trying to read, we also search for an existing PW lock.  The
3016          * VFS and page cache already protect us locally, so lots of readers/
3017          * writers can share a single PW lock. */
3018         rc = mode;
3019         if (mode == LCK_PR)
3020                 rc |= LCK_PW;
3021         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3022                              &res_id, type, policy, rc, lockh);
3023         if (rc) {
3024                 osc_set_data_with_check(lockh, data, lflags);
3025                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3026                         ldlm_lock_addref(lockh, LCK_PR);
3027                         ldlm_lock_decref(lockh, LCK_PW);
3028                 }
3029                 RETURN(rc);
3030         }
3031         RETURN(rc);
3032 }
3033
3034 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3035                       __u32 mode, struct lustre_handle *lockh)
3036 {
3037         ENTRY;
3038
3039         if (unlikely(mode == LCK_GROUP))
3040                 ldlm_lock_decref_and_cancel(lockh, mode);
3041         else
3042                 ldlm_lock_decref(lockh, mode);
3043
3044         RETURN(0);
3045 }
3046
3047 static int osc_cancel_unused(struct obd_export *exp,
3048                              struct lov_stripe_md *lsm, int flags,
3049                              void *opaque)
3050 {
3051         struct obd_device *obd = class_exp2obd(exp);
3052         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3053
3054         if (lsm != NULL) {
3055                 res_id.name[0] = lsm->lsm_object_id;
3056                 res_id.name[2] = lsm->lsm_object_gr;
3057                 resp = &res_id;
3058         }
3059
3060         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3061 }
3062
3063 static int osc_join_lru(struct obd_export *exp,
3064                         struct lov_stripe_md *lsm, int join)
3065 {
3066         struct obd_device *obd = class_exp2obd(exp);
3067         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3068
3069         if (lsm != NULL) {
3070                 res_id.name[0] = lsm->lsm_object_id;
3071                 res_id.name[2] = lsm->lsm_object_gr;
3072                 resp = &res_id;
3073         }
3074
3075         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3076 }
3077
3078 static int osc_statfs_interpret(struct ptlrpc_request *req,
3079                                 struct osc_async_args *aa, int rc)
3080 {
3081         struct obd_statfs *msfs;
3082         ENTRY;
3083
3084         if (rc != 0)
3085                 GOTO(out, rc);
3086
3087         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3088                                   lustre_swab_obd_statfs);
3089         if (msfs == NULL) {
3090                 CERROR("Can't unpack obd_statfs\n");
3091                 GOTO(out, rc = -EPROTO);
3092         }
3093
3094         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3095 out:
3096         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3097         RETURN(rc);
3098 }
3099
3100 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3101                             __u64 max_age, struct ptlrpc_request_set *rqset)
3102 {
3103         struct ptlrpc_request *req;
3104         struct osc_async_args *aa;
3105         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3106         ENTRY;
3107
3108         /* We could possibly pass max_age in the request (as an absolute
3109          * timestamp or a "seconds.usec ago") so the target can avoid doing
3110          * extra calls into the filesystem if that isn't necessary (e.g.
3111          * during mount that would help a bit).  Having relative timestamps
3112          * is not so great if request processing is slow, while absolute
3113          * timestamps are not ideal because they need time synchronization. */
3114         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3115                               OST_STATFS, 1, NULL, NULL);
3116         if (!req)
3117                 RETURN(-ENOMEM);
3118
3119         ptlrpc_req_set_repsize(req, 2, size);
3120         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3121
3122         req->rq_interpret_reply = osc_statfs_interpret;
3123         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3124         aa = (struct osc_async_args *)&req->rq_async_args;
3125         aa->aa_oi = oinfo;
3126
3127         ptlrpc_set_add_req(rqset, req);
3128         RETURN(0);
3129 }
3130
3131 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3132                       __u64 max_age)
3133 {
3134         struct obd_statfs *msfs;
3135         struct ptlrpc_request *req;
3136         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3137         ENTRY;
3138
3139         /* We could possibly pass max_age in the request (as an absolute
3140          * timestamp or a "seconds.usec ago") so the target can avoid doing
3141          * extra calls into the filesystem if that isn't necessary (e.g.
3142          * during mount that would help a bit).  Having relative timestamps
3143          * is not so great if request processing is slow, while absolute
3144          * timestamps are not ideal because they need time synchronization. */
3145         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3146                               OST_STATFS, 1, NULL, NULL);
3147         if (!req)
3148                 RETURN(-ENOMEM);
3149
3150         ptlrpc_req_set_repsize(req, 2, size);
3151         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3152
3153         rc = ptlrpc_queue_wait(req);
3154         if (rc)
3155                 GOTO(out, rc);
3156
3157         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3158                                   lustre_swab_obd_statfs);
3159         if (msfs == NULL) {
3160                 CERROR("Can't unpack obd_statfs\n");
3161                 GOTO(out, rc = -EPROTO);
3162         }
3163
3164         memcpy(osfs, msfs, sizeof(*osfs));
3165
3166         EXIT;
3167  out:
3168         ptlrpc_req_finished(req);
3169         return rc;
3170 }
3171
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
3178 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3179 {
3180         struct lov_user_md lum, *lumk;
3181         int rc = 0, lum_size;
3182         ENTRY;
3183
3184         if (!lsm)