Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* Pack OSC object metadata for disk storage (LE byte order). */
67 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
68                       struct lov_stripe_md *lsm)
69 {
70         int lmm_size;
71         ENTRY;
72
73         lmm_size = sizeof(**lmmp);
74         if (!lmmp)
75                 RETURN(lmm_size);
76
77         if (*lmmp && !lsm) {
78                 OBD_FREE(*lmmp, lmm_size);
79                 *lmmp = NULL;
80                 RETURN(0);
81         }
82
83         if (!*lmmp) {
84                 OBD_ALLOC(*lmmp, lmm_size);
85                 if (!*lmmp)
86                         RETURN(-ENOMEM);
87         }
88
89         if (lsm) {
90                 LASSERT(lsm->lsm_object_id);
91                 LASSERT(lsm->lsm_object_gr);
92                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
93                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
94         }
95
96         RETURN(lmm_size);
97 }
98
99 /* Unpack OSC object metadata from disk storage (LE byte order). */
100 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
101                         struct lov_mds_md *lmm, int lmm_bytes)
102 {
103         int lsm_size;
104         ENTRY;
105
106         if (lmm != NULL) {
107                 if (lmm_bytes < sizeof (*lmm)) {
108                         CERROR("lov_mds_md too small: %d, need %d\n",
109                                lmm_bytes, (int)sizeof(*lmm));
110                         RETURN(-EINVAL);
111                 }
112                 /* XXX LOV_MAGIC etc check? */
113
114                 if (lmm->lmm_object_id == 0) {
115                         CERROR("lov_mds_md: zero lmm_object_id\n");
116                         RETURN(-EINVAL);
117                 }
118         }
119
120         lsm_size = lov_stripe_md_size(1);
121         if (lsmp == NULL)
122                 RETURN(lsm_size);
123
124         if (*lsmp != NULL && lmm == NULL) {
125                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
136                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
137                         OBD_FREE(*lsmp, lsm_size);
138                         RETURN(-ENOMEM);
139                 }
140                 loi_init((*lsmp)->lsm_oinfo[0]);
141         }
142
143         if (lmm != NULL) {
144                 /* XXX zero *lsmp? */
145                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
146                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
147                 LASSERT((*lsmp)->lsm_object_id);
148                 LASSERT((*lsmp)->lsm_object_gr);
149         }
150
151         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
152
153         RETURN(lsm_size);
154 }
155
156 static inline void osc_pack_capa(struct ptlrpc_request *req,
157                                  struct ost_body *body, void *capa)
158 {
159         struct obd_capa *oc = (struct obd_capa *)capa;
160         struct lustre_capa *c;
161
162         if (!capa)
163                 return;
164
165         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
166         LASSERT(c);
167         capa_cpy(c, oc);
168         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
169         DEBUG_CAPA(D_SEC, c, "pack");
170 }
171
172 static inline void osc_pack_req_body(struct ptlrpc_request *req,
173                                      struct obd_info *oinfo)
174 {
175         struct ost_body *body;
176
177         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
178         LASSERT(body);
179
180         body->oa = *oinfo->oi_oa;
181         osc_pack_capa(req, body, oinfo->oi_capa);
182 }
183
184 static inline void osc_set_capa_size(struct ptlrpc_request *req,
185                                      const struct req_msg_field *field,
186                                      struct obd_capa *oc)
187 {
188         if (oc == NULL)
189                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
190         else
191                 /* it is already calculated as sizeof struct obd_capa */
192                 ;
193 }
194
195 static int osc_getattr_interpret(struct ptlrpc_request *req,
196                                  struct osc_async_args *aa, int rc)
197 {
198         struct ost_body *body;
199         ENTRY;
200
201         if (rc != 0)
202                 GOTO(out, rc);
203
204         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
205                                   lustre_swab_ost_body);
206         if (body) {
207                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
208                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
209
210                 /* This should really be sent by the OST */
211                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
212                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
213         } else {
214                 CDEBUG(D_INFO, "can't unpack ost_body\n");
215                 rc = -EPROTO;
216                 aa->aa_oi->oi_oa->o_valid = 0;
217         }
218 out:
219         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
220         RETURN(rc);
221 }
222
223 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
224                              struct ptlrpc_request_set *set)
225 {
226         struct ptlrpc_request *req;
227         struct osc_async_args *aa;
228         int                    rc;
229         ENTRY;
230
231         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
232         if (req == NULL)
233                 RETURN(-ENOMEM);
234
235         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
236         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
237         if (rc) {
238                 ptlrpc_request_free(req);
239                 RETURN(rc);
240         }
241
242         osc_pack_req_body(req, oinfo);
243
244         ptlrpc_request_set_replen(req);
245         req->rq_interpret_reply = osc_getattr_interpret;
246
247         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
248         aa = (struct osc_async_args *)&req->rq_async_args;
249         aa->aa_oi = oinfo;
250
251         ptlrpc_set_add_req(set, req);
252         RETURN(0);
253 }
254
255 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
256 {
257         struct ptlrpc_request *req;
258         struct ost_body       *body;
259         int                    rc;
260         ENTRY;
261
262         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
263         if (req == NULL)
264                 RETURN(-ENOMEM);
265
266         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
267         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
268         if (rc) {
269                 ptlrpc_request_free(req);
270                 RETURN(rc);
271         }
272
273         osc_pack_req_body(req, oinfo);
274
275         ptlrpc_request_set_replen(req);
276  
277         rc = ptlrpc_queue_wait(req);
278         if (rc)
279                 GOTO(out, rc);
280
281         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
282         if (body == NULL)
283                 GOTO(out, rc = -EPROTO);
284
285         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
286         *oinfo->oi_oa = body->oa;
287
288         /* This should really be sent by the OST */
289         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
290         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
291
292         EXIT;
293  out:
294         ptlrpc_req_finished(req);
295         return rc;
296 }
297
298 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
299                        struct obd_trans_info *oti)
300 {
301         struct ptlrpc_request *req;
302         struct ost_body       *body;
303         int                    rc;
304         ENTRY;
305
306         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
307                                         oinfo->oi_oa->o_gr > 0);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
310         if (req == NULL)
311                 RETURN(-ENOMEM);
312
313         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
314         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
315         if (rc) {
316                 ptlrpc_request_free(req);
317                 RETURN(rc);
318         }
319
320         osc_pack_req_body(req, oinfo);
321
322         ptlrpc_request_set_replen(req);
323  
324
325         rc = ptlrpc_queue_wait(req);
326         if (rc)
327                 GOTO(out, rc);
328
329         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
330         if (body == NULL)
331                 GOTO(out, rc = -EPROTO);
332
333         *oinfo->oi_oa = body->oa;
334
335         EXIT;
336 out:
337         ptlrpc_req_finished(req);
338         RETURN(rc);
339 }
340
341 static int osc_setattr_interpret(struct ptlrpc_request *req,
342                                  struct osc_async_args *aa, int rc)
343 {
344         struct ost_body *body;
345         ENTRY;
346
347         if (rc != 0)
348                 GOTO(out, rc);
349
350         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
351         if (body == NULL)
352                 GOTO(out, rc = -EPROTO);
353
354         *aa->aa_oi->oi_oa = body->oa;
355 out:
356         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
357         RETURN(rc);
358 }
359
360 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
361                              struct obd_trans_info *oti,
362                              struct ptlrpc_request_set *rqset)
363 {
364         struct ptlrpc_request *req;
365         struct osc_async_args *aa;
366         int                    rc;
367         ENTRY;
368
369         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
370         if (req == NULL)
371                 RETURN(-ENOMEM);
372
373         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
374         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 RETURN(rc);
378         }
379
380         osc_pack_req_body(req, oinfo);
381
382         ptlrpc_request_set_replen(req);
383  
384         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
385                 LASSERT(oti);
386                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
387         }
388
389         /* do mds to ost setattr asynchronouly */
390         if (!rqset) {
391                 /* Do not wait for response. */
392                 ptlrpcd_add_req(req);
393         } else {
394                 req->rq_interpret_reply = osc_setattr_interpret;
395
396                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
397                 aa = (struct osc_async_args *)&req->rq_async_args;
398                 aa->aa_oi = oinfo;
399
400                 ptlrpc_set_add_req(rqset, req);
401         }
402
403         RETURN(0);
404 }
405
406 int osc_real_create(struct obd_export *exp, struct obdo *oa,
407                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
408 {
409         struct ptlrpc_request *req;
410         struct ost_body       *body;
411         struct lov_stripe_md  *lsm;
412         int                    rc;
413         ENTRY;
414
415         LASSERT(oa);
416         LASSERT(ea);
417
418         lsm = *ea;
419         if (!lsm) {
420                 rc = obd_alloc_memmd(exp, &lsm);
421                 if (rc < 0)
422                         RETURN(rc);
423         }
424
425         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
426         if (req == NULL)
427                 GOTO(out, rc = -ENOMEM);
428
429         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
430         if (rc) {
431                 ptlrpc_request_free(req);
432                 GOTO(out, rc);
433         }
434
435         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
436         LASSERT(body);
437         body->oa = *oa;
438
439         ptlrpc_request_set_replen(req);
440
441         if (oa->o_valid & OBD_MD_FLINLINE) {
442                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
443                         oa->o_flags == OBD_FL_DELORPHAN);
444                 DEBUG_REQ(D_HA, req,
445                           "delorphan from OST integration");
446                 /* Don't resend the delorphan req */
447                 req->rq_no_resend = req->rq_no_delay = 1;
448         }
449
450         rc = ptlrpc_queue_wait(req);
451         if (rc)
452                 GOTO(out_req, rc);
453
454         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
455         if (body == NULL)
456                 GOTO(out_req, rc = -EPROTO);
457
458         *oa = body->oa;
459
460         /* This should really be sent by the OST */
461         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
462         oa->o_valid |= OBD_MD_FLBLKSZ;
463
464         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
465          * have valid lsm_oinfo data structs, so don't go touching that.
466          * This needs to be fixed in a big way.
467          */
468         lsm->lsm_object_id = oa->o_id;
469         lsm->lsm_object_gr = oa->o_gr;
470         *ea = lsm;
471
472         if (oti != NULL) {
473                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
474
475                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
476                         if (!oti->oti_logcookies)
477                                 oti_alloc_cookies(oti, 1);
478                         *oti->oti_logcookies = *obdo_logcookie(oa);
479                 }
480         }
481
482         CDEBUG(D_HA, "transno: "LPD64"\n",
483                lustre_msg_get_transno(req->rq_repmsg));
484 out_req:
485         ptlrpc_req_finished(req);
486 out:
487         if (rc && !*ea)
488                 obd_free_memmd(exp, &lsm);
489         RETURN(rc);
490 }
491
492 static int osc_punch_interpret(struct ptlrpc_request *req,
493                                struct osc_async_args *aa, int rc)
494 {
495         struct ost_body *body;
496         ENTRY;
497
498         if (rc != 0)
499                 GOTO(out, rc);
500
501         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
502         if (body == NULL)
503                 GOTO(out, rc = -EPROTO);
504
505         *aa->aa_oi->oi_oa = body->oa;
506 out:
507         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
508         RETURN(rc);
509 }
510
511 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
512                      struct obd_trans_info *oti,
513                      struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request *req;
516         struct osc_async_args *aa;
517         struct ost_body       *body;
518         int                    rc;
519         ENTRY;
520
521         if (!oinfo->oi_oa) {
522                 CDEBUG(D_INFO, "oa NULL\n");
523                 RETURN(-EINVAL);
524         }
525
526         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
527         if (req == NULL)
528                 RETURN(-ENOMEM);
529
530         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
531         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
532         if (rc) {
533                 ptlrpc_request_free(req);
534                 RETURN(rc);
535         }
536         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
537         osc_pack_req_body(req, oinfo);
538
539         /* overload the size and blocks fields in the oa with start/end */
540         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
541         LASSERT(body);
542         body->oa.o_size = oinfo->oi_policy.l_extent.start;
543         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
544         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
545         ptlrpc_request_set_replen(req);
546
547
548         req->rq_interpret_reply = osc_punch_interpret;
549         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
550         aa = (struct osc_async_args *)&req->rq_async_args;
551         aa->aa_oi = oinfo;
552         ptlrpc_set_add_req(rqset, req);
553
554         RETURN(0);
555 }
556
557 static int osc_sync(struct obd_export *exp, struct obdo *oa,
558                     struct lov_stripe_md *md, obd_size start, obd_size end,
559                     void *capa)
560 {
561         struct ptlrpc_request *req;
562         struct ost_body       *body;
563         int                    rc;
564         ENTRY;
565
566         if (!oa) {
567                 CDEBUG(D_INFO, "oa NULL\n");
568                 RETURN(-EINVAL);
569         }
570
571         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
572         if (req == NULL)
573                 RETURN(-ENOMEM);
574
575         osc_set_capa_size(req, &RMF_CAPA1, capa);
576         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
577         if (rc) {
578                 ptlrpc_request_free(req);
579                 RETURN(rc);
580         }
581
582         /* overload the size and blocks fields in the oa with start/end */
583         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
584         LASSERT(body);
585         body->oa = *oa;
586         body->oa.o_size = start;
587         body->oa.o_blocks = end;
588         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
589         osc_pack_capa(req, body, capa);
590
591         ptlrpc_request_set_replen(req);
592
593         rc = ptlrpc_queue_wait(req);
594         if (rc)
595                 GOTO(out, rc);
596
597         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
598         if (body == NULL)
599                 GOTO(out, rc = -EPROTO);
600
601         *oa = body->oa;
602
603         EXIT;
604  out:
605         ptlrpc_req_finished(req);
606         return rc;
607 }
608
609 /* Find and cancel locally locks matched by @mode in the resource found by
610  * @objid. Found locks are added into @cancel list. Returns the amount of
611  * locks added to @cancels list. */
612 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
613                                    struct list_head *cancels, ldlm_mode_t mode,
614                                    int lock_flags)
615 {
616         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
617         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
618         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
619         int count;
620         ENTRY;
621
622         if (res == NULL)
623                 RETURN(0);
624
625         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
626                                            lock_flags, 0, NULL);
627         ldlm_resource_putref(res);
628         RETURN(count);
629 }
630
631 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
632                                  int rc)
633 {
634         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
635
636         atomic_dec(&cli->cl_destroy_in_flight);
637         cfs_waitq_signal(&cli->cl_destroy_waitq);
638         return 0;
639 }
640
641 static int osc_can_send_destroy(struct client_obd *cli)
642 {
643         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
644             cli->cl_max_rpcs_in_flight) {
645                 /* The destroy request can be sent */
646                 return 1;
647         }
648         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
649             cli->cl_max_rpcs_in_flight) {
650                 /*
651                  * The counter has been modified between the two atomic
652                  * operations.
653                  */
654                 cfs_waitq_signal(&cli->cl_destroy_waitq);
655         }
656         return 0;
657 }
658
659 /* Destroy requests can be async always on the client, and we don't even really
660  * care about the return code since the client cannot do anything at all about
661  * a destroy failure.
662  * When the MDS is unlinking a filename, it saves the file objects into a
663  * recovery llog, and these object records are cancelled when the OST reports
664  * they were destroyed and sync'd to disk (i.e. transaction committed).
665  * If the client dies, or the OST is down when the object should be destroyed,
666  * the records are not cancelled, and when the OST reconnects to the MDS next,
667  * it will retrieve the llog unlink logs and then sends the log cancellation
668  * cookies to the MDS after committing destroy transactions. */
669 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
670                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
671                        struct obd_export *md_export)
672 {
673         struct client_obd     *cli = &exp->exp_obd->u.cli;
674         struct ptlrpc_request *req;
675         struct ost_body       *body;
676         CFS_LIST_HEAD(cancels);
677         int rc, count;
678         ENTRY;
679
680         if (!oa) {
681                 CDEBUG(D_INFO, "oa NULL\n");
682                 RETURN(-EINVAL);
683         }
684
685         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
686                                         LDLM_FL_DISCARD_DATA);
687
688         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
689         if (req == NULL) {
690                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
691                 RETURN(-ENOMEM);
692         }
693
694         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
695                                0, &cancels, count);
696         if (rc) {
697                 ptlrpc_request_free(req);
698                 RETURN(rc);
699         }
700
701         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
702         req->rq_interpret_reply = osc_destroy_interpret;
703
704         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
705                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
706                        sizeof(*oti->oti_logcookies));
707         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
708         LASSERT(body);
709         body->oa = *oa;
710
711         ptlrpc_request_set_replen(req);
712
713         if (!osc_can_send_destroy(cli)) {
714                 struct l_wait_info lwi = { 0 };
715
716                 /*
717                  * Wait until the number of on-going destroy RPCs drops
718                  * under max_rpc_in_flight
719                  */
720                 l_wait_event_exclusive(cli->cl_destroy_waitq,
721                                        osc_can_send_destroy(cli), &lwi);
722         }
723
724         /* Do not wait for response */
725         ptlrpcd_add_req(req);
726         RETURN(0);
727 }
728
729 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
730                                 long writing_bytes)
731 {
732         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
733
734         LASSERT(!(oa->o_valid & bits));
735
736         oa->o_valid |= bits;
737         client_obd_list_lock(&cli->cl_loi_list_lock);
738         oa->o_dirty = cli->cl_dirty;
739         if (cli->cl_dirty > cli->cl_dirty_max) {
740                 CERROR("dirty %lu > dirty_max %lu\n",
741                        cli->cl_dirty, cli->cl_dirty_max);
742                 oa->o_undirty = 0;
743         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
744                 CERROR("dirty %d > system dirty_max %d\n",
745                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
746                 oa->o_undirty = 0;
747         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
748                 CERROR("dirty %lu - dirty_max %lu too big???\n",
749                        cli->cl_dirty, cli->cl_dirty_max);
750                 oa->o_undirty = 0;
751         } else {
752                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
753                                 (cli->cl_max_rpcs_in_flight + 1);
754                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
755         }
756         oa->o_grant = cli->cl_avail_grant;
757         oa->o_dropped = cli->cl_lost_grant;
758         cli->cl_lost_grant = 0;
759         client_obd_list_unlock(&cli->cl_loi_list_lock);
760         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
761                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
762 }
763
764 /* caller must hold loi_list_lock */
765 static void osc_consume_write_grant(struct client_obd *cli,
766                                     struct brw_page *pga)
767 {
768         atomic_inc(&obd_dirty_pages);
769         cli->cl_dirty += CFS_PAGE_SIZE;
770         cli->cl_avail_grant -= CFS_PAGE_SIZE;
771         pga->flag |= OBD_BRW_FROM_GRANT;
772         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
773                CFS_PAGE_SIZE, pga, pga->pg);
774         LASSERT(cli->cl_avail_grant >= 0);
775 }
776
777 /* the companion to osc_consume_write_grant, called when a brw has completed.
778  * must be called with the loi lock held. */
779 static void osc_release_write_grant(struct client_obd *cli,
780                                     struct brw_page *pga, int sent)
781 {
782         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
783         ENTRY;
784
785         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
786                 EXIT;
787                 return;
788         }
789
790         pga->flag &= ~OBD_BRW_FROM_GRANT;
791         atomic_dec(&obd_dirty_pages);
792         cli->cl_dirty -= CFS_PAGE_SIZE;
793         if (!sent) {
794                 cli->cl_lost_grant += CFS_PAGE_SIZE;
795                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
796                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
797         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
798                 /* For short writes we shouldn't count parts of pages that
799                  * span a whole block on the OST side, or our accounting goes
800                  * wrong.  Should match the code in filter_grant_check. */
801                 int offset = pga->off & ~CFS_PAGE_MASK;
802                 int count = pga->count + (offset & (blocksize - 1));
803                 int end = (offset + pga->count) & (blocksize - 1);
804                 if (end)
805                         count += blocksize - end;
806
807                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
808                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
809                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
810                        cli->cl_avail_grant, cli->cl_dirty);
811         }
812
813         EXIT;
814 }
815
816 static unsigned long rpcs_in_flight(struct client_obd *cli)
817 {
818         return cli->cl_r_in_flight + cli->cl_w_in_flight;
819 }
820
821 /* caller must hold loi_list_lock */
822 void osc_wake_cache_waiters(struct client_obd *cli)
823 {
824         struct list_head *l, *tmp;
825         struct osc_cache_waiter *ocw;
826
827         ENTRY;
828         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
829                 /* if we can't dirty more, we must wait until some is written */
830                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
831                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
832                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
833                                "osc max %ld, sys max %d\n", cli->cl_dirty,
834                                cli->cl_dirty_max, obd_max_dirty_pages);
835                         return;
836                 }
837
838                 /* if still dirty cache but no grant wait for pending RPCs that
839                  * may yet return us some grant before doing sync writes */
840                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
841                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
842                                cli->cl_w_in_flight);
843                         return;
844                 }
845
846                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
847                 list_del_init(&ocw->ocw_entry);
848                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
849                         /* no more RPCs in flight to return grant, do sync IO */
850                         ocw->ocw_rc = -EDQUOT;
851                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
852                 } else {
853                         osc_consume_write_grant(cli,
854                                                 &ocw->ocw_oap->oap_brw_page);
855                 }
856
857                 cfs_waitq_signal(&ocw->ocw_waitq);
858         }
859
860         EXIT;
861 }
862
863 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
864 {
865         client_obd_list_lock(&cli->cl_loi_list_lock);
866         cli->cl_avail_grant = ocd->ocd_grant;
867         client_obd_list_unlock(&cli->cl_loi_list_lock);
868
869         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
870                cli->cl_avail_grant, cli->cl_lost_grant);
871         LASSERT(cli->cl_avail_grant >= 0);
872 }
873
874 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
875 {
876         client_obd_list_lock(&cli->cl_loi_list_lock);
877         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
878         if (body->oa.o_valid & OBD_MD_FLGRANT)
879                 cli->cl_avail_grant += body->oa.o_grant;
880         /* waiters are woken in brw_interpret_oap */
881         client_obd_list_unlock(&cli->cl_loi_list_lock);
882 }
883
884 /* We assume that the reason this OSC got a short read is because it read
885  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
886  * via the LOV, and it _knows_ it's reading inside the file, it's just that
887  * this stripe never got written at or beyond this stripe offset yet. */
888 static void handle_short_read(int nob_read, obd_count page_count,
889                               struct brw_page **pga)
890 {
891         char *ptr;
892         int i = 0;
893
894         /* skip bytes read OK */
895         while (nob_read > 0) {
896                 LASSERT (page_count > 0);
897
898                 if (pga[i]->count > nob_read) {
899                         /* EOF inside this page */
900                         ptr = cfs_kmap(pga[i]->pg) +
901                                 (pga[i]->off & ~CFS_PAGE_MASK);
902                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
903                         cfs_kunmap(pga[i]->pg);
904                         page_count--;
905                         i++;
906                         break;
907                 }
908
909                 nob_read -= pga[i]->count;
910                 page_count--;
911                 i++;
912         }
913
914         /* zero remaining pages */
915         while (page_count-- > 0) {
916                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
917                 memset(ptr, 0, pga[i]->count);
918                 cfs_kunmap(pga[i]->pg);
919                 i++;
920         }
921 }
922
923 static int check_write_rcs(struct ptlrpc_request *req,
924                            int requested_nob, int niocount,
925                            obd_count page_count, struct brw_page **pga)
926 {
927         int    *remote_rcs, i;
928
929         /* return error if any niobuf was in error */
930         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
931                                         sizeof(*remote_rcs) * niocount, NULL);
932         if (remote_rcs == NULL) {
933                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
934                 return(-EPROTO);
935         }
936         if (lustre_msg_swabbed(req->rq_repmsg))
937                 for (i = 0; i < niocount; i++)
938                         __swab32s(&remote_rcs[i]);
939
940         for (i = 0; i < niocount; i++) {
941                 if (remote_rcs[i] < 0)
942                         return(remote_rcs[i]);
943
944                 if (remote_rcs[i] != 0) {
945                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
946                                 i, remote_rcs[i], req);
947                         return(-EPROTO);
948                 }
949         }
950
951         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
952                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
953                        requested_nob, req->rq_bulk->bd_nob_transferred);
954                 return(-EPROTO);
955         }
956
957         return (0);
958 }
959
960 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
961 {
962         if (p1->flag != p2->flag) {
963                 unsigned mask = ~OBD_BRW_FROM_GRANT;
964
965                 /* warn if we try to combine flags that we don't know to be
966                  * safe to combine */
967                 if ((p1->flag & mask) != (p2->flag & mask))
968                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
969                                "same brw?\n", p1->flag, p2->flag);
970                 return 0;
971         }
972
973         return (p1->off + p1->count == p2->off);
974 }
975
976 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
977                                    struct brw_page **pga, int opc,
978                                    cksum_type_t cksum_type)
979 {
980         __u32 cksum;
981         int i = 0;
982
983         LASSERT (pg_count > 0);
984         cksum = init_checksum(cksum_type);
985         while (nob > 0 && pg_count > 0) {
986                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
987                 int off = pga[i]->off & ~CFS_PAGE_MASK;
988                 int count = pga[i]->count > nob ? nob : pga[i]->count;
989
990                 /* corrupt the data before we compute the checksum, to
991                  * simulate an OST->client data error */
992                 if (i == 0 && opc == OST_READ &&
993                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
994                         memcpy(ptr + off, "bad1", min(4, nob));
995                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
996                 cfs_kunmap(pga[i]->pg);
997                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
998                                off, cksum);
999
1000                 nob -= pga[i]->count;
1001                 pg_count--;
1002                 i++;
1003         }
1004         /* For sending we only compute the wrong checksum instead
1005          * of corrupting the data so it is still correct on a redo */
1006         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1007                 cksum++;
1008
1009         return cksum;
1010 }
1011
1012 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1013                                 struct lov_stripe_md *lsm, obd_count page_count,
1014                                 struct brw_page **pga, 
1015                                 struct ptlrpc_request **reqp,
1016                                 struct obd_capa *ocapa)
1017 {
1018         struct ptlrpc_request   *req;
1019         struct ptlrpc_bulk_desc *desc;
1020         struct ost_body         *body;
1021         struct obd_ioobj        *ioobj;
1022         struct niobuf_remote    *niobuf;
1023         int niocount, i, requested_nob, opc, rc;
1024         struct osc_brw_async_args *aa;
1025         struct req_capsule      *pill;
1026
1027         ENTRY;
1028         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1029                 RETURN(-ENOMEM); /* Recoverable */
1030         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1031                 RETURN(-EINVAL); /* Fatal */
1032
1033         if ((cmd & OBD_BRW_WRITE) != 0) {
1034                 opc = OST_WRITE;
1035                 req = ptlrpc_request_alloc_pool(cli->cl_import, 
1036                                                 cli->cl_import->imp_rq_pool,
1037                                                 &RQF_OST_BRW);
1038         } else {
1039                 opc = OST_READ;
1040                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1041         }
1042
1043         if (req == NULL)
1044                 RETURN(-ENOMEM);
1045
1046         for (niocount = i = 1; i < page_count; i++) {
1047                 if (!can_merge_pages(pga[i - 1], pga[i]))
1048                         niocount++;
1049         }
1050
1051         pill = &req->rq_pill;
1052         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1053                              niocount * sizeof(*niobuf));
1054         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1055
1056         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1057         if (rc) {
1058                 ptlrpc_request_free(req);
1059                 RETURN(rc);
1060         }
1061         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1062
1063         if (opc == OST_WRITE)
1064                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1065                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1066         else
1067                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1068                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1069
1070         if (desc == NULL)
1071                 GOTO(out, rc = -ENOMEM);
1072         /* NB request now owns desc and will free it when it gets freed */
1073
1074         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1075         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1076         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1077         LASSERT(body && ioobj && niobuf);
1078
1079         body->oa = *oa;
1080
1081         obdo_to_ioobj(oa, ioobj);
1082         ioobj->ioo_bufcnt = niocount;
1083         osc_pack_capa(req, body, ocapa);
1084         LASSERT (page_count > 0);
1085         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1086                 struct brw_page *pg = pga[i];
1087                 struct brw_page *pg_prev = pga[i - 1];
1088
1089                 LASSERT(pg->count > 0);
1090                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1091                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1092                          pg->off, pg->count);
1093 #ifdef __linux__
1094                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1095                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1096                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1097                          i, page_count,
1098                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1099                          pg_prev->pg, page_private(pg_prev->pg),
1100                          pg_prev->pg->index, pg_prev->off);
1101 #else
1102                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1103                          "i %d p_c %u\n", i, page_count);
1104 #endif
1105                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1106                         (pg->flag & OBD_BRW_SRVLOCK));
1107
1108                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1109                                       pg->count);
1110                 requested_nob += pg->count;
1111
1112                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1113                         niobuf--;
1114                         niobuf->len += pg->count;
1115                 } else {
1116                         niobuf->offset = pg->off;
1117                         niobuf->len    = pg->count;
1118                         niobuf->flags  = pg->flag;
1119                 }
1120         }
1121
1122         LASSERT((void *)(niobuf - niocount) ==
1123                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1124                                niocount * sizeof(*niobuf)));
1125         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1126
1127         /* size[REQ_REC_OFF] still sizeof (*body) */
1128         if (opc == OST_WRITE) {
1129                 if (unlikely(cli->cl_checksum) &&
1130                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1131                         /* store cl_cksum_type in a local variable since
1132                          * it can be changed via lprocfs */
1133                         cksum_type_t cksum_type = cli->cl_cksum_type;
1134
1135                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1136                                 oa->o_flags = body->oa.o_flags = 0;
1137                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1138                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1139                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1140                                                              page_count, pga,
1141                                                              OST_WRITE,
1142                                                              cksum_type);
1143                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1144                                body->oa.o_cksum);
1145                         /* save this in 'oa', too, for later checking */
1146                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1147                         oa->o_flags |= cksum_type_pack(cksum_type);
1148                 } else {
1149                         /* clear out the checksum flag, in case this is a
1150                          * resend but cl_checksum is no longer set. b=11238 */
1151                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1152                 }
1153                 oa->o_cksum = body->oa.o_cksum;
1154                 /* 1 RC per niobuf */
1155                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1156                                      sizeof(__u32) * niocount);
1157         } else {
1158                 if (unlikely(cli->cl_checksum) &&
1159                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1160                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1161                                 body->oa.o_flags = 0;
1162                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1163                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1164                 }
1165                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1166                 /* 1 RC for the whole I/O */
1167         }
1168         ptlrpc_request_set_replen(req);
1169
1170         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1171         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1172         aa->aa_oa = oa;
1173         aa->aa_requested_nob = requested_nob;
1174         aa->aa_nio_count = niocount;
1175         aa->aa_page_count = page_count;
1176         aa->aa_resends = 0;
1177         aa->aa_ppga = pga;
1178         aa->aa_cli = cli;
1179         INIT_LIST_HEAD(&aa->aa_oaps);
1180
1181         *reqp = req;
1182         RETURN(0);
1183
1184  out:
1185         ptlrpc_req_finished(req);
1186         RETURN(rc);
1187 }
1188
1189 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1190                                 __u32 client_cksum, __u32 server_cksum, int nob,
1191                                 obd_count page_count, struct brw_page **pga,
1192                                 cksum_type_t client_cksum_type)
1193 {
1194         __u32 new_cksum;
1195         char *msg;
1196         cksum_type_t cksum_type;
1197
1198         if (server_cksum == client_cksum) {
1199                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1200                 return 0;
1201         }
1202
1203         if (oa->o_valid & OBD_MD_FLFLAGS)
1204                 cksum_type = cksum_type_unpack(oa->o_flags);
1205         else
1206                 cksum_type = OBD_CKSUM_CRC32;
1207
1208         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1209                                       cksum_type);
1210
1211         if (cksum_type != client_cksum_type)
1212                 msg = "the server did not use the checksum type specified in "
1213                       "the original request - likely a protocol problem";
1214         else if (new_cksum == server_cksum)
1215                 msg = "changed on the client after we checksummed it - "
1216                       "likely false positive due to mmap IO (bug 11742)";
1217         else if (new_cksum == client_cksum)
1218                 msg = "changed in transit before arrival at OST";
1219         else
1220                 msg = "changed in transit AND doesn't match the original - "
1221                       "likely false positive due to mmap IO (bug 11742)";
1222
1223         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1224                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1225                            "["LPU64"-"LPU64"]\n",
1226                            msg, libcfs_nid2str(peer->nid),
1227                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1228                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1229                                                         (__u64)0,
1230                            oa->o_id,
1231                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1232                            pga[0]->off,
1233                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1234         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1235                "client csum now %x\n", client_cksum, client_cksum_type,
1236                server_cksum, cksum_type, new_cksum);
1237         return 1;        
1238 }
1239
1240 /* Note rc enters this function as number of bytes transferred */
1241 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1242 {
1243         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1244         const lnet_process_id_t *peer =
1245                         &req->rq_import->imp_connection->c_peer;
1246         struct client_obd *cli = aa->aa_cli;
1247         struct ost_body *body;
1248         __u32 client_cksum = 0;
1249         ENTRY;
1250
1251         if (rc < 0 && rc != -EDQUOT)
1252                 RETURN(rc);
1253
1254         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1255         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1256                                   lustre_swab_ost_body);
1257         if (body == NULL) {
1258                 CDEBUG(D_INFO, "Can't unpack body\n");
1259                 RETURN(-EPROTO);
1260         }
1261
1262         /* set/clear over quota flag for a uid/gid */
1263         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1264             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1265                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1266                              body->oa.o_gid, body->oa.o_valid,
1267                              body->oa.o_flags);
1268
1269         if (rc < 0)
1270                 RETURN(rc);
1271
1272         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1273                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1274
1275         osc_update_grant(cli, body);
1276
1277         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1278                 if (rc > 0) {
1279                         CERROR("Unexpected +ve rc %d\n", rc);
1280                         RETURN(-EPROTO);
1281                 }
1282                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1283
1284                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1285                     check_write_checksum(&body->oa, peer, client_cksum,
1286                                          body->oa.o_cksum, aa->aa_requested_nob,
1287                                          aa->aa_page_count, aa->aa_ppga,
1288                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1289                         RETURN(-EAGAIN);
1290
1291                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1292                         RETURN(-EAGAIN);
1293
1294                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1295                                      aa->aa_page_count, aa->aa_ppga);
1296                 GOTO(out, rc);
1297         }
1298
1299         /* The rest of this function executes only for OST_READs */
1300         if (rc > aa->aa_requested_nob) {
1301                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1302                        aa->aa_requested_nob);
1303                 RETURN(-EPROTO);
1304         }
1305
1306         if (rc != req->rq_bulk->bd_nob_transferred) {
1307                 CERROR ("Unexpected rc %d (%d transferred)\n",
1308                         rc, req->rq_bulk->bd_nob_transferred);
1309                 return (-EPROTO);
1310         }
1311
1312         if (rc < aa->aa_requested_nob)
1313                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1314
1315         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1316                                          aa->aa_ppga))
1317                 GOTO(out, rc = -EAGAIN);
1318
1319         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1320                 static int cksum_counter;
1321                 __u32      server_cksum = body->oa.o_cksum;
1322                 char      *via;
1323                 char      *router;
1324                 cksum_type_t cksum_type;
1325
1326                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1327                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1328                 else
1329                         cksum_type = OBD_CKSUM_CRC32;
1330                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1331                                                  aa->aa_ppga, OST_READ,
1332                                                  cksum_type);
1333
1334                 if (peer->nid == req->rq_bulk->bd_sender) {
1335                         via = router = "";
1336                 } else {
1337                         via = " via ";
1338                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1339                 }
1340
1341                 if (server_cksum == ~0 && rc > 0) {
1342                         CERROR("Protocol error: server %s set the 'checksum' "
1343                                "bit, but didn't send a checksum.  Not fatal, "
1344                                "but please tell CFS.\n",
1345                                libcfs_nid2str(peer->nid));
1346                 } else if (server_cksum != client_cksum) {
1347                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1348                                            "%s%s%s inum "LPU64"/"LPU64" object "
1349                                            LPU64"/"LPU64" extent "
1350                                            "["LPU64"-"LPU64"]\n",
1351                                            req->rq_import->imp_obd->obd_name,
1352                                            libcfs_nid2str(peer->nid),
1353                                            via, router,
1354                                            body->oa.o_valid & OBD_MD_FLFID ?
1355                                                 body->oa.o_fid : (__u64)0,
1356                                            body->oa.o_valid & OBD_MD_FLFID ?
1357                                                 body->oa.o_generation :(__u64)0,
1358                                            body->oa.o_id,
1359                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1360                                                 body->oa.o_gr : (__u64)0,
1361                                            aa->aa_ppga[0]->off,
1362                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1363                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1364                                                                         1);
1365                         CERROR("client %x, server %x, cksum_type %x\n",
1366                                client_cksum, server_cksum, cksum_type);
1367                         cksum_counter = 0;
1368                         aa->aa_oa->o_cksum = client_cksum;
1369                         rc = -EAGAIN;
1370                 } else {
1371                         cksum_counter++;
1372                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1373                         rc = 0;
1374                 }
1375         } else if (unlikely(client_cksum)) {
1376                 static int cksum_missed;
1377
1378                 cksum_missed++;
1379                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1380                         CERROR("Checksum %u requested from %s but not sent\n",
1381                                cksum_missed, libcfs_nid2str(peer->nid));
1382         } else {
1383                 rc = 0;
1384         }
1385 out:
1386         if (rc >= 0)
1387                 *aa->aa_oa = body->oa;
1388
1389         RETURN(rc);
1390 }
1391
1392 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1393                             struct lov_stripe_md *lsm,
1394                             obd_count page_count, struct brw_page **pga,
1395                             struct obd_capa *ocapa)
1396 {
1397         struct ptlrpc_request *req;
1398         int                    rc;
1399         cfs_waitq_t            waitq;
1400         int                    resends = 0;
1401         struct l_wait_info     lwi;
1402
1403         ENTRY;
1404
1405         cfs_waitq_init(&waitq);
1406
1407 restart_bulk:
1408         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1409                                   page_count, pga, &req, ocapa);
1410         if (rc != 0)
1411                 return (rc);
1412
1413         rc = ptlrpc_queue_wait(req);
1414
1415         if (rc == -ETIMEDOUT && req->rq_resend) {
1416                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1417                 ptlrpc_req_finished(req);
1418                 goto restart_bulk;
1419         }
1420
1421         rc = osc_brw_fini_request(req, rc);
1422
1423         ptlrpc_req_finished(req);
1424         if (osc_recoverable_error(rc)) {
1425                 resends++;
1426                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1427                         CERROR("too many resend retries, returning error\n");
1428                         RETURN(-EIO);
1429                 }
1430
1431                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1432                 l_wait_event(waitq, 0, &lwi);
1433
1434                 goto restart_bulk;
1435         }
1436         
1437         RETURN (rc);
1438 }
1439
1440 int osc_brw_redo_request(struct ptlrpc_request *request,
1441                          struct osc_brw_async_args *aa)
1442 {
1443         struct ptlrpc_request *new_req;
1444         struct ptlrpc_request_set *set = request->rq_set;
1445         struct osc_brw_async_args *new_aa;
1446         struct osc_async_page *oap;
1447         int rc = 0;
1448         ENTRY;
1449
1450         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1451                 CERROR("too many resend retries, returning error\n");
1452                 RETURN(-EIO);
1453         }
1454
1455         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1456 /*
1457         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1458         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1459                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1460                                            REQ_REC_OFF + 3);
1461 */
1462         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1463                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1464                                   aa->aa_cli, aa->aa_oa,
1465                                   NULL /* lsm unused by osc currently */,
1466                                   aa->aa_page_count, aa->aa_ppga, 
1467                                   &new_req, NULL /* ocapa */);
1468         if (rc)
1469                 RETURN(rc);
1470
1471         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1472
1473         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1474                 if (oap->oap_request != NULL) {
1475                         LASSERTF(request == oap->oap_request,
1476                                  "request %p != oap_request %p\n",
1477                                  request, oap->oap_request);
1478                         if (oap->oap_interrupted) {
1479                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1480                                 ptlrpc_req_finished(new_req);
1481                                 RETURN(-EINTR);
1482                         }
1483                 }
1484         }
1485         /* New request takes over pga and oaps from old request.
1486          * Note that copying a list_head doesn't work, need to move it... */
1487         aa->aa_resends++;
1488         new_req->rq_interpret_reply = request->rq_interpret_reply;
1489         new_req->rq_async_args = request->rq_async_args;
1490         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1491
1492         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1493
1494         INIT_LIST_HEAD(&new_aa->aa_oaps);
1495         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1496         INIT_LIST_HEAD(&aa->aa_oaps);
1497
1498         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1499                 if (oap->oap_request) {
1500                         ptlrpc_req_finished(oap->oap_request);
1501                         oap->oap_request = ptlrpc_request_addref(new_req);
1502                 }
1503         }
1504
1505         /* use ptlrpc_set_add_req is safe because interpret functions work 
1506          * in check_set context. only one way exist with access to request 
1507          * from different thread got -EINTR - this way protected with 
1508          * cl_loi_list_lock */
1509         ptlrpc_set_add_req(set, new_req);
1510
1511         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1512
1513         DEBUG_REQ(D_INFO, new_req, "new request");
1514         RETURN(0);
1515 }
1516
1517 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1518 {
1519         struct osc_brw_async_args *aa = data;
1520         int                        i;
1521         ENTRY;
1522
1523         rc = osc_brw_fini_request(req, rc);
1524         if (osc_recoverable_error(rc)) {
1525                 rc = osc_brw_redo_request(req, aa);
1526                 if (rc == 0)
1527                         RETURN(0);
1528         }
1529
1530         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1531         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1532                 aa->aa_cli->cl_w_in_flight--;
1533         else
1534                 aa->aa_cli->cl_r_in_flight--;
1535         for (i = 0; i < aa->aa_page_count; i++)
1536                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1537         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1538
1539         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1540
1541         RETURN(rc);
1542 }
1543
1544 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1545                           struct lov_stripe_md *lsm, obd_count page_count,
1546                           struct brw_page **pga, struct ptlrpc_request_set *set,
1547                           struct obd_capa *ocapa)
1548 {
1549         struct ptlrpc_request     *req;
1550         struct client_obd         *cli = &exp->exp_obd->u.cli;
1551         int                        rc, i;
1552         struct osc_brw_async_args *aa;
1553         ENTRY;
1554
1555         /* Consume write credits even if doing a sync write -
1556          * otherwise we may run out of space on OST due to grant. */
1557         if (cmd == OBD_BRW_WRITE) {
1558                 spin_lock(&cli->cl_loi_list_lock);
1559                 for (i = 0; i < page_count; i++) {
1560                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1561                                 osc_consume_write_grant(cli, pga[i]);
1562                 }
1563                 spin_unlock(&cli->cl_loi_list_lock);
1564         }
1565
1566         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1567                                   &req, ocapa);
1568
1569         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1570         if (cmd == OBD_BRW_READ) {
1571                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1572                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1573                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1574         } else {
1575                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1576                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1577                                  cli->cl_w_in_flight);
1578                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1579         }
1580
1581         if (rc == 0) {
1582                 req->rq_interpret_reply = brw_interpret;
1583                 ptlrpc_set_add_req(set, req);
1584                 client_obd_list_lock(&cli->cl_loi_list_lock);
1585                 if (cmd == OBD_BRW_READ)
1586                         cli->cl_r_in_flight++;
1587                 else
1588                         cli->cl_w_in_flight++;
1589                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1590         } else if (cmd == OBD_BRW_WRITE) {
1591                 client_obd_list_lock(&cli->cl_loi_list_lock);
1592                 for (i = 0; i < page_count; i++)
1593                         osc_release_write_grant(cli, pga[i], 0);
1594                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1595         }
1596         RETURN (rc);
1597 }
1598
1599 /*
1600  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1601  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1602  * fine for our small page arrays and doesn't require allocation.  its an
1603  * insertion sort that swaps elements that are strides apart, shrinking the
1604  * stride down until its '1' and the array is sorted.
1605  */
1606 static void sort_brw_pages(struct brw_page **array, int num)
1607 {
1608         int stride, i, j;
1609         struct brw_page *tmp;
1610
1611         if (num == 1)
1612                 return;
1613         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1614                 ;
1615
1616         do {
1617                 stride /= 3;
1618                 for (i = stride ; i < num ; i++) {
1619                         tmp = array[i];
1620                         j = i;
1621                         while (j >= stride && array[j - stride]->off > tmp->off) {
1622                                 array[j] = array[j - stride];
1623                                 j -= stride;
1624                         }
1625                         array[j] = tmp;
1626                 }
1627         } while (stride > 1);
1628 }
1629
1630 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1631 {
1632         int count = 1;
1633         int offset;
1634         int i = 0;
1635
1636         LASSERT (pages > 0);
1637         offset = pg[i]->off & ~CFS_PAGE_MASK;
1638
1639         for (;;) {
1640                 pages--;
1641                 if (pages == 0)         /* that's all */
1642                         return count;
1643
1644                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1645                         return count;   /* doesn't end on page boundary */
1646
1647                 i++;
1648                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1649                 if (offset != 0)        /* doesn't start on page boundary */
1650                         return count;
1651
1652                 count++;
1653         }
1654 }
1655
1656 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1657 {
1658         struct brw_page **ppga;
1659         int i;
1660
1661         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1662         if (ppga == NULL)
1663                 return NULL;
1664
1665         for (i = 0; i < count; i++)
1666                 ppga[i] = pga + i;
1667         return ppga;
1668 }
1669
1670 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1671 {
1672         LASSERT(ppga != NULL);
1673         OBD_FREE(ppga, sizeof(*ppga) * count);
1674 }
1675
1676 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1677                    obd_count page_count, struct brw_page *pga,
1678                    struct obd_trans_info *oti)
1679 {
1680         struct obdo *saved_oa = NULL;
1681         struct brw_page **ppga, **orig;
1682         struct obd_import *imp = class_exp2cliimp(exp);
1683         struct client_obd *cli = &imp->imp_obd->u.cli;
1684         int rc, page_count_orig;
1685         ENTRY;
1686
1687         if (cmd & OBD_BRW_CHECK) {
1688                 /* The caller just wants to know if there's a chance that this
1689                  * I/O can succeed */
1690
1691                 if (imp == NULL || imp->imp_invalid)
1692                         RETURN(-EIO);
1693                 RETURN(0);
1694         }
1695
1696         /* test_brw with a failed create can trip this, maybe others. */
1697         LASSERT(cli->cl_max_pages_per_rpc);
1698
1699         rc = 0;
1700
1701         orig = ppga = osc_build_ppga(pga, page_count);
1702         if (ppga == NULL)
1703                 RETURN(-ENOMEM);
1704         page_count_orig = page_count;
1705
1706         sort_brw_pages(ppga, page_count);
1707         while (page_count) {
1708                 obd_count pages_per_brw;
1709
1710                 if (page_count > cli->cl_max_pages_per_rpc)
1711                         pages_per_brw = cli->cl_max_pages_per_rpc;
1712                 else
1713                         pages_per_brw = page_count;
1714
1715                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1716
1717                 if (saved_oa != NULL) {
1718                         /* restore previously saved oa */
1719                         *oinfo->oi_oa = *saved_oa;
1720                 } else if (page_count > pages_per_brw) {
1721                         /* save a copy of oa (brw will clobber it) */
1722                         OBDO_ALLOC(saved_oa);
1723                         if (saved_oa == NULL)
1724                                 GOTO(out, rc = -ENOMEM);
1725                         *saved_oa = *oinfo->oi_oa;
1726                 }
1727
1728                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1729                                       pages_per_brw, ppga, oinfo->oi_capa);
1730
1731                 if (rc != 0)
1732                         break;
1733
1734                 page_count -= pages_per_brw;
1735                 ppga += pages_per_brw;
1736         }
1737
1738 out:
1739         osc_release_ppga(orig, page_count_orig);
1740
1741         if (saved_oa != NULL)
1742                 OBDO_FREE(saved_oa);
1743
1744         RETURN(rc);
1745 }
1746
1747 static int osc_brw_async(int cmd, struct obd_export *exp,
1748                          struct obd_info *oinfo, obd_count page_count,
1749                          struct brw_page *pga, struct obd_trans_info *oti,
1750                          struct ptlrpc_request_set *set)
1751 {
1752         struct brw_page **ppga, **orig;
1753         struct client_obd *cli = &exp->exp_obd->u.cli;
1754         int page_count_orig;
1755         int rc = 0;
1756         ENTRY;
1757
1758         if (cmd & OBD_BRW_CHECK) {
1759                 struct obd_import *imp = class_exp2cliimp(exp);
1760                 /* The caller just wants to know if there's a chance that this
1761                  * I/O can succeed */
1762
1763                 if (imp == NULL || imp->imp_invalid)
1764                         RETURN(-EIO);
1765                 RETURN(0);
1766         }
1767
1768         orig = ppga = osc_build_ppga(pga, page_count);
1769         if (ppga == NULL)
1770                 RETURN(-ENOMEM);
1771         page_count_orig = page_count;
1772
1773         sort_brw_pages(ppga, page_count);
1774         while (page_count) {
1775                 struct brw_page **copy;
1776                 obd_count pages_per_brw;
1777
1778                 pages_per_brw = min_t(obd_count, page_count,
1779                                       cli->cl_max_pages_per_rpc);
1780
1781                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1782
1783                 /* use ppga only if single RPC is going to fly */
1784                 if (pages_per_brw != page_count_orig || ppga != orig) {
1785                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1786                         if (copy == NULL)
1787                                 GOTO(out, rc = -ENOMEM);
1788                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1789                 } else
1790                         copy = ppga;
1791
1792                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1793                                     pages_per_brw, copy, set, oinfo->oi_capa);
1794
1795                 if (rc != 0) {
1796                         if (copy != ppga)
1797                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1798                         break;
1799                 }
1800                 if (copy == orig) {
1801                         /* we passed it to async_internal() which is
1802                          * now responsible for releasing memory */
1803                         orig = NULL;
1804                 }
1805
1806                 page_count -= pages_per_brw;
1807                 ppga += pages_per_brw;
1808         }
1809 out:
1810         if (orig)
1811                 osc_release_ppga(orig, page_count_orig);
1812         RETURN(rc);
1813 }
1814
1815 static void osc_check_rpcs(struct client_obd *cli);
1816
1817 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1818  * the dirty accounting.  Writeback completes or truncate happens before
1819  * writing starts.  Must be called with the loi lock held. */
1820 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1821                            int sent)
1822 {
1823         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1824 }
1825
1826
1827 /* This maintains the lists of pending pages to read/write for a given object
1828  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1829  * to quickly find objects that are ready to send an RPC. */
1830 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1831                          int cmd)
1832 {
1833         int optimal;
1834         ENTRY;
1835
1836         if (lop->lop_num_pending == 0)
1837                 RETURN(0);
1838
1839         /* if we have an invalid import we want to drain the queued pages
1840          * by forcing them through rpcs that immediately fail and complete
1841          * the pages.  recovery relies on this to empty the queued pages
1842          * before canceling the locks and evicting down the llite pages */
1843         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1844                 RETURN(1);
1845
1846         /* stream rpcs in queue order as long as as there is an urgent page
1847          * queued.  this is our cheap solution for good batching in the case
1848          * where writepage marks some random page in the middle of the file
1849          * as urgent because of, say, memory pressure */
1850         if (!list_empty(&lop->lop_urgent)) {
1851                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1852                 RETURN(1);
1853         }
1854         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1855         optimal = cli->cl_max_pages_per_rpc;
1856         if (cmd & OBD_BRW_WRITE) {
1857                 /* trigger a write rpc stream as long as there are dirtiers
1858                  * waiting for space.  as they're waiting, they're not going to
1859                  * create more pages to coallesce with what's waiting.. */
1860                 if (!list_empty(&cli->cl_cache_waiters)) {
1861                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1862                         RETURN(1);
1863                 }
1864                 /* +16 to avoid triggering rpcs that would want to include pages
1865                  * that are being queued but which can't be made ready until
1866                  * the queuer finishes with the page. this is a wart for
1867                  * llite::commit_write() */
1868                 optimal += 16;
1869         }
1870         if (lop->lop_num_pending >= optimal)
1871                 RETURN(1);
1872
1873         RETURN(0);
1874 }
1875
/* Make the membership of @item on @list match @should_be_on: append it when
 * it should be listed but is not linked anywhere, unlink it when it is
 * linked but should not be.  Otherwise nothing changes. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1884
1885 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1886  * can find pages to build into rpcs quickly */
1887 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1888 {
1889         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1890                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1891                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1892
1893         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1894                 loi->loi_write_lop.lop_num_pending);
1895
1896         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1897                 loi->loi_read_lop.lop_num_pending);
1898 }
1899
1900 static void lop_update_pending(struct client_obd *cli,
1901                                struct loi_oap_pages *lop, int cmd, int delta)
1902 {
1903         lop->lop_num_pending += delta;
1904         if (cmd & OBD_BRW_WRITE)
1905                 cli->cl_pending_w_pages += delta;
1906         else
1907                 cli->cl_pending_r_pages += delta;
1908 }
1909
1910 /* this is called when a sync waiter receives an interruption.  Its job is to
1911  * get the caller woken as soon as possible.  If its page hasn't been put in an
1912  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1913  * desiring interruption which will forcefully complete the rpc once the rpc
1914  * has timed out */
1915 static void osc_occ_interrupted(struct oig_callback_context *occ)
1916 {
1917         struct osc_async_page *oap;
1918         struct loi_oap_pages *lop;
1919         struct lov_oinfo *loi;
1920         ENTRY;
1921
1922         /* XXX member_of() */
1923         oap = list_entry(occ, struct osc_async_page, oap_occ);
1924
1925         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1926
1927         oap->oap_interrupted = 1;
1928
1929         /* ok, it's been put in an rpc. only one oap gets a request reference */
1930         if (oap->oap_request != NULL) {
1931                 ptlrpc_mark_interrupted(oap->oap_request);
1932                 ptlrpcd_wake(oap->oap_request);
1933                 GOTO(unlock, 0);
1934         }
1935
1936         /* we don't get interruption callbacks until osc_trigger_group_io()
1937          * has been called and put the sync oaps in the pending/urgent lists.*/
1938         if (!list_empty(&oap->oap_pending_item)) {
1939                 list_del_init(&oap->oap_pending_item);
1940                 list_del_init(&oap->oap_urgent_item);
1941
1942                 loi = oap->oap_loi;
1943                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1944                         &loi->loi_write_lop : &loi->loi_read_lop;
1945                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1946                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1947
1948                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1949                 oap->oap_oig = NULL;
1950         }
1951
1952 unlock:
1953         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1954 }
1955
1956 /* this is trying to propogate async writeback errors back up to the
1957  * application.  As an async write fails we record the error code for later if
1958  * the app does an fsync.  As long as errors persist we force future rpcs to be
1959  * sync so that the app can get a sync error and break the cycle of queueing
1960  * pages for which writeback will fail. */
1961 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1962                            int rc)
1963 {
1964         if (rc) {
1965                 if (!ar->ar_rc)
1966                         ar->ar_rc = rc;
1967
1968                 ar->ar_force_sync = 1;
1969                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1970                 return;
1971
1972         }
1973
1974         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1975                 ar->ar_force_sync = 0;
1976 }
1977
1978 static void osc_oap_to_pending(struct osc_async_page *oap)
1979 {
1980         struct loi_oap_pages *lop;
1981
1982         if (oap->oap_cmd & OBD_BRW_WRITE)
1983                 lop = &oap->oap_loi->loi_write_lop;
1984         else
1985                 lop = &oap->oap_loi->loi_read_lop;
1986
1987         if (oap->oap_async_flags & ASYNC_URGENT)
1988                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1989         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1990         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1991 }
1992
1993 /* this must be called holding the loi list lock to give coverage to exit_cache,
1994  * async_flag maintenance, and oap_request */
1995 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1996                               struct osc_async_page *oap, int sent, int rc)
1997 {
1998         __u64 xid = 0;
1999
2000         ENTRY;
2001         if (oap->oap_request != NULL) {
2002                 xid = ptlrpc_req_xid(oap->oap_request);
2003                 ptlrpc_req_finished(oap->oap_request);
2004                 oap->oap_request = NULL;
2005         }
2006
2007         oap->oap_async_flags = 0;
2008         oap->oap_interrupted = 0;
2009
2010         if (oap->oap_cmd & OBD_BRW_WRITE) {
2011                 osc_process_ar(&cli->cl_ar, xid, rc);
2012                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2013         }
2014
2015         if (rc == 0 && oa != NULL) {
2016                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2017                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2018                 if (oa->o_valid & OBD_MD_FLMTIME)
2019                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2020                 if (oa->o_valid & OBD_MD_FLATIME)
2021                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2022                 if (oa->o_valid & OBD_MD_FLCTIME)
2023                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2024         }
2025
2026         if (oap->oap_oig) {
2027                 osc_exit_cache(cli, oap, sent);
2028                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2029                 oap->oap_oig = NULL;
2030                 EXIT;
2031                 return;
2032         }
2033
2034         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2035                                                 oap->oap_cmd, oa, rc);
2036
2037         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2038          * I/O on the page could start, but OSC calls it under lock
2039          * and thus we can add oap back to pending safely */
2040         if (rc)
2041                 /* upper layer wants to leave the page on pending queue */
2042                 osc_oap_to_pending(oap);
2043         else
2044                 osc_exit_cache(cli, oap, sent);
2045         EXIT;
2046 }
2047
2048 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
2049 {
2050         struct osc_async_page *oap, *tmp;
2051         struct osc_brw_async_args *aa = data;
2052         struct client_obd *cli;
2053         ENTRY;
2054
2055         rc = osc_brw_fini_request(req, rc);
2056         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2057         if (osc_recoverable_error(rc)) {
2058                 rc = osc_brw_redo_request(req, aa);
2059                 if (rc == 0)
2060                         RETURN(0);
2061         }
2062
2063         cli = aa->aa_cli;
2064
2065         client_obd_list_lock(&cli->cl_loi_list_lock);
2066
2067         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2068          * is called so we know whether to go to sync BRWs or wait for more
2069          * RPCs to complete */
2070         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2071                 cli->cl_w_in_flight--;
2072         else
2073                 cli->cl_r_in_flight--;
2074
2075         /* the caller may re-use the oap after the completion call so
2076          * we need to clean it up a little */
2077         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2078                 list_del_init(&oap->oap_rpc_item);
2079                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2080         }
2081
2082         osc_wake_cache_waiters(cli);
2083         osc_check_rpcs(cli);
2084
2085         client_obd_list_unlock(&cli->cl_loi_list_lock);
2086
2087         OBDO_FREE(aa->aa_oa);
2088         
2089         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2090         RETURN(rc);
2091 }
2092
2093 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2094                                             struct list_head *rpc_list,
2095                                             int page_count, int cmd)
2096 {
2097         struct ptlrpc_request *req;
2098         struct brw_page **pga = NULL;
2099         struct osc_brw_async_args *aa;
2100         struct obdo *oa = NULL;
2101         struct obd_async_page_ops *ops = NULL;
2102         void *caller_data = NULL;
2103         struct obd_capa *ocapa;
2104         struct osc_async_page *oap;
2105         int i, rc;
2106
2107         ENTRY;
2108         LASSERT(!list_empty(rpc_list));
2109
2110         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2111         if (pga == NULL)
2112                 RETURN(ERR_PTR(-ENOMEM));
2113
2114         OBDO_ALLOC(oa);
2115         if (oa == NULL)
2116                 GOTO(out, req = ERR_PTR(-ENOMEM));
2117
2118         i = 0;
2119         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2120                 if (ops == NULL) {
2121                         ops = oap->oap_caller_ops;
2122                         caller_data = oap->oap_caller_data;
2123                 }
2124                 pga[i] = &oap->oap_brw_page;
2125                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2126                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2127                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2128                 i++;
2129         }
2130
2131         /* always get the data for the obdo for the rpc */
2132         LASSERT(ops != NULL);
2133         ops->ap_fill_obdo(caller_data, cmd, oa);
2134         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2135
2136         sort_brw_pages(pga, page_count);
2137         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2138                                   pga, &req, ocapa);
2139         capa_put(ocapa);
2140         if (rc != 0) {
2141                 CERROR("prep_req failed: %d\n", rc);
2142                 GOTO(out, req = ERR_PTR(rc));
2143         }
2144
2145         /* Need to update the timestamps after the request is built in case
2146          * we race with setattr (locally or in queue at OST).  If OST gets
2147          * later setattr before earlier BRW (as determined by the request xid),
2148          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2149          * way to do this in a single call.  bug 10150 */
2150         ops->ap_update_obdo(caller_data, cmd, oa,
2151                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2152
2153         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2154         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2155         INIT_LIST_HEAD(&aa->aa_oaps);
2156         list_splice(rpc_list, &aa->aa_oaps);
2157         INIT_LIST_HEAD(rpc_list);
2158
2159 out:
2160         if (IS_ERR(req)) {
2161                 if (oa)
2162                         OBDO_FREE(oa);
2163                 if (pga)
2164                         OBD_FREE(pga, sizeof(*pga) * page_count);
2165         }
2166         RETURN(req);
2167 }
2168
2169 /* the loi lock is held across this function but it's allowed to release
2170  * and reacquire it during its work */
2171 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2172                             int cmd, struct loi_oap_pages *lop)
2173 {
2174         struct ptlrpc_request *req;
2175         obd_count page_count = 0;
2176         struct osc_async_page *oap = NULL, *tmp;
2177         struct osc_brw_async_args *aa;
2178         struct obd_async_page_ops *ops;
2179         CFS_LIST_HEAD(rpc_list);
2180         unsigned int ending_offset;
2181         unsigned  starting_offset = 0;
2182         ENTRY;
2183
2184         /* first we find the pages we're allowed to work with */
2185         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2186                                  oap_pending_item) {
2187                 ops = oap->oap_caller_ops;
2188
2189                 LASSERT(oap->oap_magic == OAP_MAGIC);
2190
2191                 /* in llite being 'ready' equates to the page being locked
2192                  * until completion unlocks it.  commit_write submits a page
2193                  * as not ready because its unlock will happen unconditionally
2194                  * as the call returns.  if we race with commit_write giving
2195                  * us that page we dont' want to create a hole in the page
2196                  * stream, so we stop and leave the rpc to be fired by
2197                  * another dirtier or kupdated interval (the not ready page
2198                  * will still be on the dirty list).  we could call in
2199                  * at the end of ll_file_write to process the queue again. */
2200                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2201                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2202                         if (rc < 0)
2203                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2204                                                 "instead of ready\n", oap,
2205                                                 oap->oap_page, rc);
2206                         switch (rc) {
2207                         case -EAGAIN:
2208                                 /* llite is telling us that the page is still
2209                                  * in commit_write and that we should try
2210                                  * and put it in an rpc again later.  we
2211                                  * break out of the loop so we don't create
2212                                  * a hole in the sequence of pages in the rpc
2213                                  * stream.*/
2214                                 oap = NULL;
2215                                 break;
2216                         case -EINTR:
2217                                 /* the io isn't needed.. tell the checks
2218                                  * below to complete the rpc with EINTR */
2219                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2220                                 oap->oap_count = -EINTR;
2221                                 break;
2222                         case 0:
2223                                 oap->oap_async_flags |= ASYNC_READY;
2224                                 break;
2225                         default:
2226                                 LASSERTF(0, "oap %p page %p returned %d "
2227                                             "from make_ready\n", oap,
2228                                             oap->oap_page, rc);
2229                                 break;
2230                         }
2231                 }
2232                 if (oap == NULL)
2233                         break;
2234                 /*
2235                  * Page submitted for IO has to be locked. Either by
2236                  * ->ap_make_ready() or by higher layers.
2237                  *
2238                  * XXX nikita: this assertion should be adjusted when lustre
2239                  * starts using PG_writeback for pages being written out.
2240                  */
2241 #if defined(__KERNEL__) && defined(__linux__)
2242                 LASSERT(PageLocked(oap->oap_page));
2243 #endif
2244                 /* If there is a gap at the start of this page, it can't merge
2245                  * with any previous page, so we'll hand the network a
2246                  * "fragmented" page array that it can't transfer in 1 RDMA */
2247                 if (page_count != 0 && oap->oap_page_off != 0)
2248                         break;
2249
2250                 /* take the page out of our book-keeping */
2251                 list_del_init(&oap->oap_pending_item);
2252                 lop_update_pending(cli, lop, cmd, -1);
2253                 list_del_init(&oap->oap_urgent_item);
2254
2255                 if (page_count == 0)
2256                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2257                                           (PTLRPC_MAX_BRW_SIZE - 1);
2258
2259                 /* ask the caller for the size of the io as the rpc leaves. */
2260                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2261                         oap->oap_count =
2262                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2263                 if (oap->oap_count <= 0) {
2264                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2265                                oap->oap_count);
2266                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2267                         continue;
2268                 }
2269
2270                 /* now put the page back in our accounting */
2271                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2272                 if (++page_count >= cli->cl_max_pages_per_rpc)
2273                         break;
2274
2275                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2276                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2277                  * have the same alignment as the initial writes that allocated
2278                  * extents on the server. */
2279                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2280                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2281                 if (ending_offset == 0)
2282                         break;
2283
2284                 /* If there is a gap at the end of this page, it can't merge
2285                  * with any subsequent pages, so we'll hand the network a
2286                  * "fragmented" page array that it can't transfer in 1 RDMA */
2287                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2288                         break;
2289         }
2290
2291         osc_wake_cache_waiters(cli);
2292
2293         if (page_count == 0)
2294                 RETURN(0);
2295
2296         loi_list_maint(cli, loi);
2297
2298         client_obd_list_unlock(&cli->cl_loi_list_lock);
2299
2300         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2301         if (IS_ERR(req)) {
2302                 /* this should happen rarely and is pretty bad, it makes the
2303                  * pending list not follow the dirty order */
2304                 client_obd_list_lock(&cli->cl_loi_list_lock);
2305                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2306                         list_del_init(&oap->oap_rpc_item);
2307
2308                         /* queued sync pages can be torn down while the pages
2309                          * were between the pending list and the rpc */
2310                         if (oap->oap_interrupted) {
2311                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2312                                 osc_ap_completion(cli, NULL, oap, 0,
2313                                                   oap->oap_count);
2314                                 continue;
2315                         }
2316                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2317                 }
2318                 loi_list_maint(cli, loi);
2319                 RETURN(PTR_ERR(req));
2320         }
2321
2322         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2323
2324         if (cmd == OBD_BRW_READ) {
2325                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2326                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2327                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2328                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2329                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2330         } else {
2331                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2332                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2333                                  cli->cl_w_in_flight);
2334                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2336                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2337         }
2338
2339         client_obd_list_lock(&cli->cl_loi_list_lock);
2340
2341         if (cmd == OBD_BRW_READ)
2342                 cli->cl_r_in_flight++;
2343         else
2344                 cli->cl_w_in_flight++;
2345
2346         /* queued sync pages can be torn down while the pages
2347          * were between the pending list and the rpc */
2348         tmp = NULL;
2349         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2350                 /* only one oap gets a request reference */
2351                 if (tmp == NULL)
2352                         tmp = oap;
2353                 if (oap->oap_interrupted && !req->rq_intr) {
2354                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2355                                oap, req);
2356                         ptlrpc_mark_interrupted(req);
2357                 }
2358         }
2359         if (tmp != NULL)
2360                 tmp->oap_request = ptlrpc_request_addref(req);
2361
2362         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2363                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2364
2365         req->rq_interpret_reply = brw_interpret_oap;
2366         ptlrpcd_add_req(req);
2367         RETURN(1);
2368 }
2369
/* Emit a D_INODE debug line describing the queue state of LOI: whether its
 * loi_cli_item is linked on a list, and for the write and read lops the
 * pending count and whether the urgent list is non-empty.  STR and the
 * trailing arguments are appended printf-style; at least one trailing
 * argument has to be supplied because of the ", args" expansion.
 *
 * The final line deliberately carries no continuation backslash so that the
 * macro body cannot swallow whatever line follows the definition. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2378
2379 /* This is called by osc_check_rpcs() to find which objects have pages that
2380  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2381 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2382 {
2383         ENTRY;
2384         /* first return all objects which we already know to have
2385          * pages ready to be stuffed into rpcs */
2386         if (!list_empty(&cli->cl_loi_ready_list))
2387                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2388                                   struct lov_oinfo, loi_cli_item));
2389
2390         /* then if we have cache waiters, return all objects with queued
2391          * writes.  This is especially important when many small files
2392          * have filled up the cache and not been fired into rpcs because
2393          * they don't pass the nr_pending/object threshhold */
2394         if (!list_empty(&cli->cl_cache_waiters) &&
2395             !list_empty(&cli->cl_loi_write_list))
2396                 RETURN(list_entry(cli->cl_loi_write_list.next,
2397                                   struct lov_oinfo, loi_write_item));
2398
2399         /* then return all queued objects when we have an invalid import
2400          * so that they get flushed */
2401         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2402                 if (!list_empty(&cli->cl_loi_write_list))
2403                         RETURN(list_entry(cli->cl_loi_write_list.next,
2404                                           struct lov_oinfo, loi_write_item));
2405                 if (!list_empty(&cli->cl_loi_read_list))
2406                         RETURN(list_entry(cli->cl_loi_read_list.next,
2407                                           struct lov_oinfo, loi_read_item));
2408         }
2409         RETURN(NULL);
2410 }
2411
2412 /* called with the loi list lock held */
2413 static void osc_check_rpcs(struct client_obd *cli)
2414 {
2415         struct lov_oinfo *loi;
2416         int rc = 0, race_counter = 0;
2417         ENTRY;
2418
2419         while ((loi = osc_next_loi(cli)) != NULL) {
2420                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2421
2422                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2423                         break;
2424
2425                 /* attempt some read/write balancing by alternating between
2426                  * reads and writes in an object.  The makes_rpc checks here
2427                  * would be redundant if we were getting read/write work items
2428                  * instead of objects.  we don't want send_oap_rpc to drain a
2429                  * partial read pending queue when we're given this object to
2430                  * do io on writes while there are cache waiters */
2431                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2432                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2433                                               &loi->loi_write_lop);
2434                         if (rc < 0)
2435                                 break;
2436                         if (rc > 0)
2437                                 race_counter = 0;
2438                         else
2439                                 race_counter++;
2440                 }
2441                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2442                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2443                                               &loi->loi_read_lop);
2444                         if (rc < 0)
2445                                 break;
2446                         if (rc > 0)
2447                                 race_counter = 0;
2448                         else
2449                                 race_counter++;
2450                 }
2451
2452                 /* attempt some inter-object balancing by issueing rpcs
2453                  * for each object in turn */
2454                 if (!list_empty(&loi->loi_cli_item))
2455                         list_del_init(&loi->loi_cli_item);
2456                 if (!list_empty(&loi->loi_write_item))
2457                         list_del_init(&loi->loi_write_item);
2458                 if (!list_empty(&loi->loi_read_item))
2459                         list_del_init(&loi->loi_read_item);
2460
2461                 loi_list_maint(cli, loi);
2462
2463                 /* send_oap_rpc fails with 0 when make_ready tells it to
2464                  * back off.  llite's make_ready does this when it tries
2465                  * to lock a page queued for write that is already locked.
2466                  * we want to try sending rpcs from many objects, but we
2467                  * don't want to spin failing with 0.  */
2468                 if (race_counter == 10)
2469                         break;
2470         }
2471         EXIT;
2472 }
2473
2474 /* we're trying to queue a page in the osc so we're subject to the
2475  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2476  * If the osc's queued pages are already at that limit, then we want to sleep
2477  * until there is space in the osc's queue for us.  We also may be waiting for
2478  * write credits from the OST if there are RPCs in flight that may return some
2479  * before we fall back to sync writes.
2480  *
2481  * We need this know our allocation was granted in the presence of signals */
2482 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2483 {
2484         int rc;
2485         ENTRY;
2486         client_obd_list_lock(&cli->cl_loi_list_lock);
2487         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2488         client_obd_list_unlock(&cli->cl_loi_list_lock);
2489         RETURN(rc);
2490 };
2491
2492 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2493  * grant or cache space. */
2494 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2495                            struct osc_async_page *oap)
2496 {
2497         struct osc_cache_waiter ocw;
2498         struct l_wait_info lwi = { 0 };
2499
2500         ENTRY;
2501
2502         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2503                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2504                cli->cl_dirty_max, obd_max_dirty_pages,
2505                cli->cl_lost_grant, cli->cl_avail_grant);
2506
2507         /* force the caller to try sync io.  this can jump the list
2508          * of queued writes and create a discontiguous rpc stream */
2509         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2510             loi->loi_ar.ar_force_sync)
2511                 RETURN(-EDQUOT);
2512
2513         /* Hopefully normal case - cache space and write credits available */
2514         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2515             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2516             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2517                 /* account for ourselves */
2518                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2519                 RETURN(0);
2520         }
2521
2522         /* Make sure that there are write rpcs in flight to wait for.  This
2523          * is a little silly as this object may not have any pending but
2524          * other objects sure might. */
2525         if (cli->cl_w_in_flight) {
2526                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2527                 cfs_waitq_init(&ocw.ocw_waitq);
2528                 ocw.ocw_oap = oap;
2529                 ocw.ocw_rc = 0;
2530
2531                 loi_list_maint(cli, loi);
2532                 osc_check_rpcs(cli);
2533                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2534
2535                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2536                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2537
2538                 client_obd_list_lock(&cli->cl_loi_list_lock);
2539                 if (!list_empty(&ocw.ocw_entry)) {
2540                         list_del(&ocw.ocw_entry);
2541                         RETURN(-EINTR);
2542                 }
2543                 RETURN(ocw.ocw_rc);
2544         }
2545
2546         RETURN(-EDQUOT);
2547 }
2548
2549 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2550                         struct lov_oinfo *loi, cfs_page_t *page,
2551                         obd_off offset, struct obd_async_page_ops *ops,
2552                         void *data, void **res)
2553 {
2554         struct osc_async_page *oap;
2555         ENTRY;
2556
2557         if (!page)
2558                 return size_round(sizeof(*oap));
2559
2560         oap = *res;
2561         oap->oap_magic = OAP_MAGIC;
2562         oap->oap_cli = &exp->exp_obd->u.cli;
2563         oap->oap_loi = loi;
2564
2565         oap->oap_caller_ops = ops;
2566         oap->oap_caller_data = data;
2567
2568         oap->oap_page = page;
2569         oap->oap_obj_off = offset;
2570
2571         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2572         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2573         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2574
2575         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2576
2577         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2578         RETURN(0);
2579 }
2580
2581 struct osc_async_page *oap_from_cookie(void *cookie)
2582 {
2583         struct osc_async_page *oap = cookie;
2584         if (oap->oap_magic != OAP_MAGIC)
2585                 return ERR_PTR(-EINVAL);
2586         return oap;
2587 };
2588
2589 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2590                               struct lov_oinfo *loi, void *cookie,
2591                               int cmd, obd_off off, int count,
2592                               obd_flag brw_flags, enum async_flags async_flags)
2593 {
2594         struct client_obd *cli = &exp->exp_obd->u.cli;
2595         struct osc_async_page *oap;
2596         int rc = 0;
2597         ENTRY;
2598
2599         oap = oap_from_cookie(cookie);
2600         if (IS_ERR(oap))
2601                 RETURN(PTR_ERR(oap));
2602
2603         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2604                 RETURN(-EIO);
2605
2606         if (!list_empty(&oap->oap_pending_item) ||
2607             !list_empty(&oap->oap_urgent_item) ||
2608             !list_empty(&oap->oap_rpc_item))
2609                 RETURN(-EBUSY);
2610
2611         /* check if the file's owner/group is over quota */
2612 #ifdef HAVE_QUOTA_SUPPORT
2613         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2614                 struct obd_async_page_ops *ops;
2615                 struct obdo *oa;
2616
2617                 OBDO_ALLOC(oa);
2618                 if (oa == NULL)
2619                         RETURN(-ENOMEM);
2620
2621                 ops = oap->oap_caller_ops;
2622                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2623                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2624                     NO_QUOTA)
2625                         rc = -EDQUOT;
2626
2627                 OBDO_FREE(oa);
2628                 if (rc)
2629                         RETURN(rc);
2630         }
2631 #endif
2632
2633         if (loi == NULL)
2634                 loi = lsm->lsm_oinfo[0];
2635
2636         client_obd_list_lock(&cli->cl_loi_list_lock);
2637
2638         oap->oap_cmd = cmd;
2639         oap->oap_page_off = off;
2640         oap->oap_count = count;
2641         oap->oap_brw_flags = brw_flags;
2642         oap->oap_async_flags = async_flags;
2643
2644         if (cmd & OBD_BRW_WRITE) {
2645                 rc = osc_enter_cache(cli, loi, oap);
2646                 if (rc) {
2647                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2648                         RETURN(rc);
2649                 }
2650         }
2651
2652         osc_oap_to_pending(oap);
2653         loi_list_maint(cli, loi);
2654
2655         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2656                   cmd);
2657
2658         osc_check_rpcs(cli);
2659         client_obd_list_unlock(&cli->cl_loi_list_lock);
2660
2661         RETURN(0);
2662 }
2663
/* True when @flag is clear in @was but set in @now, i.e. the flag is being
 * newly set.  aka (~was & now & flag), but this is more clear :)
 * Every argument is parenthesised so that compound expressions such as
 * "A | B" can be passed safely. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2666
2667 static int osc_set_async_flags(struct obd_export *exp,
2668                                struct lov_stripe_md *lsm,
2669                                struct lov_oinfo *loi, void *cookie,
2670                                obd_flag async_flags)
2671 {
2672         struct client_obd *cli = &exp->exp_obd->u.cli;
2673         struct loi_oap_pages *lop;
2674         struct osc_async_page *oap;
2675         int rc = 0;
2676         ENTRY;
2677
2678         oap = oap_from_cookie(cookie);
2679         if (IS_ERR(oap))
2680                 RETURN(PTR_ERR(oap));
2681
2682         /*
2683          * bug 7311: OST-side locking is only supported for liblustre for now
2684          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2685          * implementation has to handle case where OST-locked page was picked
2686          * up by, e.g., ->writepage().
2687          */
2688         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2689         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2690                                      * tread here. */
2691
2692         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2693                 RETURN(-EIO);
2694
2695         if (loi == NULL)
2696                 loi = lsm->lsm_oinfo[0];
2697
2698         if (oap->oap_cmd & OBD_BRW_WRITE) {
2699                 lop = &loi->loi_write_lop;
2700         } else {
2701                 lop = &loi->loi_read_lop;
2702         }
2703
2704         client_obd_list_lock(&cli->cl_loi_list_lock);
2705
2706         if (list_empty(&oap->oap_pending_item))
2707                 GOTO(out, rc = -EINVAL);
2708
2709         if ((oap->oap_async_flags & async_flags) == async_flags)
2710                 GOTO(out, rc = 0);
2711
2712         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2713                 oap->oap_async_flags |= ASYNC_READY;
2714
2715         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2716                 if (list_empty(&oap->oap_rpc_item)) {
2717                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2718                         loi_list_maint(cli, loi);
2719                 }
2720         }
2721
2722         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2723                         oap->oap_async_flags);
2724 out:
2725         osc_check_rpcs(cli);
2726         client_obd_list_unlock(&cli->cl_loi_list_lock);
2727         RETURN(rc);
2728 }
2729
2730 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2731                              struct lov_oinfo *loi,
2732                              struct obd_io_group *oig, void *cookie,
2733                              int cmd, obd_off off, int count,
2734                              obd_flag brw_flags,
2735                              obd_flag async_flags)
2736 {
2737         struct client_obd *cli = &exp->exp_obd->u.cli;
2738         struct osc_async_page *oap;
2739         struct loi_oap_pages *lop;
2740         int rc = 0;
2741         ENTRY;
2742
2743         oap = oap_from_cookie(cookie);
2744         if (IS_ERR(oap))
2745                 RETURN(PTR_ERR(oap));
2746
2747         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2748                 RETURN(-EIO);
2749
2750         if (!list_empty(&oap->oap_pending_item) ||
2751             !list_empty(&oap->oap_urgent_item) ||
2752             !list_empty(&oap->oap_rpc_item))
2753                 RETURN(-EBUSY);
2754
2755         if (loi == NULL)
2756                 loi = lsm->lsm_oinfo[0];
2757
2758         client_obd_list_lock(&cli->cl_loi_list_lock);
2759
2760         oap->oap_cmd = cmd;
2761         oap->oap_page_off = off;
2762         oap->oap_count = count;
2763         oap->oap_brw_flags = brw_flags;
2764         oap->oap_async_flags = async_flags;
2765
2766         if (cmd & OBD_BRW_WRITE)
2767                 lop = &loi->loi_write_lop;
2768         else
2769                 lop = &loi->loi_read_lop;
2770
2771         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2772         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2773                 oap->oap_oig = oig;
2774                 rc = oig_add_one(oig, &oap->oap_occ);
2775         }
2776
2777         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2778                   oap, oap->oap_page, rc);
2779
2780         client_obd_list_unlock(&cli->cl_loi_list_lock);
2781
2782         RETURN(rc);
2783 }
2784
2785 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2786                                  struct loi_oap_pages *lop, int cmd)
2787 {
2788         struct list_head *pos, *tmp;
2789         struct osc_async_page *oap;
2790
2791         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2792                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2793                 list_del(&oap->oap_pending_item);
2794                 osc_oap_to_pending(oap);
2795         }
2796         loi_list_maint(cli, loi);
2797 }
2798
2799 static int osc_trigger_group_io(struct obd_export *exp,
2800                                 struct lov_stripe_md *lsm,
2801                                 struct lov_oinfo *loi,
2802                                 struct obd_io_group *oig)
2803 {
2804         struct client_obd *cli = &exp->exp_obd->u.cli;
2805         ENTRY;
2806
2807         if (loi == NULL)
2808                 loi = lsm->lsm_oinfo[0];
2809
2810         client_obd_list_lock(&cli->cl_loi_list_lock);
2811
2812         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2813         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2814
2815         osc_check_rpcs(cli);
2816         client_obd_list_unlock(&cli->cl_loi_list_lock);
2817
2818         RETURN(0);
2819 }
2820
2821 static int osc_teardown_async_page(struct obd_export *exp,
2822                                    struct lov_stripe_md *lsm,
2823                                    struct lov_oinfo *loi, void *cookie)
2824 {
2825         struct client_obd *cli = &exp->exp_obd->u.cli;
2826         struct loi_oap_pages *lop;
2827         struct osc_async_page *oap;
2828         int rc = 0;
2829         ENTRY;
2830
2831         oap = oap_from_cookie(cookie);
2832         if (IS_ERR(oap))
2833                 RETURN(PTR_ERR(oap));
2834
2835         if (loi == NULL)
2836                 loi = lsm->lsm_oinfo[0];
2837
2838         if (oap->oap_cmd & OBD_BRW_WRITE) {
2839                 lop = &loi->loi_write_lop;
2840         } else {
2841                 lop = &loi->loi_read_lop;
2842         }
2843
2844         client_obd_list_lock(&cli->cl_loi_list_lock);
2845
2846         if (!list_empty(&oap->oap_rpc_item))
2847                 GOTO(out, rc = -EBUSY);
2848
2849         osc_exit_cache(cli, oap, 0);
2850         osc_wake_cache_waiters(cli);
2851
2852         if (!list_empty(&oap->oap_urgent_item)) {
2853                 list_del_init(&oap->oap_urgent_item);
2854                 oap->oap_async_flags &= ~ASYNC_URGENT;
2855         }
2856         if (!list_empty(&oap->oap_pending_item)) {
2857                 list_del_init(&oap->oap_pending_item);
2858                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2859         }
2860         loi_list_maint(cli, loi);
2861
2862         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2863 out:
2864         client_obd_list_unlock(&cli->cl_loi_list_lock);
2865         RETURN(rc);
2866 }
2867
2868 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2869                                     int flags)
2870 {
2871         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2872
2873         if (lock == NULL) {
2874                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2875                 return;
2876         }
2877         lock_res_and_lock(lock);
2878 #if defined (__KERNEL__) && defined (__linux__)
2879         /* Liang XXX: Darwin and Winnt checking should be added */
2880         if (lock->l_ast_data && lock->l_ast_data != data) {
2881                 struct inode *new_inode = data;
2882                 struct inode *old_inode = lock->l_ast_data;
2883                 if (!(old_inode->i_state & I_FREEING))
2884                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2885                 LASSERTF(old_inode->i_state & I_FREEING,
2886                          "Found existing inode %p/%lu/%u state %lu in lock: "
2887                          "setting data to %p/%lu/%u\n", old_inode,
2888                          old_inode->i_ino, old_inode->i_generation,
2889                          old_inode->i_state,
2890                          new_inode, new_inode->i_ino, new_inode->i_generation);
2891         }
2892 #endif
2893         lock->l_ast_data = data;
2894         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2895         unlock_res_and_lock(lock);
2896         LDLM_LOCK_PUT(lock);
2897 }
2898
2899 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2900                              ldlm_iterator_t replace, void *data)
2901 {
2902         struct ldlm_res_id res_id = { .name = {0} };
2903         struct obd_device *obd = class_exp2obd(exp);
2904
2905         res_id.name[0] = lsm->lsm_object_id;
2906         res_id.name[2] = lsm->lsm_object_gr;
2907
2908         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2909         return 0;
2910 }
2911
2912 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2913                             int intent, int rc)
2914 {
2915         ENTRY;
2916
2917         if (intent) {
2918                 /* The request was created before ldlm_cli_enqueue call. */
2919                 if (rc == ELDLM_LOCK_ABORTED) {
2920                         struct ldlm_reply *rep;
2921                         rep = req_capsule_server_get(&req->rq_pill,
2922                                                      &RMF_DLM_REP);
2923
2924                         LASSERT(rep != NULL);
2925                         if (rep->lock_policy_res1)
2926                                 rc = rep->lock_policy_res1;
2927                 }
2928         }
2929
2930         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2931                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2932                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2933                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2934                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2935         }
2936
2937         /* Call the update callback. */
2938         rc = oinfo->oi_cb_up(oinfo, rc);
2939         RETURN(rc);
2940 }
2941
2942 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2943                                  struct osc_enqueue_args *aa, int rc)
2944 {
2945         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2946         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2947         struct ldlm_lock *lock;
2948
2949         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2950          * be valid. */
2951         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2952
2953         /* Complete obtaining the lock procedure. */
2954         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2955                                    aa->oa_ei->ei_mode,
2956                                    &aa->oa_oi->oi_flags,
2957                                    &lsm->lsm_oinfo[0]->loi_lvb,
2958                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2959                                    lustre_swab_ost_lvb,
2960                                    aa->oa_oi->oi_lockh, rc);
2961
2962         /* Complete osc stuff. */
2963         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2964
2965         /* Release the lock for async request. */
2966         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2967                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2968
2969         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2970                  aa->oa_oi->oi_lockh, req, aa);
2971         LDLM_LOCK_PUT(lock);
2972         return rc;
2973 }
2974
2975 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2976  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2977  * other synchronous requests, however keeping some locks and trying to obtain
2978  * others may take a considerable amount of time in a case of ost failure; and
2979  * when other sync requests do not get released lock from a client, the client
2980  * is excluded from the cluster -- such scenarios make life difficult, so
2981  * release locks just after they are obtained. */
2982 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2983                        struct ldlm_enqueue_info *einfo,
2984                        struct ptlrpc_request_set *rqset)
2985 {
2986         struct ldlm_res_id res_id = { .name = {0} };
2987         struct obd_device *obd = exp->exp_obd;
2988         struct ptlrpc_request *req = NULL;
2989         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2990         ldlm_mode_t mode;
2991         int rc;
2992         ENTRY;
2993
2994         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2995         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2996
2997         /* Filesystem lock extents are extended to page boundaries so that
2998          * dealing with the page cache is a little smoother.  */
2999         oinfo->oi_policy.l_extent.start -=
3000                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3001         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3002
3003         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3004                 goto no_match;
3005
3006         /* Next, search for already existing extent locks that will cover us */
3007         /* If we're trying to read, we also search for an existing PW lock.  The
3008          * VFS and page cache already protect us locally, so lots of readers/
3009          * writers can share a single PW lock.
3010          *
3011          * There are problems with conversion deadlocks, so instead of
3012          * converting a read lock to a write lock, we'll just enqueue a new
3013          * one.
3014          *
3015          * At some point we should cancel the read lock instead of making them
3016          * send us a blocking callback, but there are problems with canceling
3017          * locks out from other users right now, too. */
3018         mode = einfo->ei_mode;
3019         if (einfo->ei_mode == LCK_PR)
3020                 mode |= LCK_PW;
3021         mode = ldlm_lock_match(obd->obd_namespace,
3022                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3023                                einfo->ei_type, &oinfo->oi_policy, mode,
3024                                oinfo->oi_lockh);
3025         if (mode) {
3026                 /* addref the lock only if not async requests and PW lock is
3027                  * matched whereas we asked for PR. */
3028                 if (!rqset && einfo->ei_mode != mode)
3029                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3030                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3031                                         oinfo->oi_flags);
3032                 if (intent) {
3033                         /* I would like to be able to ASSERT here that rss <=
3034                          * kms, but I can't, for reasons which are explained in
3035                          * lov_enqueue() */
3036                 }
3037
3038                 /* We already have a lock, and it's referenced */
3039                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3040
3041                 /* For async requests, decref the lock. */
3042                 if (einfo->ei_mode != mode)
3043                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3044                 else if (rqset)
3045                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3046
3047                 RETURN(ELDLM_OK);
3048         }
3049
3050  no_match:
3051         if (intent) {
3052                 CFS_LIST_HEAD(cancels);
3053                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3054                                            &RQF_LDLM_ENQUEUE_LVB);
3055                 if (req == NULL)
3056                         RETURN(-ENOMEM);
3057
3058                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3059                 if (rc)
3060                         RETURN(rc);
3061
3062                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3063                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3064                 ptlrpc_request_set_replen(req);
3065         }
3066
3067         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3068         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3069
3070         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3071                               &oinfo->oi_policy, &oinfo->oi_flags,
3072                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3073                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3074                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3075                               rqset ? 1 : 0);
3076         if (rqset) {
3077                 if (!rc) {
3078                         struct osc_enqueue_args *aa;
3079                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3080                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3081                         aa->oa_oi = oinfo;
3082                         aa->oa_ei = einfo;
3083                         aa->oa_exp = exp;
3084
3085                         req->rq_interpret_reply = osc_enqueue_interpret;
3086                         ptlrpc_set_add_req(rqset, req);
3087                 } else if (intent) {
3088                         ptlrpc_req_finished(req);
3089                 }
3090                 RETURN(rc);
3091         }
3092
3093         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3094         if (intent)
3095                 ptlrpc_req_finished(req);
3096
3097         RETURN(rc);
3098 }
3099
3100 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3101                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3102                      int *flags, void *data, struct lustre_handle *lockh)
3103 {
3104         struct ldlm_res_id res_id = { .name = {0} };
3105         struct obd_device *obd = exp->exp_obd;
3106         int lflags = *flags;
3107         ldlm_mode_t rc;
3108         ENTRY;
3109
3110         res_id.name[0] = lsm->lsm_object_id;
3111         res_id.name[2] = lsm->lsm_object_gr;
3112
3113         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3114                 RETURN(-EIO);
3115
3116         /* Filesystem lock extents are extended to page boundaries so that
3117          * dealing with the page cache is a little smoother */
3118         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3119         policy->l_extent.end |= ~CFS_PAGE_MASK;
3120
3121         /* Next, search for already existing extent locks that will cover us */
3122         /* If we're trying to read, we also search for an existing PW lock.  The
3123          * VFS and page cache already protect us locally, so lots of readers/
3124          * writers can share a single PW lock. */
3125         rc = mode;
3126         if (mode == LCK_PR)
3127                 rc |= LCK_PW;
3128         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3129                              &res_id, type, policy, rc, lockh);
3130         if (rc) {
3131                 osc_set_data_with_check(lockh, data, lflags);
3132                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3133                         ldlm_lock_addref(lockh, LCK_PR);
3134                         ldlm_lock_decref(lockh, LCK_PW);
3135                 }
3136                 RETURN(rc);
3137         }
3138         RETURN(rc);
3139 }
3140
3141 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3142                       __u32 mode, struct lustre_handle *lockh)
3143 {
3144         ENTRY;
3145
3146         if (unlikely(mode == LCK_GROUP))
3147                 ldlm_lock_decref_and_cancel(lockh, mode);
3148         else
3149                 ldlm_lock_decref(lockh, mode);
3150
3151         RETURN(0);
3152 }
3153
3154 static int osc_cancel_unused(struct obd_export *exp,
3155                              struct lov_stripe_md *lsm, int flags,
3156                              void *opaque)
3157 {
3158         struct obd_device *obd = class_exp2obd(exp);
3159         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3160
3161         if (lsm != NULL) {
3162                 res_id.name[0] = lsm->lsm_object_id;
3163                 res_id.name[2] = lsm->lsm_object_gr;
3164                 resp = &res_id;
3165         }
3166
3167         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3168 }
3169
3170 static int osc_join_lru(struct obd_export *exp,
3171                         struct lov_stripe_md *lsm, int join)
3172 {
3173         struct obd_device *obd = class_exp2obd(exp);
3174         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3175
3176         if (lsm != NULL) {
3177                 res_id.name[0] = lsm->lsm_object_id;
3178                 res_id.name[2] = lsm->lsm_object_gr;
3179                 resp = &res_id;
3180         }
3181
3182         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3183 }
3184
3185 static int osc_statfs_interpret(struct ptlrpc_request *req,
3186                          &nbs