Whamcloud - gitweb
d08e62103a8929d6562b78216aae99346dffc346
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 /* by default 10s */
67 atomic_t osc_resend_time; 
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 LASSERT(lsm->lsm_object_gr);
95                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
96                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
97         }
98
99         RETURN(lmm_size);
100 }
101
102 /* Unpack OSC object metadata from disk storage (LE byte order). */
103 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
104                         struct lov_mds_md *lmm, int lmm_bytes)
105 {
106         int lsm_size;
107         ENTRY;
108
109         if (lmm != NULL) {
110                 if (lmm_bytes < sizeof (*lmm)) {
111                         CERROR("lov_mds_md too small: %d, need %d\n",
112                                lmm_bytes, (int)sizeof(*lmm));
113                         RETURN(-EINVAL);
114                 }
115                 /* XXX LOV_MAGIC etc check? */
116
117                 if (lmm->lmm_object_id == 0) {
118                         CERROR("lov_mds_md: zero lmm_object_id\n");
119                         RETURN(-EINVAL);
120                 }
121         }
122
123         lsm_size = lov_stripe_md_size(1);
124         if (lsmp == NULL)
125                 RETURN(lsm_size);
126
127         if (*lsmp != NULL && lmm == NULL) {
128                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
129                 OBD_FREE(*lsmp, lsm_size);
130                 *lsmp = NULL;
131                 RETURN(0);
132         }
133
134         if (*lsmp == NULL) {
135                 OBD_ALLOC(*lsmp, lsm_size);
136                 if (*lsmp == NULL)
137                         RETURN(-ENOMEM);
138                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
139                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
140                         OBD_FREE(*lsmp, lsm_size);
141                         RETURN(-ENOMEM);
142                 }
143                 loi_init((*lsmp)->lsm_oinfo[0]);
144         }
145
146         if (lmm != NULL) {
147                 /* XXX zero *lsmp? */
148                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
149                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
150                 LASSERT((*lsmp)->lsm_object_id);
151                 LASSERT((*lsmp)->lsm_object_gr);
152         }
153
154         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
155
156         RETURN(lsm_size);
157 }
158
159 static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
160                                  struct ost_body *body, void *capa)
161 {
162         struct obd_capa *oc = (struct obd_capa *)capa;
163         struct lustre_capa *c;
164
165         if (!capa)
166                 return;
167
168         c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
169         LASSERT(c);
170         capa_cpy(c, oc);
171         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
172         DEBUG_CAPA(D_SEC, c, "pack");
173 }
174
175 static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
176                                      struct obd_info *oinfo)
177 {
178         struct ost_body *body;
179
180         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
181         body->oa = *oinfo->oi_oa;
182         osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
183 }
184
185 static int osc_getattr_interpret(struct ptlrpc_request *req,
186                                  struct osc_async_args *aa, int rc)
187 {
188         struct ost_body *body;
189         ENTRY;
190
191         if (rc != 0)
192                 GOTO(out, rc);
193
194         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
195                                   lustre_swab_ost_body);
196         if (body) {
197                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
198                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
199
200                 /* This should really be sent by the OST */
201                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
202                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
203         } else {
204                 CERROR("can't unpack ost_body\n");
205                 rc = -EPROTO;
206                 aa->aa_oi->oi_oa->o_valid = 0;
207         }
208 out:
209         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
210         RETURN(rc);
211 }
212
213 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
214                              struct ptlrpc_request_set *set)
215 {
216         struct ptlrpc_request *req;
217         struct ost_body *body;
218         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
219         struct osc_async_args *aa;
220         ENTRY;
221
222         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
223         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
224                               OST_GETATTR, 3, size,NULL);
225         if (!req)
226                 RETURN(-ENOMEM);
227
228         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
229
230         ptlrpc_req_set_repsize(req, 2, size);
231         req->rq_interpret_reply = osc_getattr_interpret;
232
233         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
234         aa = (struct osc_async_args *)&req->rq_async_args;
235         aa->aa_oi = oinfo;
236
237         ptlrpc_set_add_req(set, req);
238         RETURN (0);
239 }
240
241 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
242 {
243         struct ptlrpc_request *req;
244         struct ost_body *body;
245         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
246         ENTRY;
247
248         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
249         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
250                               OST_GETATTR, 3, size, NULL);
251         if (!req)
252                 RETURN(-ENOMEM);
253
254         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
255
256         ptlrpc_req_set_repsize(req, 2, size);
257
258         rc = ptlrpc_queue_wait(req);
259         if (rc) {
260                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
261                 GOTO(out, rc);
262         }
263
264         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
265                                   lustre_swab_ost_body);
266         if (body == NULL) {
267                 CERROR ("can't unpack ost_body\n");
268                 GOTO (out, rc = -EPROTO);
269         }
270
271         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
272         *oinfo->oi_oa = body->oa;
273
274         /* This should really be sent by the OST */
275         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
276         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
277
278         EXIT;
279  out:
280         ptlrpc_req_finished(req);
281         return rc;
282 }
283
284 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
285                        struct obd_trans_info *oti)
286 {
287         struct ptlrpc_request *req;
288         struct ost_body *body;
289         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
290         ENTRY;
291
292         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
293                                         oinfo->oi_oa->o_gr > 0);
294         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
295         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
296                               OST_SETATTR, 3, size, NULL);
297         if (!req)
298                 RETURN(-ENOMEM);
299
300         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
301
302         ptlrpc_req_set_repsize(req, 2, size);
303
304         rc = ptlrpc_queue_wait(req);
305         if (rc)
306                 GOTO(out, rc);
307
308         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
309                                   lustre_swab_ost_body);
310         if (body == NULL)
311                 GOTO(out, rc = -EPROTO);
312
313         *oinfo->oi_oa = body->oa;
314
315         EXIT;
316 out:
317         ptlrpc_req_finished(req);
318         RETURN(rc);
319 }
320
321 static int osc_setattr_interpret(struct ptlrpc_request *req,
322                                  struct osc_async_args *aa, int rc)
323 {
324         struct ost_body *body;
325         ENTRY;
326
327         if (rc != 0)
328                 GOTO(out, rc);
329
330         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
331                                   lustre_swab_ost_body);
332         if (body == NULL) {
333                 CERROR("can't unpack ost_body\n");
334                 GOTO(out, rc = -EPROTO);
335         }
336
337         *aa->aa_oi->oi_oa = body->oa;
338 out:
339         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
340         RETURN(rc);
341 }
342
343 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
344                              struct obd_trans_info *oti,
345                              struct ptlrpc_request_set *rqset)
346 {
347         struct ptlrpc_request *req;
348         int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
349         struct osc_async_args *aa;
350         ENTRY;
351
352         size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
353         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
354                               OST_SETATTR, 3, size, NULL);
355         if (!req)
356                 RETURN(-ENOMEM);
357
358         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
359         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
360                 LASSERT(oti);
361                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
362         }
363
364         ptlrpc_req_set_repsize(req, 2, size);
365         /* do mds to ost setattr asynchronouly */
366         if (!rqset) {
367                 /* Do not wait for response. */
368                 ptlrpcd_add_req(req);
369         } else {
370                 req->rq_interpret_reply = osc_setattr_interpret;
371
372                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
373                 aa = (struct osc_async_args *)&req->rq_async_args;
374                 aa->aa_oi = oinfo;
375
376                 ptlrpc_set_add_req(rqset, req);
377         }
378
379         RETURN(0);
380 }
381
382 int osc_real_create(struct obd_export *exp, struct obdo *oa,
383                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
384 {
385         struct ptlrpc_request *req;
386         struct ost_body *body;
387         struct lov_stripe_md *lsm;
388         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
389         ENTRY;
390
391         LASSERT(oa);
392         LASSERT(ea);
393
394         lsm = *ea;
395         if (!lsm) {
396                 rc = obd_alloc_memmd(exp, &lsm);
397                 if (rc < 0)
398                         RETURN(rc);
399         }
400
401         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
402                               OST_CREATE, 2, size, NULL);
403         if (!req)
404                 GOTO(out, rc = -ENOMEM);
405
406         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
407         body->oa = *oa;
408
409         ptlrpc_req_set_repsize(req, 2, size);
410         if (oa->o_valid & OBD_MD_FLINLINE) {
411                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
412                         oa->o_flags == OBD_FL_DELORPHAN);
413                 DEBUG_REQ(D_HA, req,
414                           "delorphan from OST integration");
415                 /* Don't resend the delorphan req */
416                 req->rq_no_resend = req->rq_no_delay = 1;
417         }
418
419         rc = ptlrpc_queue_wait(req);
420         if (rc)
421                 GOTO(out_req, rc);
422
423         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
424                                   lustre_swab_ost_body);
425         if (body == NULL) {
426                 CERROR ("can't unpack ost_body\n");
427                 GOTO (out_req, rc = -EPROTO);
428         }
429
430         *oa = body->oa;
431
432         /* This should really be sent by the OST */
433         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
434         oa->o_valid |= OBD_MD_FLBLKSZ;
435
436         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
437          * have valid lsm_oinfo data structs, so don't go touching that.
438          * This needs to be fixed in a big way.
439          */
440         lsm->lsm_object_id = oa->o_id;
441         lsm->lsm_object_gr = oa->o_gr;
442         *ea = lsm;
443
444         if (oti != NULL) {
445                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
446
447                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
448                         if (!oti->oti_logcookies)
449                                 oti_alloc_cookies(oti, 1);
450                         *oti->oti_logcookies = *obdo_logcookie(oa);
451                 }
452         }
453
454         CDEBUG(D_HA, "transno: "LPD64"\n",
455                lustre_msg_get_transno(req->rq_repmsg));
456         EXIT;
457 out_req:
458         ptlrpc_req_finished(req);
459 out:
460         if (rc && !*ea)
461                 obd_free_memmd(exp, &lsm);
462         return rc;
463 }
464
465 static int osc_punch_interpret(struct ptlrpc_request *req,
466                                struct osc_async_args *aa, int rc)
467 {
468         struct ost_body *body;
469         ENTRY;
470
471         if (rc != 0)
472                 GOTO(out, rc);
473
474         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
475                                   lustre_swab_ost_body);
476         if (body == NULL) {
477                 CERROR ("can't unpack ost_body\n");
478                 GOTO(out, rc = -EPROTO);
479         }
480
481         *aa->aa_oi->oi_oa = body->oa;
482 out:
483         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
484         RETURN(rc);
485 }
486
487 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
488                      struct obd_trans_info *oti,
489                      struct ptlrpc_request_set *rqset)
490 {
491         struct ptlrpc_request *req;
492         struct osc_async_args *aa;
493         struct ost_body *body;
494         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
495         ENTRY;
496
497         if (!oinfo->oi_oa) {
498                 CERROR("oa NULL\n");
499                 RETURN(-EINVAL);
500         }
501
502         size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
503         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
504                               OST_PUNCH, 3, size, NULL);
505         if (!req)
506                 RETURN(-ENOMEM);
507
508         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
509
510         osc_pack_req_body(req, REQ_REC_OFF, oinfo);
511         /* overload the size and blocks fields in the oa with start/end */
512         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
513         body->oa.o_size = oinfo->oi_policy.l_extent.start;
514         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
515         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
516
517         ptlrpc_req_set_repsize(req, 2, size);
518
519         req->rq_interpret_reply = osc_punch_interpret;
520         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
521         aa = (struct osc_async_args *)&req->rq_async_args;
522         aa->aa_oi = oinfo;
523         ptlrpc_set_add_req(rqset, req);
524
525         RETURN(0);
526 }
527
528 static int osc_sync(struct obd_export *exp, struct obdo *oa,
529                     struct lov_stripe_md *md, obd_size start, obd_size end,
530                     void *capa)
531 {
532         struct ptlrpc_request *req;
533         struct ost_body *body;
534         int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
535         ENTRY;
536
537         if (!oa) {
538                 CERROR("oa NULL\n");
539                 RETURN(-EINVAL);
540         }
541
542         size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
543
544         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
545                               OST_SYNC, 3, size, NULL);
546         if (!req)
547                 RETURN(-ENOMEM);
548
549         /* overload the size and blocks fields in the oa with start/end */
550         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
551         body->oa = *oa;
552         body->oa.o_size = start;
553         body->oa.o_blocks = end;
554         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
555
556         osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
557
558         ptlrpc_req_set_repsize(req, 2, size);
559
560         rc = ptlrpc_queue_wait(req);
561         if (rc)
562                 GOTO(out, rc);
563
564         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
565                                   lustre_swab_ost_body);
566         if (body == NULL) {
567                 CERROR ("can't unpack ost_body\n");
568                 GOTO (out, rc = -EPROTO);
569         }
570
571         *oa = body->oa;
572
573         EXIT;
574  out:
575         ptlrpc_req_finished(req);
576         return rc;
577 }
578
579 /* Find and cancel locally locks matched by @mode in the resource found by
580  * @objid. Found locks are added into @cancel list. Returns the amount of
581  * locks added to @cancels list. */
582 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
583                                    struct list_head *cancels, ldlm_mode_t mode,
584                                    int lock_flags)
585 {
586         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
587         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
588         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
589         int count;
590         ENTRY;
591
592         if (res == NULL)
593                 RETURN(0);
594
595         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
596                                            lock_flags, 0, NULL);
597         ldlm_resource_putref(res);
598         RETURN(count);
599 }
600
601 /* Destroy requests can be async always on the client, and we don't even really
602  * care about the return code since the client cannot do anything at all about
603  * a destroy failure.
604  * When the MDS is unlinking a filename, it saves the file objects into a
605  * recovery llog, and these object records are cancelled when the OST reports
606  * they were destroyed and sync'd to disk (i.e. transaction committed).
607  * If the client dies, or the OST is down when the object should be destroyed,
608  * the records are not cancelled, and when the OST reconnects to the MDS next,
609  * it will retrieve the llog unlink logs and then sends the log cancellation
610  * cookies to the MDS after committing destroy transactions. */
611 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
612                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
613                        struct obd_export *md_export)
614 {
615         CFS_LIST_HEAD(cancels);
616         struct ptlrpc_request *req;
617         struct ost_body *body;
618         int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
619         int count, bufcount = 2;
620         ENTRY;
621
622         if (!oa) {
623                 CERROR("oa NULL\n");
624                 RETURN(-EINVAL);
625         }
626
627         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
628                                         LDLM_FL_DISCARD_DATA);
629         if (exp_connect_cancelset(exp) && count) {
630                 bufcount = 3;
631                 size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
632         }
633         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
634                               OST_DESTROY, bufcount, size, NULL);
635         if (exp_connect_cancelset(exp) && req)
636                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
637         else
638                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
639
640         if (!req)
641                 RETURN(-ENOMEM);
642
643         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
644
645         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
646         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
647                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
648                        sizeof(*oti->oti_logcookies));
649         body->oa = *oa;
650
651         ptlrpc_req_set_repsize(req, 2, size);
652
653         ptlrpcd_add_req(req);
654         RETURN(0);
655 }
656
657 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
658                                 long writing_bytes)
659 {
660         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
661
662         LASSERT(!(oa->o_valid & bits));
663
664         oa->o_valid |= bits;
665         client_obd_list_lock(&cli->cl_loi_list_lock);
666         oa->o_dirty = cli->cl_dirty;
667         if (cli->cl_dirty > cli->cl_dirty_max) {
668                 CERROR("dirty %lu > dirty_max %lu\n",
669                        cli->cl_dirty, cli->cl_dirty_max);
670                 oa->o_undirty = 0;
671         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
672                 CERROR("dirty %d > system dirty_max %d\n",
673                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
674                 oa->o_undirty = 0;
675         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
676                 CERROR("dirty %lu - dirty_max %lu too big???\n",
677                        cli->cl_dirty, cli->cl_dirty_max);
678                 oa->o_undirty = 0;
679         } else {
680                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
681                                 (cli->cl_max_rpcs_in_flight + 1);
682                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
683         }
684         oa->o_grant = cli->cl_avail_grant;
685         oa->o_dropped = cli->cl_lost_grant;
686         cli->cl_lost_grant = 0;
687         client_obd_list_unlock(&cli->cl_loi_list_lock);
688         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
689                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
690 }
691
692 /* caller must hold loi_list_lock */
693 static void osc_consume_write_grant(struct client_obd *cli,
694                                     struct brw_page *pga)
695 {
696         atomic_inc(&obd_dirty_pages);
697         cli->cl_dirty += CFS_PAGE_SIZE;
698         cli->cl_avail_grant -= CFS_PAGE_SIZE;
699         pga->flag |= OBD_BRW_FROM_GRANT;
700         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
701                CFS_PAGE_SIZE, pga, pga->pg);
702         LASSERT(cli->cl_avail_grant >= 0);
703 }
704
705 /* the companion to osc_consume_write_grant, called when a brw has completed.
706  * must be called with the loi lock held. */
707 static void osc_release_write_grant(struct client_obd *cli,
708                                     struct brw_page *pga, int sent)
709 {
710         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
711         ENTRY;
712
713         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
714                 EXIT;
715                 return;
716         }
717
718         pga->flag &= ~OBD_BRW_FROM_GRANT;
719         atomic_dec(&obd_dirty_pages);
720         cli->cl_dirty -= CFS_PAGE_SIZE;
721         if (!sent) {
722                 cli->cl_lost_grant += CFS_PAGE_SIZE;
723                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
724                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
725         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
726                 /* For short writes we shouldn't count parts of pages that
727                  * span a whole block on the OST side, or our accounting goes
728                  * wrong.  Should match the code in filter_grant_check. */
729                 int offset = pga->off & ~CFS_PAGE_MASK;
730                 int count = pga->count + (offset & (blocksize - 1));
731                 int end = (offset + pga->count) & (blocksize - 1);
732                 if (end)
733                         count += blocksize - end;
734
735                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
736                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
737                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
738                        cli->cl_avail_grant, cli->cl_dirty);
739         }
740
741         EXIT;
742 }
743
744 static unsigned long rpcs_in_flight(struct client_obd *cli)
745 {
746         return cli->cl_r_in_flight + cli->cl_w_in_flight;
747 }
748
749 /* caller must hold loi_list_lock */
750 void osc_wake_cache_waiters(struct client_obd *cli)
751 {
752         struct list_head *l, *tmp;
753         struct osc_cache_waiter *ocw;
754
755         ENTRY;
756         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
757                 /* if we can't dirty more, we must wait until some is written */
758                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
759                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
760                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
761                                "osc max %ld, sys max %d\n", cli->cl_dirty,
762                                cli->cl_dirty_max, obd_max_dirty_pages);
763                         return;
764                 }
765
766                 /* if still dirty cache but no grant wait for pending RPCs that
767                  * may yet return us some grant before doing sync writes */
768                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
769                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
770                                cli->cl_w_in_flight);
771                         return;
772                 }
773
774                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
775                 list_del_init(&ocw->ocw_entry);
776                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
777                         /* no more RPCs in flight to return grant, do sync IO */
778                         ocw->ocw_rc = -EDQUOT;
779                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
780                 } else {
781                         osc_consume_write_grant(cli,
782                                                 &ocw->ocw_oap->oap_brw_page);
783                 }
784
785                 cfs_waitq_signal(&ocw->ocw_waitq);
786         }
787
788         EXIT;
789 }
790
791 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
792 {
793         client_obd_list_lock(&cli->cl_loi_list_lock);
794         cli->cl_avail_grant = ocd->ocd_grant;
795         client_obd_list_unlock(&cli->cl_loi_list_lock);
796
797         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
798                cli->cl_avail_grant, cli->cl_lost_grant);
799         LASSERT(cli->cl_avail_grant >= 0);
800 }
801
802 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
803 {
804         client_obd_list_lock(&cli->cl_loi_list_lock);
805         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
806         if (body->oa.o_valid & OBD_MD_FLGRANT)
807                 cli->cl_avail_grant += body->oa.o_grant;
808         /* waiters are woken in brw_interpret_oap */
809         client_obd_list_unlock(&cli->cl_loi_list_lock);
810 }
811
812 /* We assume that the reason this OSC got a short read is because it read
813  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
814  * via the LOV, and it _knows_ it's reading inside the file, it's just that
815  * this stripe never got written at or beyond this stripe offset yet. */
816 static void handle_short_read(int nob_read, obd_count page_count,
817                               struct brw_page **pga)
818 {
819         char *ptr;
820         int i = 0;
821
822         /* skip bytes read OK */
823         while (nob_read > 0) {
824                 LASSERT (page_count > 0);
825
826                 if (pga[i]->count > nob_read) {
827                         /* EOF inside this page */
828                         ptr = cfs_kmap(pga[i]->pg) +
829                                 (pga[i]->off & ~CFS_PAGE_MASK);
830                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
831                         cfs_kunmap(pga[i]->pg);
832                         page_count--;
833                         i++;
834                         break;
835                 }
836
837                 nob_read -= pga[i]->count;
838                 page_count--;
839                 i++;
840         }
841
842         /* zero remaining pages */
843         while (page_count-- > 0) {
844                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
845                 memset(ptr, 0, pga[i]->count);
846                 cfs_kunmap(pga[i]->pg);
847                 i++;
848         }
849 }
850
851 static int check_write_rcs(struct ptlrpc_request *req,
852                            int requested_nob, int niocount,
853                            obd_count page_count, struct brw_page **pga)
854 {
855         int    *remote_rcs, i;
856
857         /* return error if any niobuf was in error */
858         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
859                                         sizeof(*remote_rcs) * niocount, NULL);
860         if (remote_rcs == NULL) {
861                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
862                 return(-EPROTO);
863         }
864         if (lustre_msg_swabbed(req->rq_repmsg))
865                 for (i = 0; i < niocount; i++)
866                         __swab32s(&remote_rcs[i]);
867
868         for (i = 0; i < niocount; i++) {
869                 if (remote_rcs[i] < 0)
870                         return(remote_rcs[i]);
871
872                 if (remote_rcs[i] != 0) {
873                         CERROR("rc[%d] invalid (%d) req %p\n",
874                                 i, remote_rcs[i], req);
875                         return(-EPROTO);
876                 }
877         }
878
879         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
880                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
881                        requested_nob, req->rq_bulk->bd_nob_transferred);
882                 return(-EPROTO);
883         }
884
885         return (0);
886 }
887
888 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
889 {
890         if (p1->flag != p2->flag) {
891                 unsigned mask = ~OBD_BRW_FROM_GRANT;
892
893                 /* warn if we try to combine flags that we don't know to be
894                  * safe to combine */
895                 if ((p1->flag & mask) != (p2->flag & mask))
896                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
897                                "same brw?\n", p1->flag, p2->flag);
898                 return 0;
899         }
900
901         return (p1->off + p1->count == p2->off);
902 }
903
904 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
905                                    struct brw_page **pga)
906 {
907         __u32 cksum = ~0;
908         int i = 0;
909
910         LASSERT (pg_count > 0);
911         while (nob > 0 && pg_count > 0) {
912                 char *ptr = cfs_kmap(pga[i]->pg);
913                 int off = pga[i]->off & ~CFS_PAGE_MASK;
914                 int count = pga[i]->count > nob ? nob : pga[i]->count;
915
916                 /* corrupt the data before we compute the checksum, to
917                  * simulate an OST->client data error */
918                 if (i == 0 &&
919                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
920                         memcpy(ptr + off, "bad1", min(4, nob));
921                 cksum = crc32_le(cksum, ptr + off, count);
922                 cfs_kunmap(pga[i]->pg);
923                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
924                                off, cksum);
925
926                 nob -= pga[i]->count;
927                 pg_count--;
928                 i++;
929         }
930         /* For sending we only compute the wrong checksum instead
931          * of corrupting the data so it is still correct on a redo */
932         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
933                 cksum++;
934
935         return cksum;
936 }
937
938 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
939                                 struct lov_stripe_md *lsm, obd_count page_count,
940                                 struct brw_page **pga, 
941                                 struct ptlrpc_request **reqp,
942                                 struct obd_capa *ocapa)
943 {
944         struct ptlrpc_request   *req;
945         struct ptlrpc_bulk_desc *desc;
946         struct ost_body         *body;
947         struct obd_ioobj        *ioobj;
948         struct niobuf_remote    *niobuf;
949         int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
950         int niocount, i, requested_nob, opc, rc;
951         struct ptlrpc_request_pool *pool;
952         struct lustre_capa      *capa;
953         struct osc_brw_async_args *aa;
954
955         ENTRY;
956         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
957         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
958
959         if ((cmd & OBD_BRW_WRITE) != 0) {
960                 opc = OST_WRITE;
961                 pool = cli->cl_import->imp_rq_pool;
962         } else {
963                 opc = OST_READ;
964                 pool = NULL;
965         }
966
967         for (niocount = i = 1; i < page_count; i++) {
968                 if (!can_merge_pages(pga[i - 1], pga[i]))
969                         niocount++;
970         }
971
972         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
973         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
974         if (ocapa)
975                 size[REQ_REC_OFF + 3] = sizeof(*capa);
976
977         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
978                                    size, NULL, pool, NULL);
979         if (req == NULL)
980                 RETURN (-ENOMEM);
981
982         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
983
984         if (opc == OST_WRITE)
985                 desc = ptlrpc_prep_bulk_imp (req, page_count,
986                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
987         else
988                 desc = ptlrpc_prep_bulk_imp (req, page_count,
989                                              BULK_PUT_SINK, OST_BULK_PORTAL);
990         if (desc == NULL)
991                 GOTO(out, rc = -ENOMEM);
992         /* NB request now owns desc and will free it when it gets freed */
993
994         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
995         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
996         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
997                                 niocount * sizeof(*niobuf));
998
999         body->oa = *oa;
1000
1001         obdo_to_ioobj(oa, ioobj);
1002         ioobj->ioo_bufcnt = niocount;
1003         if (ocapa) {
1004                 capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
1005                                       sizeof(*capa));
1006                 capa_cpy(capa, ocapa);
1007                 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
1008         }
1009
1010         LASSERT (page_count > 0);
1011         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1012                 struct brw_page *pg = pga[i];
1013                 struct brw_page *pg_prev = pga[i - 1];
1014
1015                 LASSERT(pg->count > 0);
1016                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1017                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1018                          pg->off, pg->count);
1019 #ifdef __LINUX__
1020                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1021                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1022                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1023                          i, page_count,
1024                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1025                          pg_prev->pg, page_private(pg_prev->pg),
1026                          pg_prev->pg->index, pg_prev->off);
1027 #else
1028                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1029                          "i %d p_c %u\n", i, page_count);
1030 #endif
1031                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1032                         (pg->flag & OBD_BRW_SRVLOCK));
1033
1034                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1035                                       pg->count);
1036                 requested_nob += pg->count;
1037
1038                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1039                         niobuf--;
1040                         niobuf->len += pg->count;
1041                 } else {
1042                         niobuf->offset = pg->off;
1043                         niobuf->len    = pg->count;
1044                         niobuf->flags  = pg->flag;
1045                 }
1046         }
1047
1048         LASSERT((void *)(niobuf - niocount) ==
1049                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1050                                niocount * sizeof(*niobuf)));
1051         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1052
1053         /* size[REQ_REC_OFF] still sizeof (*body) */
1054         if (opc == OST_WRITE) {
1055                 if (unlikely(cli->cl_checksum)) {
1056                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1057                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1058                                                              page_count, pga);
1059                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1060                                body->oa.o_cksum);
1061                         /* save this in 'oa', too, for later checking */
1062                         oa->o_valid |= OBD_MD_FLCKSUM;
1063                 } else {
1064                         /* clear out the checksum flag, in case this is a
1065                          * resend but cl_checksum is no longer set. b=11238 */
1066                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1067                 }
1068                 oa->o_cksum = body->oa.o_cksum;
1069                 /* 1 RC per niobuf */
1070                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1071                 ptlrpc_req_set_repsize(req, 3, size);
1072         } else {
1073                 if (unlikely(cli->cl_checksum))
1074                         body->oa.o_valid |= OBD_MD_FLCKSUM;
1075                 /* 1 RC for the whole I/O */
1076                 ptlrpc_req_set_repsize(req, 2, size);
1077         }
1078
1079         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1080         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1081         aa->aa_oa = oa;
1082         aa->aa_requested_nob = requested_nob;
1083         aa->aa_nio_count = niocount;
1084         aa->aa_page_count = page_count;
1085         aa->aa_resends = 0;
1086         aa->aa_ppga = pga;
1087         aa->aa_cli = cli;
1088         INIT_LIST_HEAD(&aa->aa_oaps);
1089
1090         *reqp = req;
1091         RETURN (0);
1092
1093  out:
1094         ptlrpc_req_finished (req);
1095         RETURN (rc);
1096 }
1097
1098 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1099                                 __u32 client_cksum, __u32 server_cksum,
1100                                 int nob, obd_count page_count,
1101                                 struct brw_page **pga)
1102 {
1103         __u32 new_cksum;
1104         char *msg;
1105
1106         if (server_cksum == client_cksum) {
1107                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1108                 return 0;
1109         }
1110
1111         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1112
1113         if (new_cksum == server_cksum)
1114                 msg = "changed on the client after we checksummed it - "
1115                       "likely false positive due to mmap IO (bug 11742)";
1116         else if (new_cksum == client_cksum)
1117                 msg = "changed in transit before arrival at OST";
1118         else
1119                 msg = "changed in transit AND doesn't match the original - "
1120                       "likely false positive due to mmap IO (bug 11742)";
1121
1122         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1123                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1124                            "["LPU64"-"LPU64"]\n",
1125                            msg, libcfs_nid2str(peer->nid),
1126                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1127                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
1128                                                         (__u64)0,
1129                            oa->o_id,
1130                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1131                            pga[0]->off,
1132                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1133         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1134                client_cksum, server_cksum, new_cksum);
1135         return 1;        
1136 }
1137
1138 /* Note rc enters this function as number of bytes transferred */
1139 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1140 {
1141         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1142         const lnet_process_id_t *peer =
1143                         &req->rq_import->imp_connection->c_peer;
1144         struct client_obd *cli = aa->aa_cli;
1145         struct ost_body *body;
1146         __u32 client_cksum = 0;
1147         ENTRY;
1148
1149         if (rc < 0 && rc != -EDQUOT)
1150                 RETURN(rc);
1151
1152         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1153         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1154                                   lustre_swab_ost_body);
1155         if (body == NULL) {
1156                 CERROR ("Can't unpack body\n");
1157                 RETURN(-EPROTO);
1158         }
1159
1160         /* set/clear over quota flag for a uid/gid */
1161         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1162             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1163                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1164                              body->oa.o_gid, body->oa.o_valid,
1165                              body->oa.o_flags);
1166
1167         if (rc < 0)
1168                 RETURN(rc);
1169
1170         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1171                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1172
1173         osc_update_grant(cli, body);
1174
1175         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1176                 if (rc > 0) {
1177                         CERROR ("Unexpected +ve rc %d\n", rc);
1178                         RETURN(-EPROTO);
1179                 }
1180                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1181
1182                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1183                              client_cksum &&
1184                              check_write_checksum(&body->oa, peer, client_cksum,
1185                                                   body->oa.o_cksum,
1186                                                   aa->aa_requested_nob,
1187                                                   aa->aa_page_count,
1188                                                   aa->aa_ppga)))
1189                         RETURN(-EAGAIN);
1190
1191                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1192                         RETURN(-EAGAIN);
1193
1194                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1195                                      aa->aa_page_count, aa->aa_ppga);
1196                 GOTO(out, rc);
1197         }
1198
1199         /* The rest of this function executes only for OST_READs */
1200         if (rc > aa->aa_requested_nob) {
1201                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1202                        aa->aa_requested_nob);
1203                 RETURN(-EPROTO);
1204         }
1205
1206         if (rc != req->rq_bulk->bd_nob_transferred) {
1207                 CERROR ("Unexpected rc %d (%d transferred)\n",
1208                         rc, req->rq_bulk->bd_nob_transferred);
1209                 return (-EPROTO);
1210         }
1211
1212         if (rc < aa->aa_requested_nob)
1213                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1214
1215         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1216                                          aa->aa_ppga))
1217                 GOTO(out, rc = -EAGAIN);
1218
1219         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1220                 static int cksum_counter;
1221                 __u32      server_cksum = body->oa.o_cksum;
1222                 char      *via;
1223                 char      *router;
1224
1225                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1226                                                  aa->aa_ppga);
1227
1228                 if (peer->nid == req->rq_bulk->bd_sender) {
1229                         via = router = "";
1230                 } else {
1231                         via = " via ";
1232                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1233                 }
1234
1235                 if (server_cksum == ~0 && rc > 0) {
1236                         CERROR("Protocol error: server %s set the 'checksum' "
1237                                "bit, but didn't send a checksum.  Not fatal, "
1238                                "but please tell CFS.\n",
1239                                libcfs_nid2str(peer->nid));
1240                 } else if (server_cksum != client_cksum) {
1241                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1242                                            "%s%s%s inum "LPU64"/"LPU64" object "
1243                                            LPU64"/"LPU64" extent "
1244                                            "["LPU64"-"LPU64"]\n",
1245                                            req->rq_import->imp_obd->obd_name,
1246                                            libcfs_nid2str(peer->nid),
1247                                            via, router,
1248                                            body->oa.o_valid & OBD_MD_FLFID ?
1249                                                 body->oa.o_fid : (__u64)0,
1250                                            body->oa.o_valid & OBD_MD_FLFID ?
1251                                                 body->oa.o_generation :(__u64)0,
1252                                            body->oa.o_id,
1253                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1254                                                 body->oa.o_gr : (__u64)0,
1255                                            aa->aa_ppga[0]->off,
1256                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1257                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1258                                                                         1);
1259                         CERROR("client %x, server %x\n",
1260                                client_cksum, server_cksum);
1261                         cksum_counter = 0;
1262                         aa->aa_oa->o_cksum = client_cksum;
1263                         rc = -EAGAIN;
1264                 } else {
1265                         cksum_counter++;
1266                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1267                         rc = 0;
1268                 }
1269         } else if (unlikely(client_cksum)) {
1270                 static int cksum_missed;
1271
1272                 cksum_missed++;
1273                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1274                         CERROR("Checksum %u requested from %s but not sent\n",
1275                                cksum_missed, libcfs_nid2str(peer->nid));
1276         } else {
1277                 rc = 0;
1278         }
1279 out:
1280         if (rc >= 0)
1281                 *aa->aa_oa = body->oa;
1282
1283         RETURN(rc);
1284 }
1285
1286 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1287                             struct lov_stripe_md *lsm,
1288                             obd_count page_count, struct brw_page **pga,
1289                             struct obd_capa *ocapa)
1290 {
1291         struct ptlrpc_request *req;
1292         int                    rc;
1293         cfs_waitq_t            waitq;
1294         int                    resends = 0;
1295         struct l_wait_info     lwi;
1296
1297         ENTRY;
1298
1299         cfs_waitq_init(&waitq);
1300
1301 restart_bulk:
1302         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1303                                   page_count, pga, &req, ocapa);
1304         if (rc != 0)
1305                 return (rc);
1306
1307         rc = ptlrpc_queue_wait(req);
1308
1309         if (rc == -ETIMEDOUT && req->rq_resend) {
1310                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1311                 ptlrpc_req_finished(req);
1312                 goto restart_bulk;
1313         }
1314
1315         rc = osc_brw_fini_request(req, rc);
1316
1317         ptlrpc_req_finished(req);
1318         if (osc_recoverable_error(rc)) {
1319                 resends++;
1320                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1321                         CERROR("too many resend retries, returning error\n");
1322                         RETURN(-EIO);
1323                 }
1324
1325                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1326                 l_wait_event(waitq, 0, &lwi);
1327
1328                 goto restart_bulk;
1329         }
1330         
1331         RETURN (rc);
1332 }
1333
1334 int osc_brw_redo_request(struct ptlrpc_request *request,
1335                          struct osc_brw_async_args *aa)
1336 {
1337         struct ptlrpc_request *new_req;
1338         struct ptlrpc_request_set *set = request->rq_set;
1339         struct osc_brw_async_args *new_aa;
1340         struct osc_async_page *oap;
1341         int rc = 0;
1342         ENTRY;
1343
1344         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1345                 CERROR("too many resend retries, returning error\n");
1346                 RETURN(-EIO);
1347         }
1348         
1349         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1350 /*
1351         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1352         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1353                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1354                                            REQ_REC_OFF + 3);
1355 */
1356         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1357                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1358                                   aa->aa_cli, aa->aa_oa,
1359                                   NULL /* lsm unused by osc currently */,
1360                                   aa->aa_page_count, aa->aa_ppga, 
1361                                   &new_req, NULL /* ocapa */);
1362         if (rc)
1363                 RETURN(rc);
1364
1365         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1366    
1367         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1368                 if (oap->oap_request != NULL) {
1369                         LASSERTF(request == oap->oap_request,
1370                                  "request %p != oap_request %p\n",
1371                                  request, oap->oap_request);
1372                         if (oap->oap_interrupted) {
1373                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1374                                 ptlrpc_req_finished(new_req);                        
1375                                 RETURN(-EINTR);
1376                         }
1377                 }
1378         }
1379         /* New request takes over pga and oaps from old request.
1380          * Note that copying a list_head doesn't work, need to move it... */
1381         aa->aa_resends++;
1382         new_req->rq_interpret_reply = request->rq_interpret_reply;
1383         new_req->rq_async_args = request->rq_async_args;
1384         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1385
1386         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1387
1388         INIT_LIST_HEAD(&new_aa->aa_oaps);
1389         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1390         INIT_LIST_HEAD(&aa->aa_oaps);
1391
1392         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1393                 if (oap->oap_request) {
1394                         ptlrpc_req_finished(oap->oap_request);
1395                         oap->oap_request = ptlrpc_request_addref(new_req);
1396                 }
1397         }
1398         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1399
1400         DEBUG_REQ(D_INFO, new_req, "new request");
1401
1402         ptlrpc_set_add_req(set, new_req);
1403
1404         RETURN(0);
1405 }
1406
1407 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1408 {
1409         struct osc_brw_async_args *aa = data;
1410         int                        i;
1411         int                        nob = rc;
1412         ENTRY;
1413
1414         rc = osc_brw_fini_request(req, rc);
1415         if (osc_recoverable_error(rc)) {
1416                 rc = osc_brw_redo_request(req, aa);
1417                 if (rc == 0)
1418                         RETURN(0);
1419         }
1420         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1421                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1422
1423         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1424         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1425                 aa->aa_cli->cl_w_in_flight--;
1426         else
1427                 aa->aa_cli->cl_r_in_flight--;
1428         for (i = 0; i < aa->aa_page_count; i++)
1429                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1430         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1431
1432         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1433
1434         RETURN(rc);
1435 }
1436
1437 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1438                           struct lov_stripe_md *lsm, obd_count page_count,
1439                           struct brw_page **pga, struct ptlrpc_request_set *set,
1440                           struct obd_capa *ocapa)
1441 {
1442         struct ptlrpc_request     *req;
1443         struct client_obd         *cli = &exp->exp_obd->u.cli;
1444         int                        rc, i;
1445         struct osc_brw_async_args *aa;
1446         ENTRY;
1447
1448         /* Consume write credits even if doing a sync write -
1449          * otherwise we may run out of space on OST due to grant. */
1450         if (cmd == OBD_BRW_WRITE) {
1451                 spin_lock(&cli->cl_loi_list_lock);
1452                 for (i = 0; i < page_count; i++) {
1453                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1454                                 osc_consume_write_grant(cli, pga[i]);
1455                 }
1456                 spin_unlock(&cli->cl_loi_list_lock);
1457         }
1458
1459         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1460                                   &req, ocapa);
1461
1462         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1463         if (cmd == OBD_BRW_READ) {
1464                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1465                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1466                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1467         } else {
1468                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1469                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1470                                  cli->cl_w_in_flight);
1471                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1472         }
1473
1474         if (rc == 0) {
1475                 req->rq_interpret_reply = brw_interpret;
1476                 ptlrpc_set_add_req(set, req);
1477                 client_obd_list_lock(&cli->cl_loi_list_lock);
1478                 if (cmd == OBD_BRW_READ)
1479                         cli->cl_r_in_flight++;
1480                 else
1481                         cli->cl_w_in_flight++;
1482                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1483         } else if (cmd == OBD_BRW_WRITE) {
1484                 client_obd_list_lock(&cli->cl_loi_list_lock);
1485                 for (i = 0; i < page_count; i++)
1486                         osc_release_write_grant(cli, pga[i], 0);
1487                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1488         }
1489         RETURN (rc);
1490 }
1491
1492 /*
1493  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1494  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1495  * fine for our small page arrays and doesn't require allocation.  its an
1496  * insertion sort that swaps elements that are strides apart, shrinking the
1497  * stride down until its '1' and the array is sorted.
1498  */
1499 static void sort_brw_pages(struct brw_page **array, int num)
1500 {
1501         int stride, i, j;
1502         struct brw_page *tmp;
1503
1504         if (num == 1)
1505                 return;
1506         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1507                 ;
1508
1509         do {
1510                 stride /= 3;
1511                 for (i = stride ; i < num ; i++) {
1512                         tmp = array[i];
1513                         j = i;
1514                         while (j >= stride && array[j - stride]->off > tmp->off) {
1515                                 array[j] = array[j - stride];
1516                                 j -= stride;
1517                         }
1518                         array[j] = tmp;
1519                 }
1520         } while (stride > 1);
1521 }
1522
1523 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1524 {
1525         int count = 1;
1526         int offset;
1527         int i = 0;
1528
1529         LASSERT (pages > 0);
1530         offset = pg[i]->off & ~CFS_PAGE_MASK;
1531
1532         for (;;) {
1533                 pages--;
1534                 if (pages == 0)         /* that's all */
1535                         return count;
1536
1537                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1538                         return count;   /* doesn't end on page boundary */
1539
1540                 i++;
1541                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1542                 if (offset != 0)        /* doesn't start on page boundary */
1543                         return count;
1544
1545                 count++;
1546         }
1547 }
1548
1549 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1550 {
1551         struct brw_page **ppga;
1552         int i;
1553
1554         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1555         if (ppga == NULL)
1556                 return NULL;
1557
1558         for (i = 0; i < count; i++)
1559                 ppga[i] = pga + i;
1560         return ppga;
1561 }
1562
1563 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1564 {
1565         LASSERT(ppga != NULL);
1566         OBD_FREE(ppga, sizeof(*ppga) * count);
1567 }
1568
1569 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1570                    obd_count page_count, struct brw_page *pga,
1571                    struct obd_trans_info *oti)
1572 {
1573         struct obdo *saved_oa = NULL;
1574         struct brw_page **ppga, **orig;
1575         struct obd_import *imp = class_exp2cliimp(exp);
1576         struct client_obd *cli = &imp->imp_obd->u.cli;
1577         int rc, page_count_orig;
1578         ENTRY;
1579
1580         if (cmd & OBD_BRW_CHECK) {
1581                 /* The caller just wants to know if there's a chance that this
1582                  * I/O can succeed */
1583
1584                 if (imp == NULL || imp->imp_invalid)
1585                         RETURN(-EIO);
1586                 RETURN(0);
1587         }
1588
1589         /* test_brw with a failed create can trip this, maybe others. */
1590         LASSERT(cli->cl_max_pages_per_rpc);
1591
1592         rc = 0;
1593
1594         orig = ppga = osc_build_ppga(pga, page_count);
1595         if (ppga == NULL)
1596                 RETURN(-ENOMEM);
1597         page_count_orig = page_count;
1598
1599         sort_brw_pages(ppga, page_count);
1600         while (page_count) {
1601                 obd_count pages_per_brw;
1602
1603                 if (page_count > cli->cl_max_pages_per_rpc)
1604                         pages_per_brw = cli->cl_max_pages_per_rpc;
1605                 else
1606                         pages_per_brw = page_count;
1607
1608                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1609
1610                 if (saved_oa != NULL) {
1611                         /* restore previously saved oa */
1612                         *oinfo->oi_oa = *saved_oa;
1613                 } else if (page_count > pages_per_brw) {
1614                         /* save a copy of oa (brw will clobber it) */
1615                         OBDO_ALLOC(saved_oa);
1616                         if (saved_oa == NULL)
1617                                 GOTO(out, rc = -ENOMEM);
1618                         *saved_oa = *oinfo->oi_oa;
1619                 }
1620
1621                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1622                                       pages_per_brw, ppga, oinfo->oi_capa);
1623
1624                 if (rc != 0)
1625                         break;
1626
1627                 page_count -= pages_per_brw;
1628                 ppga += pages_per_brw;
1629         }
1630
1631 out:
1632         osc_release_ppga(orig, page_count_orig);
1633
1634         if (saved_oa != NULL)
1635                 OBDO_FREE(saved_oa);
1636
1637         RETURN(rc);
1638 }
1639
1640 static int osc_brw_async(int cmd, struct obd_export *exp,
1641                          struct obd_info *oinfo, obd_count page_count,
1642                          struct brw_page *pga, struct obd_trans_info *oti,
1643                          struct ptlrpc_request_set *set)
1644 {
1645         struct brw_page **ppga, **orig;
1646         struct client_obd *cli = &exp->exp_obd->u.cli;
1647         int page_count_orig;
1648         int rc = 0;
1649         ENTRY;
1650
1651         if (cmd & OBD_BRW_CHECK) {
1652                 struct obd_import *imp = class_exp2cliimp(exp);
1653                 /* The caller just wants to know if there's a chance that this
1654                  * I/O can succeed */
1655
1656                 if (imp == NULL || imp->imp_invalid)
1657                         RETURN(-EIO);
1658                 RETURN(0);
1659         }
1660
1661         orig = ppga = osc_build_ppga(pga, page_count);
1662         if (ppga == NULL)
1663                 RETURN(-ENOMEM);
1664         page_count_orig = page_count;
1665
1666         sort_brw_pages(ppga, page_count);
1667         while (page_count) {
1668                 struct brw_page **copy;
1669                 obd_count pages_per_brw;
1670
1671                 pages_per_brw = min_t(obd_count, page_count,
1672                                       cli->cl_max_pages_per_rpc);
1673
1674                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1675
1676                 /* use ppga only if single RPC is going to fly */
1677                 if (pages_per_brw != page_count_orig || ppga != orig) {
1678                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1679                         if (copy == NULL)
1680                                 GOTO(out, rc = -ENOMEM);
1681                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1682                 } else
1683                         copy = ppga;
1684
1685                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1686                                     pages_per_brw, copy, set, oinfo->oi_capa);
1687
1688                 if (rc != 0) {
1689                         if (copy != ppga)
1690                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1691                         break;
1692                 }
1693                 if (copy == orig) {
1694                         /* we passed it to async_internal() which is
1695                          * now responsible for releasing memory */
1696                         orig = NULL;
1697                 }
1698
1699                 page_count -= pages_per_brw;
1700                 ppga += pages_per_brw;
1701         }
1702 out:
1703         if (orig)
1704                 osc_release_ppga(orig, page_count_orig);
1705         RETURN(rc);
1706 }
1707
1708 static void osc_check_rpcs(struct client_obd *cli);
1709
1710 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1711  * the dirty accounting.  Writeback completes or truncate happens before
1712  * writing starts.  Must be called with the loi lock held. */
1713 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1714                            int sent)
1715 {
1716         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1717 }
1718
1719
1720 /* This maintains the lists of pending pages to read/write for a given object
1721  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1722  * to quickly find objects that are ready to send an RPC. */
1723 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1724                          int cmd)
1725 {
1726         int optimal;
1727         ENTRY;
1728
1729         if (lop->lop_num_pending == 0)
1730                 RETURN(0);
1731
1732         /* if we have an invalid import we want to drain the queued pages
1733          * by forcing them through rpcs that immediately fail and complete
1734          * the pages.  recovery relies on this to empty the queued pages
1735          * before canceling the locks and evicting down the llite pages */
1736         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1737                 RETURN(1);
1738
1739         /* stream rpcs in queue order as long as as there is an urgent page
1740          * queued.  this is our cheap solution for good batching in the case
1741          * where writepage marks some random page in the middle of the file
1742          * as urgent because of, say, memory pressure */
1743         if (!list_empty(&lop->lop_urgent)) {
1744                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1745                 RETURN(1);
1746         }
1747         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1748         optimal = cli->cl_max_pages_per_rpc;
1749         if (cmd & OBD_BRW_WRITE) {
1750                 /* trigger a write rpc stream as long as there are dirtiers
1751                  * waiting for space.  as they're waiting, they're not going to
1752                  * create more pages to coallesce with what's waiting.. */
1753                 if (!list_empty(&cli->cl_cache_waiters)) {
1754                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1755                         RETURN(1);
1756                 }
1757                 /* +16 to avoid triggering rpcs that would want to include pages
1758                  * that are being queued but which can't be made ready until
1759                  * the queuer finishes with the page. this is a wart for
1760                  * llite::commit_write() */
1761                 optimal += 16;
1762         }
1763         if (lop->lop_num_pending >= optimal)
1764                 RETURN(1);
1765
1766         RETURN(0);
1767 }
1768
/* Make the list membership of @item match @should_be_on: append it to @list
 * if it should be linked but isn't, unlink it if it is linked but shouldn't
 * be, and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1777
1778 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1779  * can find pages to build into rpcs quickly */
1780 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1781 {
1782         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1783                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1784                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1785
1786         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1787                 loi->loi_write_lop.lop_num_pending);
1788
1789         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1790                 loi->loi_read_lop.lop_num_pending);
1791 }
1792
1793 static void lop_update_pending(struct client_obd *cli,
1794                                struct loi_oap_pages *lop, int cmd, int delta)
1795 {
1796         lop->lop_num_pending += delta;
1797         if (cmd & OBD_BRW_WRITE)
1798                 cli->cl_pending_w_pages += delta;
1799         else
1800                 cli->cl_pending_r_pages += delta;
1801 }
1802
1803 /* this is called when a sync waiter receives an interruption.  Its job is to
1804  * get the caller woken as soon as possible.  If its page hasn't been put in an
1805  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1806  * desiring interruption which will forcefully complete the rpc once the rpc
1807  * has timed out */
1808 static void osc_occ_interrupted(struct oig_callback_context *occ)
1809 {
1810         struct osc_async_page *oap;
1811         struct loi_oap_pages *lop;
1812         struct lov_oinfo *loi;
1813         ENTRY;
1814
1815         /* XXX member_of() */
1816         oap = list_entry(occ, struct osc_async_page, oap_occ);
1817
1818         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1819
1820         oap->oap_interrupted = 1;
1821
1822         /* ok, it's been put in an rpc. only one oap gets a request reference */
1823         if (oap->oap_request != NULL) {
1824                 ptlrpc_mark_interrupted(oap->oap_request);
1825                 ptlrpcd_wake(oap->oap_request);
1826                 GOTO(unlock, 0);
1827         }
1828
1829         /* we don't get interruption callbacks until osc_trigger_group_io()
1830          * has been called and put the sync oaps in the pending/urgent lists.*/
1831         if (!list_empty(&oap->oap_pending_item)) {
1832                 list_del_init(&oap->oap_pending_item);
1833                 list_del_init(&oap->oap_urgent_item);
1834
1835                 loi = oap->oap_loi;
1836                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1837                         &loi->loi_write_lop : &loi->loi_read_lop;
1838                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1839                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1840
1841                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1842                 oap->oap_oig = NULL;
1843         }
1844
1845 unlock:
1846         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1847 }
1848
1849 /* this is trying to propogate async writeback errors back up to the
1850  * application.  As an async write fails we record the error code for later if
1851  * the app does an fsync.  As long as errors persist we force future rpcs to be
1852  * sync so that the app can get a sync error and break the cycle of queueing
1853  * pages for which writeback will fail. */
1854 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1855                            int rc)
1856 {
1857         if (rc) {
1858                 if (!ar->ar_rc)
1859                         ar->ar_rc = rc;
1860
1861                 ar->ar_force_sync = 1;
1862                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1863                 return;
1864
1865         }
1866
1867         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1868                 ar->ar_force_sync = 0;
1869 }
1870
1871 static void osc_oap_to_pending(struct osc_async_page *oap)
1872 {
1873         struct loi_oap_pages *lop;
1874
1875         if (oap->oap_cmd & OBD_BRW_WRITE)
1876                 lop = &oap->oap_loi->loi_write_lop;
1877         else
1878                 lop = &oap->oap_loi->loi_read_lop;
1879
1880         if (oap->oap_async_flags & ASYNC_URGENT)
1881                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1882         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1883         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1884 }
1885
1886 /* this must be called holding the loi list lock to give coverage to exit_cache,
1887  * async_flag maintenance, and oap_request */
1888 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1889                               struct osc_async_page *oap, int sent, int rc)
1890 {
1891         __u64 xid = 0;
1892
1893         ENTRY;
1894         if (oap->oap_request != NULL) {
1895                 xid = ptlrpc_req_xid(oap->oap_request);
1896                 ptlrpc_req_finished(oap->oap_request);
1897                 oap->oap_request = NULL;
1898         }
1899
1900         oap->oap_async_flags = 0;
1901         oap->oap_interrupted = 0;
1902
1903         if (oap->oap_cmd & OBD_BRW_WRITE) {
1904                 osc_process_ar(&cli->cl_ar, xid, rc);
1905                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1906         }
1907
1908         if (rc == 0 && oa != NULL) {
1909                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1910                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1911                 if (oa->o_valid & OBD_MD_FLMTIME)
1912                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1913                 if (oa->o_valid & OBD_MD_FLATIME)
1914                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1915                 if (oa->o_valid & OBD_MD_FLCTIME)
1916                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1917         }
1918
1919         if (oap->oap_oig) {
1920                 osc_exit_cache(cli, oap, sent);
1921                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1922                 oap->oap_oig = NULL;
1923                 EXIT;
1924                 return;
1925         }
1926
1927         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1928                                                 oap->oap_cmd, oa, rc);
1929
1930         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1931          * I/O on the page could start, but OSC calls it under lock
1932          * and thus we can add oap back to pending safely */
1933         if (rc)
1934                 /* upper layer wants to leave the page on pending queue */
1935                 osc_oap_to_pending(oap);
1936         else
1937                 osc_exit_cache(cli, oap, sent);
1938         EXIT;
1939 }
1940
1941 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1942 {
1943         struct osc_async_page *oap, *tmp;
1944         struct osc_brw_async_args *aa = data;
1945         struct client_obd *cli;
1946         ENTRY;
1947
1948         rc = osc_brw_fini_request(req, rc);
1949         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1950         if (osc_recoverable_error(rc)) {
1951                 rc = osc_brw_redo_request(req, aa);
1952                 if (rc == 0)
1953                         RETURN(0);
1954         }
1955
1956         cli = aa->aa_cli;
1957
1958         client_obd_list_lock(&cli->cl_loi_list_lock);
1959
1960         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1961          * is called so we know whether to go to sync BRWs or wait for more
1962          * RPCs to complete */
1963         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1964                 cli->cl_w_in_flight--;
1965         else
1966                 cli->cl_r_in_flight--;
1967
1968         /* the caller may re-use the oap after the completion call so
1969          * we need to clean it up a little */
1970         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1971                 list_del_init(&oap->oap_rpc_item);
1972                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1973         }
1974
1975         osc_wake_cache_waiters(cli);
1976         osc_check_rpcs(cli);
1977
1978         client_obd_list_unlock(&cli->cl_loi_list_lock);
1979
1980         OBDO_FREE(aa->aa_oa);
1981         
1982         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1983         RETURN(rc);
1984 }
1985
1986 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1987                                             struct list_head *rpc_list,
1988                                             int page_count, int cmd)
1989 {
1990         struct ptlrpc_request *req;
1991         struct brw_page **pga = NULL;
1992         struct osc_brw_async_args *aa;
1993         struct obdo *oa = NULL;
1994         struct obd_async_page_ops *ops = NULL;
1995         void *caller_data = NULL;
1996         struct obd_capa *ocapa;
1997         struct osc_async_page *oap;
1998         int i, rc;
1999
2000         ENTRY;
2001         LASSERT(!list_empty(rpc_list));
2002
2003         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2004         if (pga == NULL)
2005                 RETURN(ERR_PTR(-ENOMEM));
2006
2007         OBDO_ALLOC(oa);
2008         if (oa == NULL)
2009                 GOTO(out, req = ERR_PTR(-ENOMEM));
2010
2011         i = 0;
2012         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2013                 if (ops == NULL) {
2014                         ops = oap->oap_caller_ops;
2015                         caller_data = oap->oap_caller_data;
2016                 }
2017                 pga[i] = &oap->oap_brw_page;
2018                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2019                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2020                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2021                 i++;
2022         }
2023
2024         /* always get the data for the obdo for the rpc */
2025         LASSERT(ops != NULL);
2026         ops->ap_fill_obdo(caller_data, cmd, oa);
2027         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2028
2029         sort_brw_pages(pga, page_count);
2030         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2031                                   pga, &req, ocapa);
2032         capa_put(ocapa);
2033         if (rc != 0) {
2034                 CERROR("prep_req failed: %d\n", rc);
2035                 GOTO(out, req = ERR_PTR(rc));
2036         }
2037
2038         /* Need to update the timestamps after the request is built in case
2039          * we race with setattr (locally or in queue at OST).  If OST gets
2040          * later setattr before earlier BRW (as determined by the request xid),
2041          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2042          * way to do this in a single call.  bug 10150 */
2043         ops->ap_update_obdo(caller_data, cmd, oa,
2044                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2045
2046         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2047         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2048         INIT_LIST_HEAD(&aa->aa_oaps);
2049         list_splice(rpc_list, &aa->aa_oaps);
2050         INIT_LIST_HEAD(rpc_list);
2051
2052 out:
2053         if (IS_ERR(req)) {
2054                 if (oa)
2055                         OBDO_FREE(oa);
2056                 if (pga)
2057                         OBD_FREE(pga, sizeof(*pga) * page_count);
2058         }
2059         RETURN(req);
2060 }
2061
2062 /* the loi lock is held across this function but it's allowed to release
2063  * and reacquire it during its work */
2064 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2065                             int cmd, struct loi_oap_pages *lop)
2066 {
2067         struct ptlrpc_request *req;
2068         obd_count page_count = 0;
2069         struct osc_async_page *oap = NULL, *tmp;
2070         struct osc_brw_async_args *aa;
2071         struct obd_async_page_ops *ops;
2072         CFS_LIST_HEAD(rpc_list);
2073         unsigned int ending_offset;
2074         unsigned  starting_offset = 0;
2075         ENTRY;
2076
2077         /* first we find the pages we're allowed to work with */
2078         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2079                                  oap_pending_item) {
2080                 ops = oap->oap_caller_ops;
2081
2082                 LASSERT(oap->oap_magic == OAP_MAGIC);
2083
2084                 /* in llite being 'ready' equates to the page being locked
2085                  * until completion unlocks it.  commit_write submits a page
2086                  * as not ready because its unlock will happen unconditionally
2087                  * as the call returns.  if we race with commit_write giving
2088                  * us that page we dont' want to create a hole in the page
2089                  * stream, so we stop and leave the rpc to be fired by
2090                  * another dirtier or kupdated interval (the not ready page
2091                  * will still be on the dirty list).  we could call in
2092                  * at the end of ll_file_write to process the queue again. */
2093                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2094                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2095                         if (rc < 0)
2096                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2097                                                 "instead of ready\n", oap,
2098                                                 oap->oap_page, rc);
2099                         switch (rc) {
2100                         case -EAGAIN:
2101                                 /* llite is telling us that the page is still
2102                                  * in commit_write and that we should try
2103                                  * and put it in an rpc again later.  we
2104                                  * break out of the loop so we don't create
2105                                  * a hole in the sequence of pages in the rpc
2106                                  * stream.*/
2107                                 oap = NULL;
2108                                 break;
2109                         case -EINTR:
2110                                 /* the io isn't needed.. tell the checks
2111                                  * below to complete the rpc with EINTR */
2112                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2113                                 oap->oap_count = -EINTR;
2114                                 break;
2115                         case 0:
2116                                 oap->oap_async_flags |= ASYNC_READY;
2117                                 break;
2118                         default:
2119                                 LASSERTF(0, "oap %p page %p returned %d "
2120                                             "from make_ready\n", oap,
2121                                             oap->oap_page, rc);
2122                                 break;
2123                         }
2124                 }
2125                 if (oap == NULL)
2126                         break;
2127                 /*
2128                  * Page submitted for IO has to be locked. Either by
2129                  * ->ap_make_ready() or by higher layers.
2130                  *
2131                  * XXX nikita: this assertion should be adjusted when lustre
2132                  * starts using PG_writeback for pages being written out.
2133                  */
2134 #if defined(__KERNEL__) && defined(__LINUX__)
2135                 LASSERT(PageLocked(oap->oap_page));
2136 #endif
2137                 /* If there is a gap at the start of this page, it can't merge
2138                  * with any previous page, so we'll hand the network a
2139                  * "fragmented" page array that it can't transfer in 1 RDMA */
2140                 if (page_count != 0 && oap->oap_page_off != 0)
2141                         break;
2142
2143                 /* take the page out of our book-keeping */
2144                 list_del_init(&oap->oap_pending_item);
2145                 lop_update_pending(cli, lop, cmd, -1);
2146                 list_del_init(&oap->oap_urgent_item);
2147
2148                 if (page_count == 0)
2149                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2150                                           (PTLRPC_MAX_BRW_SIZE - 1);
2151
2152                 /* ask the caller for the size of the io as the rpc leaves. */
2153                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2154                         oap->oap_count =
2155                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2156                 if (oap->oap_count <= 0) {
2157                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2158                                oap->oap_count);
2159                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2160                         continue;
2161                 }
2162
2163                 /* now put the page back in our accounting */
2164                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2165                 if (++page_count >= cli->cl_max_pages_per_rpc)
2166                         break;
2167
2168                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2169                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2170                  * have the same alignment as the initial writes that allocated
2171                  * extents on the server. */
2172                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2173                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2174                 if (ending_offset == 0)
2175                         break;
2176
2177                 /* If there is a gap at the end of this page, it can't merge
2178                  * with any subsequent pages, so we'll hand the network a
2179                  * "fragmented" page array that it can't transfer in 1 RDMA */
2180                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2181                         break;
2182         }
2183
2184         osc_wake_cache_waiters(cli);
2185
2186         if (page_count == 0)
2187                 RETURN(0);
2188
2189         loi_list_maint(cli, loi);
2190
2191         client_obd_list_unlock(&cli->cl_loi_list_lock);
2192
2193         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2194         if (IS_ERR(req)) {
2195                 /* this should happen rarely and is pretty bad, it makes the
2196                  * pending list not follow the dirty order */
2197                 client_obd_list_lock(&cli->cl_loi_list_lock);
2198                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2199                         list_del_init(&oap->oap_rpc_item);
2200
2201                         /* queued sync pages can be torn down while the pages
2202                          * were between the pending list and the rpc */
2203                         if (oap->oap_interrupted) {
2204                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2205                                 osc_ap_completion(cli, NULL, oap, 0,
2206                                                   oap->oap_count);
2207                                 continue;
2208                         }
2209                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2210                 }
2211                 loi_list_maint(cli, loi);
2212                 RETURN(PTR_ERR(req));
2213         }
2214
2215         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2216
2217         if (cmd == OBD_BRW_READ) {
2218                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2219                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2220                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2221                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2222                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2223         } else {
2224                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2225                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2226                                  cli->cl_w_in_flight);
2227                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2228                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2229                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2230         }
2231
2232         client_obd_list_lock(&cli->cl_loi_list_lock);
2233
2234         if (cmd == OBD_BRW_READ)
2235                 cli->cl_r_in_flight++;
2236         else
2237                 cli->cl_w_in_flight++;
2238
2239         /* queued sync pages can be torn down while the pages
2240          * were between the pending list and the rpc */
2241         tmp = NULL;
2242         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2243                 /* only one oap gets a request reference */
2244                 if (tmp == NULL)
2245                         tmp = oap;
2246                 if (oap->oap_interrupted && !req->rq_intr) {
2247                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2248                                oap, req);
2249                         ptlrpc_mark_interrupted(req);
2250                 }
2251         }
2252         if (tmp != NULL)
2253                 tmp->oap_request = ptlrpc_request_addref(req);
2254
2255         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2256                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2257
2258         req->rq_interpret_reply = brw_interpret_oap;
2259         ptlrpcd_add_req(req);
2260         RETURN(1);
2261 }
2262
/* Debug helper: emit a D_INODE message describing @LOI -- whether it is
 * linked through loi_cli_item, then the pending-page count and urgent-list
 * state of its write and read queues -- followed by the caller's format
 * @STR and its arguments.
 *
 * The final line deliberately carries no trailing backslash, so the macro
 * ends here and cannot swallow whatever line follows it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2271
2272 /* This is called by osc_check_rpcs() to find which objects have pages that
2273  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2274 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2275 {
2276         ENTRY;
2277         /* first return all objects which we already know to have
2278          * pages ready to be stuffed into rpcs */
2279         if (!list_empty(&cli->cl_loi_ready_list))
2280                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2281                                   struct lov_oinfo, loi_cli_item));
2282
2283         /* then if we have cache waiters, return all objects with queued
2284          * writes.  This is especially important when many small files
2285          * have filled up the cache and not been fired into rpcs because
2286          * they don't pass the nr_pending/object threshhold */
2287         if (!list_empty(&cli->cl_cache_waiters) &&
2288             !list_empty(&cli->cl_loi_write_list))
2289                 RETURN(list_entry(cli->cl_loi_write_list.next,
2290                                   struct lov_oinfo, loi_write_item));
2291
2292         /* then return all queued objects when we have an invalid import
2293          * so that they get flushed */
2294         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2295                 if (!list_empty(&cli->cl_loi_write_list))
2296                         RETURN(list_entry(cli->cl_loi_write_list.next,
2297                                           struct lov_oinfo, loi_write_item));
2298                 if (!list_empty(&cli->cl_loi_read_list))
2299                         RETURN(list_entry(cli->cl_loi_read_list.next,
2300                                           struct lov_oinfo, loi_read_item));
2301         }
2302         RETURN(NULL);
2303 }
2304
2305 /* called with the loi list lock held */
2306 static void osc_check_rpcs(struct client_obd *cli)
2307 {
2308         struct lov_oinfo *loi;
2309         int rc = 0, race_counter = 0;
2310         ENTRY;
2311
2312         while ((loi = osc_next_loi(cli)) != NULL) {
2313                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2314
2315                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2316                         break;
2317
2318                 /* attempt some read/write balancing by alternating between
2319                  * reads and writes in an object.  The makes_rpc checks here
2320                  * would be redundant if we were getting read/write work items
2321                  * instead of objects.  we don't want send_oap_rpc to drain a
2322                  * partial read pending queue when we're given this object to
2323                  * do io on writes while there are cache waiters */
2324                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2325                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2326                                               &loi->loi_write_lop);
2327                         if (rc < 0)
2328                                 break;
2329                         if (rc > 0)
2330                                 race_counter = 0;
2331                         else
2332                                 race_counter++;
2333                 }
2334                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2335                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2336                                               &loi->loi_read_lop);
2337                         if (rc < 0)
2338                                 break;
2339                         if (rc > 0)
2340                                 race_counter = 0;
2341                         else
2342                                 race_counter++;
2343                 }
2344
2345                 /* attempt some inter-object balancing by issueing rpcs
2346                  * for each object in turn */
2347                 if (!list_empty(&loi->loi_cli_item))
2348                         list_del_init(&loi->loi_cli_item);
2349                 if (!list_empty(&loi->loi_write_item))
2350                         list_del_init(&loi->loi_write_item);
2351                 if (!list_empty(&loi->loi_read_item))
2352                         list_del_init(&loi->loi_read_item);
2353
2354                 loi_list_maint(cli, loi);
2355
2356                 /* send_oap_rpc fails with 0 when make_ready tells it to
2357                  * back off.  llite's make_ready does this when it tries
2358                  * to lock a page queued for write that is already locked.
2359                  * we want to try sending rpcs from many objects, but we
2360                  * don't want to spin failing with 0.  */
2361                 if (race_counter == 10)
2362                         break;
2363         }
2364         EXIT;
2365 }
2366
2367 /* we're trying to queue a page in the osc so we're subject to the
2368  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2369  * If the osc's queued pages are already at that limit, then we want to sleep
2370  * until there is space in the osc's queue for us.  We also may be waiting for
2371  * write credits from the OST if there are RPCs in flight that may return some
2372  * before we fall back to sync writes.
2373  *
2374  * We need this know our allocation was granted in the presence of signals */
2375 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2376 {
2377         int rc;
2378         ENTRY;
2379         client_obd_list_lock(&cli->cl_loi_list_lock);
2380         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2381         client_obd_list_unlock(&cli->cl_loi_list_lock);
2382         RETURN(rc);
2383 };
2384
2385 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2386  * grant or cache space. */
2387 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2388                            struct osc_async_page *oap)
2389 {
2390         struct osc_cache_waiter ocw;
2391         struct l_wait_info lwi = { 0 };
2392
2393         ENTRY;
2394
2395         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2396                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2397                cli->cl_dirty_max, obd_max_dirty_pages,
2398                cli->cl_lost_grant, cli->cl_avail_grant);
2399
2400         /* force the caller to try sync io.  this can jump the list
2401          * of queued writes and create a discontiguous rpc stream */
2402         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2403             loi->loi_ar.ar_force_sync)
2404                 RETURN(-EDQUOT);
2405
2406         /* Hopefully normal case - cache space and write credits available */
2407         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2408             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2409             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2410                 /* account for ourselves */
2411                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2412                 RETURN(0);
2413         }
2414
2415         /* Make sure that there are write rpcs in flight to wait for.  This
2416          * is a little silly as this object may not have any pending but
2417          * other objects sure might. */
2418         if (cli->cl_w_in_flight) {
2419                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2420                 cfs_waitq_init(&ocw.ocw_waitq);
2421                 ocw.ocw_oap = oap;
2422                 ocw.ocw_rc = 0;
2423
2424                 loi_list_maint(cli, loi);
2425                 osc_check_rpcs(cli);
2426                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2427
2428                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2429                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2430
2431                 client_obd_list_lock(&cli->cl_loi_list_lock);
2432                 if (!list_empty(&ocw.ocw_entry)) {
2433                         list_del(&ocw.ocw_entry);
2434                         RETURN(-EINTR);
2435                 }
2436                 RETURN(ocw.ocw_rc);
2437         }
2438
2439         RETURN(-EDQUOT);
2440 }
2441
2442 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2443                         struct lov_oinfo *loi, cfs_page_t *page,
2444                         obd_off offset, struct obd_async_page_ops *ops,
2445                         void *data, void **res)
2446 {
2447         struct osc_async_page *oap;
2448         ENTRY;
2449
2450         if (!page)
2451                 return size_round(sizeof(*oap));
2452
2453         oap = *res;
2454         oap->oap_magic = OAP_MAGIC;
2455         oap->oap_cli = &exp->exp_obd->u.cli;
2456         oap->oap_loi = loi;
2457
2458         oap->oap_caller_ops = ops;
2459         oap->oap_caller_data = data;
2460
2461         oap->oap_page = page;
2462         oap->oap_obj_off = offset;
2463
2464         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2465         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2466         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2467
2468         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2469
2470         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2471         RETURN(0);
2472 }
2473
2474 struct osc_async_page *oap_from_cookie(void *cookie)
2475 {
2476         struct osc_async_page *oap = cookie;
2477         if (oap->oap_magic != OAP_MAGIC)
2478                 return ERR_PTR(-EINVAL);
2479         return oap;
2480 };
2481
2482 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2483                               struct lov_oinfo *loi, void *cookie,
2484                               int cmd, obd_off off, int count,
2485                               obd_flag brw_flags, enum async_flags async_flags)
2486 {
2487         struct client_obd *cli = &exp->exp_obd->u.cli;
2488         struct osc_async_page *oap;
2489         int rc = 0;
2490         ENTRY;
2491
2492         oap = oap_from_cookie(cookie);
2493         if (IS_ERR(oap))
2494                 RETURN(PTR_ERR(oap));
2495
2496         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2497                 RETURN(-EIO);
2498
2499         if (!list_empty(&oap->oap_pending_item) ||
2500             !list_empty(&oap->oap_urgent_item) ||
2501             !list_empty(&oap->oap_rpc_item))
2502                 RETURN(-EBUSY);
2503
2504         /* check if the file's owner/group is over quota */
2505 #ifdef HAVE_QUOTA_SUPPORT
2506         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2507                 struct obd_async_page_ops *ops;
2508                 struct obdo *oa;
2509
2510                 OBDO_ALLOC(oa);
2511                 if (oa == NULL)
2512                         RETURN(-ENOMEM);
2513
2514                 ops = oap->oap_caller_ops;
2515                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2516                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2517                     NO_QUOTA)
2518                         rc = -EDQUOT;
2519
2520                 OBDO_FREE(oa);
2521                 if (rc)
2522                         RETURN(rc);
2523         }
2524 #endif
2525
2526         if (loi == NULL)
2527                 loi = lsm->lsm_oinfo[0];
2528
2529         client_obd_list_lock(&cli->cl_loi_list_lock);
2530
2531         oap->oap_cmd = cmd;
2532         oap->oap_page_off = off;
2533         oap->oap_count = count;
2534         oap->oap_brw_flags = brw_flags;
2535         oap->oap_async_flags = async_flags;
2536
2537         if (cmd & OBD_BRW_WRITE) {
2538                 rc = osc_enter_cache(cli, loi, oap);
2539                 if (rc) {
2540                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2541                         RETURN(rc);
2542                 }
2543         }
2544
2545         osc_oap_to_pending(oap);
2546         loi_list_maint(cli, loi);
2547
2548         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2549                   cmd);
2550
2551         osc_check_rpcs(cli);
2552         client_obd_list_unlock(&cli->cl_loi_list_lock);
2553
2554         RETURN(0);
2555 }
2556
/* True when 'flag' is being switched on: no bit of 'flag' is set in 'was'
 * and at least one is set in 'now'.  For a single-bit flag this is the same
 * test as (~was & now & flag), but this is more clear :)
 *
 * Every argument is parenthesised so that expressions such as 'A | B' can be
 * passed safely. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2559
2560 static int osc_set_async_flags(struct obd_export *exp,
2561                                struct lov_stripe_md *lsm,
2562                                struct lov_oinfo *loi, void *cookie,
2563                                obd_flag async_flags)
2564 {
2565         struct client_obd *cli = &exp->exp_obd->u.cli;
2566         struct loi_oap_pages *lop;
2567         struct osc_async_page *oap;
2568         int rc = 0;
2569         ENTRY;
2570
2571         oap = oap_from_cookie(cookie);
2572         if (IS_ERR(oap))
2573                 RETURN(PTR_ERR(oap));
2574
2575         /*
2576          * bug 7311: OST-side locking is only supported for liblustre for now
2577          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2578          * implementation has to handle case where OST-locked page was picked
2579          * up by, e.g., ->writepage().
2580          */
2581         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2582         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2583                                      * tread here. */
2584
2585         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2586                 RETURN(-EIO);
2587
2588         if (loi == NULL)
2589                 loi = lsm->lsm_oinfo[0];
2590
2591         if (oap->oap_cmd & OBD_BRW_WRITE) {
2592                 lop = &loi->loi_write_lop;
2593         } else {
2594                 lop = &loi->loi_read_lop;
2595         }
2596
2597         client_obd_list_lock(&cli->cl_loi_list_lock);
2598
2599         if (list_empty(&oap->oap_pending_item))
2600                 GOTO(out, rc = -EINVAL);
2601
2602         if ((oap->oap_async_flags & async_flags) == async_flags)
2603                 GOTO(out, rc = 0);
2604
2605         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2606                 oap->oap_async_flags |= ASYNC_READY;
2607
2608         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2609                 if (list_empty(&oap->oap_rpc_item)) {
2610                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2611                         loi_list_maint(cli, loi);
2612                 }
2613         }
2614
2615         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2616                         oap->oap_async_flags);
2617 out:
2618         osc_check_rpcs(cli);
2619         client_obd_list_unlock(&cli->cl_loi_list_lock);
2620         RETURN(rc);
2621 }
2622
2623 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2624                              struct lov_oinfo *loi,
2625                              struct obd_io_group *oig, void *cookie,
2626                              int cmd, obd_off off, int count,
2627                              obd_flag brw_flags,
2628                              obd_flag async_flags)
2629 {
2630         struct client_obd *cli = &exp->exp_obd->u.cli;
2631         struct osc_async_page *oap;
2632         struct loi_oap_pages *lop;
2633         int rc = 0;
2634         ENTRY;
2635
2636         oap = oap_from_cookie(cookie);
2637         if (IS_ERR(oap))
2638                 RETURN(PTR_ERR(oap));
2639
2640         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2641                 RETURN(-EIO);
2642
2643         if (!list_empty(&oap->oap_pending_item) ||
2644             !list_empty(&oap->oap_urgent_item) ||
2645             !list_empty(&oap->oap_rpc_item))
2646                 RETURN(-EBUSY);
2647
2648         if (loi == NULL)
2649                 loi = lsm->lsm_oinfo[0];
2650
2651         client_obd_list_lock(&cli->cl_loi_list_lock);
2652
2653         oap->oap_cmd = cmd;
2654         oap->oap_page_off = off;
2655         oap->oap_count = count;
2656         oap->oap_brw_flags = brw_flags;
2657         oap->oap_async_flags = async_flags;
2658
2659         if (cmd & OBD_BRW_WRITE)
2660                 lop = &loi->loi_write_lop;
2661         else
2662                 lop = &loi->loi_read_lop;
2663
2664         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2665         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2666                 oap->oap_oig = oig;
2667                 rc = oig_add_one(oig, &oap->oap_occ);
2668         }
2669
2670         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2671                   oap, oap->oap_page, rc);
2672
2673         client_obd_list_unlock(&cli->cl_loi_list_lock);
2674
2675         RETURN(rc);
2676 }
2677
2678 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2679                                  struct loi_oap_pages *lop, int cmd)
2680 {
2681         struct list_head *pos, *tmp;
2682         struct osc_async_page *oap;
2683
2684         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2685                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2686                 list_del(&oap->oap_pending_item);
2687                 osc_oap_to_pending(oap);
2688         }
2689         loi_list_maint(cli, loi);
2690 }
2691
2692 static int osc_trigger_group_io(struct obd_export *exp,
2693                                 struct lov_stripe_md *lsm,
2694                                 struct lov_oinfo *loi,
2695                                 struct obd_io_group *oig)
2696 {
2697         struct client_obd *cli = &exp->exp_obd->u.cli;
2698         ENTRY;
2699
2700         if (loi == NULL)
2701                 loi = lsm->lsm_oinfo[0];
2702
2703         client_obd_list_lock(&cli->cl_loi_list_lock);
2704
2705         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2706         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2707
2708         osc_check_rpcs(cli);
2709         client_obd_list_unlock(&cli->cl_loi_list_lock);
2710
2711         RETURN(0);
2712 }
2713
2714 static int osc_teardown_async_page(struct obd_export *exp,
2715                                    struct lov_stripe_md *lsm,
2716                                    struct lov_oinfo *loi, void *cookie)
2717 {
2718         struct client_obd *cli = &exp->exp_obd->u.cli;
2719         struct loi_oap_pages *lop;
2720         struct osc_async_page *oap;
2721         int rc = 0;
2722         ENTRY;
2723
2724         oap = oap_from_cookie(cookie);
2725         if (IS_ERR(oap))
2726                 RETURN(PTR_ERR(oap));
2727
2728         if (loi == NULL)
2729                 loi = lsm->lsm_oinfo[0];
2730
2731         if (oap->oap_cmd & OBD_BRW_WRITE) {
2732                 lop = &loi->loi_write_lop;
2733         } else {
2734                 lop = &loi->loi_read_lop;
2735         }
2736
2737         client_obd_list_lock(&cli->cl_loi_list_lock);
2738
2739         if (!list_empty(&oap->oap_rpc_item))
2740                 GOTO(out, rc = -EBUSY);
2741
2742         osc_exit_cache(cli, oap, 0);
2743         osc_wake_cache_waiters(cli);
2744
2745         if (!list_empty(&oap->oap_urgent_item)) {
2746                 list_del_init(&oap->oap_urgent_item);
2747                 oap->oap_async_flags &= ~ASYNC_URGENT;
2748         }
2749         if (!list_empty(&oap->oap_pending_item)) {
2750                 list_del_init(&oap->oap_pending_item);
2751                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2752         }
2753         loi_list_maint(cli, loi);
2754
2755         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2756 out:
2757         client_obd_list_unlock(&cli->cl_loi_list_lock);
2758         RETURN(rc);
2759 }
2760
2761 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2762                                     int flags)
2763 {
2764         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2765
2766         if (lock == NULL) {
2767                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2768                 return;
2769         }
2770         lock_res_and_lock(lock);
2771 #ifdef __KERNEL__
2772 #ifdef __LINUX__
2773         /* Liang XXX: Darwin and Winnt checking should be added */
2774         if (lock->l_ast_data && lock->l_ast_data != data) {
2775                 struct inode *new_inode = data;
2776                 struct inode *old_inode = lock->l_ast_data;
2777                 if (!(old_inode->i_state & I_FREEING))
2778                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2779                 LASSERTF(old_inode->i_state & I_FREEING,
2780                          "Found existing inode %p/%lu/%u state %lu in lock: "
2781                          "setting data to %p/%lu/%u\n", old_inode,
2782                          old_inode->i_ino, old_inode->i_generation,
2783                          old_inode->i_state,
2784                          new_inode, new_inode->i_ino, new_inode->i_generation);
2785         }
2786 #endif
2787 #endif
2788         lock->l_ast_data = data;
2789         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2790         unlock_res_and_lock(lock);
2791         LDLM_LOCK_PUT(lock);
2792 }
2793
2794 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2795                              ldlm_iterator_t replace, void *data)
2796 {
2797         struct ldlm_res_id res_id = { .name = {0} };
2798         struct obd_device *obd = class_exp2obd(exp);
2799
2800         res_id.name[0] = lsm->lsm_object_id;
2801         res_id.name[2] = lsm->lsm_object_gr;
2802
2803         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2804         return 0;
2805 }
2806
2807 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2808                             int intent, int rc)
2809 {
2810         ENTRY;
2811
2812         if (intent) {
2813                 /* The request was created before ldlm_cli_enqueue call. */
2814                 if (rc == ELDLM_LOCK_ABORTED) {
2815                         struct ldlm_reply *rep;
2816
2817                         /* swabbed by ldlm_cli_enqueue() */
2818                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2819                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2820                                              sizeof(*rep));
2821                         LASSERT(rep != NULL);
2822                         if (rep->lock_policy_res1)
2823                                 rc = rep->lock_policy_res1;
2824                 }
2825         }
2826
2827         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2828                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2829                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2830                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2831                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2832         }
2833
2834         /* Call the update callback. */
2835         rc = oinfo->oi_cb_up(oinfo, rc);
2836         RETURN(rc);
2837 }
2838
2839 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2840                                  struct osc_enqueue_args *aa, int rc)
2841 {
2842         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2843         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2844         struct ldlm_lock *lock;
2845
2846         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2847          * be valid. */
2848         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2849
2850         /* Complete obtaining the lock procedure. */
2851         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2852                                    aa->oa_ei->ei_mode,
2853                                    &aa->oa_oi->oi_flags,
2854                                    &lsm->lsm_oinfo[0]->loi_lvb,
2855                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2856                                    lustre_swab_ost_lvb,
2857                                    aa->oa_oi->oi_lockh, rc);
2858
2859         /* Complete osc stuff. */
2860         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2861
2862         /* Release the lock for async request. */
2863         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2864                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2865
2866         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2867                  aa->oa_oi->oi_lockh, req, aa);
2868         LDLM_LOCK_PUT(lock);
2869         return rc;
2870 }
2871
2872 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2873  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2874  * other synchronous requests, however keeping some locks and trying to obtain
2875  * others may take a considerable amount of time in a case of ost failure; and
2876  * when other sync requests do not get released lock from a client, the client
2877  * is excluded from the cluster -- such scenarious make the life difficult, so
2878  * release locks just after they are obtained. */
2879 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2880                        struct ldlm_enqueue_info *einfo,
2881                        struct ptlrpc_request_set *rqset)
2882 {
2883         struct ldlm_res_id res_id = { .name = {0} };
2884         struct obd_device *obd = exp->exp_obd;
2885         struct ldlm_reply *rep;
2886         struct ptlrpc_request *req = NULL;
2887         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2888         int rc;
2889         ENTRY;
2890
2891         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2892         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2893
2894         /* Filesystem lock extents are extended to page boundaries so that
2895          * dealing with the page cache is a little smoother.  */
2896         oinfo->oi_policy.l_extent.start -=
2897                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2898         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2899
2900         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2901                 goto no_match;
2902
2903         /* Next, search for already existing extent locks that will cover us */
2904         rc = ldlm_lock_match(obd->obd_namespace,
2905                              oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2906                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2907                              oinfo->oi_lockh);
2908         if (rc == 1) {
2909                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2910                                         oinfo->oi_flags);
2911                 if (intent) {
2912                         /* I would like to be able to ASSERT here that rss <=
2913                          * kms, but I can't, for reasons which are explained in
2914                          * lov_enqueue() */
2915                 }
2916
2917                 /* We already have a lock, and it's referenced */
2918                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2919
2920                 /* For async requests, decref the lock. */
2921                 if (rqset)
2922                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2923
2924                 RETURN(ELDLM_OK);
2925         }
2926
2927         /* If we're trying to read, we also search for an existing PW lock.  The
2928          * VFS and page cache already protect us locally, so lots of readers/
2929          * writers can share a single PW lock.
2930          *
2931          * There are problems with conversion deadlocks, so instead of
2932          * converting a read lock to a write lock, we'll just enqueue a new
2933          * one.
2934          *
2935          * At some point we should cancel the read lock instead of making them
2936          * send us a blocking callback, but there are problems with canceling
2937          * locks out from other users right now, too. */
2938
2939         if (einfo->ei_mode == LCK_PR) {
2940                 rc = ldlm_lock_match(obd->obd_namespace,
2941                                      oinfo->oi_flags | LDLM_FL_LVB_READY,
2942                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2943                                      LCK_PW, oinfo->oi_lockh);
2944                 if (rc == 1) {
2945                         /* FIXME: This is not incredibly elegant, but it might
2946                          * be more elegant than adding another parameter to
2947                          * lock_match.  I want a second opinion. */
2948                         /* addref the lock only if not async requests. */
2949                         if (!rqset)
2950                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2951                         osc_set_data_with_check(oinfo->oi_lockh,
2952                                                 einfo->ei_cbdata,
2953                                                 oinfo->oi_flags);
2954                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2955                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2956                         RETURN(ELDLM_OK);
2957                 }
2958         }
2959
2960  no_match:
2961         if (intent) {
2962                 int size[3] = {
2963                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2964                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2965                         [DLM_LOCKREQ_OFF + 1] = 0 };
2966
2967                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2968                 if (req == NULL)
2969                         RETURN(-ENOMEM);
2970
2971                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2972                 size[DLM_REPLY_REC_OFF] =
2973                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2974                 ptlrpc_req_set_repsize(req, 3, size);
2975         }
2976
2977         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2978         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2979
2980         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2981                               &oinfo->oi_policy, &oinfo->oi_flags,
2982                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2983                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2984                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2985                               rqset ? 1 : 0);
2986         if (rqset) {
2987                 if (!rc) {
2988                         struct osc_enqueue_args *aa;
2989                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2990                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2991                         aa->oa_oi = oinfo;
2992                         aa->oa_ei = einfo;
2993                         aa->oa_exp = exp;
2994
2995                         req->rq_interpret_reply = osc_enqueue_interpret;
2996                         ptlrpc_set_add_req(rqset, req);
2997                 } else if (intent) {
2998                         ptlrpc_req_finished(req);
2999                 }
3000                 RETURN(rc);
3001         }
3002
3003         rc = osc_enqueue_fini(req, oinfo, intent, rc);
3004         if (intent)
3005                 ptlrpc_req_finished(req);
3006
3007         RETURN(rc);
3008 }
3009
3010 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3011                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3012                      int *flags, void *data, struct lustre_handle *lockh)
3013 {
3014         struct ldlm_res_id res_id = { .name = {0} };
3015         struct obd_device *obd = exp->exp_obd;
3016         int rc;
3017         int lflags = *flags;
3018         ENTRY;
3019
3020         res_id.name[0] = lsm->lsm_object_id;
3021         res_id.name[2] = lsm->lsm_object_gr;
3022
3023         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3024
3025         /* Filesystem lock extents are extended to page boundaries so that
3026          * dealing with the page cache is a little smoother */
3027         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3028         policy->l_extent.end |= ~CFS_PAGE_MASK;
3029
3030         /* Next, search for already existing extent locks that will cover us */
3031         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3032                              &res_id, type, policy, mode, lockh);
3033         if (rc) {
3034                 //if (!(*flags & LDLM_FL_TEST_LOCK))
3035                         osc_set_data_with_check(lockh, data, lflags);
3036                 RETURN(rc);
3037         }
3038         /* If we're trying to read, we also search for an existing PW lock.  The
3039          * VFS and page cache already protect us locally, so lots of readers/
3040          * writers can share a single PW lock. */
3041         if (mode == LCK_PR) {
3042                 rc = ldlm_lock_match(obd->obd_namespace,
3043                                      lflags | LDLM_FL_LVB_READY, &res_id,
3044                                      type, policy, LCK_PW, lockh);
3045                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
3046                         /* FIXME: This is not incredibly elegant, but it might
3047                          * be more elegant than adding another parameter to
3048                          * lock_match.  I want a second opinion. */
3049                         osc_set_data_with_check(lockh, data, lflags);
3050                         ldlm_lock_addref(lockh, LCK_PR);
3051                         ldlm_lock_decref(lockh, LCK_PW);
3052                 }
3053         }
3054         RETURN(rc);
3055 }
3056
3057 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3058                       __u32 mode, struct lustre_handle *lockh)
3059 {
3060         ENTRY;
3061
3062         if (unlikely(mode == LCK_GROUP))
3063                 ldlm_lock_decref_and_cancel(lockh, mode);
3064         else
3065                 ldlm_lock_decref(lockh, mode);
3066
3067         RETURN(0);
3068 }
3069
3070 static int osc_cancel_unused(struct obd_export *exp,
3071                              struct lov_stripe_md *lsm, int flags,
3072                              void *opaque)
3073 {
3074         struct obd_device *obd = class_exp2obd(exp);
3075         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3076
3077         if (lsm != NULL) {
3078                 res_id.name[0] = lsm->lsm_object_id;
3079                 res_id.name[2] = lsm->lsm_object_gr;
3080                 resp = &res_id;
3081         }
3082
3083         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3084 }
3085
3086 static int osc_join_lru(struct obd_export *exp,
3087                         struct lov_stripe_md *lsm, int join)
3088 {
3089         struct obd_device *obd = class_exp2obd(exp);
3090         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3091
3092         if (lsm != NULL) {
3093                 res_id.name[0] = lsm->lsm_object_id;
3094                 res_id.name[2] = lsm->lsm_object_gr;
3095                 resp = &res_id;
3096         }
3097
3098         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3099 }
3100
3101 static int osc_statfs_interpret(struct ptlrpc_request *req,
3102                                 struct osc_async_args *aa, int rc)
3103 {
3104         struct obd_statfs *msfs;
3105         ENTRY;
3106
3107         if (rc != 0)
3108                 GOTO(out, rc);
3109
3110         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3111                                   lustre_swab_obd_statfs);
3112         if (msfs == NULL) {
3113                 CERROR("Can't unpack obd_statfs\n");
3114                 GOTO(out, rc = -EPROTO);
3115         }
3116
3117         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3118 out:
3119         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3120         RETURN(rc);
3121 }
3122
3123 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3124                             __u64 max_age, struct ptlrpc_request_set *rqset)
3125 {
3126         struct ptlrpc_request *req;
3127         struct osc_async_args *aa;
3128         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3129         ENTRY;
3130
3131         /* We could possibly pass max_age in the request (as an absolute
3132          * timestamp or a "seconds.usec ago") so the target can avoid doing
3133          * extra calls into the filesystem if that isn't necessary (e.g.
3134          * during mount that would help a bit).  Having relative timestamps
3135          * is not so great if request processing is slow, while absolute
3136          * timestamps are not ideal because they need time synchronization. */
3137         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3138                               OST_STATFS, 1, NULL, NULL);
3139         if (!req)
3140                 RETURN(-ENOMEM);
3141
3142         ptlrpc_req_set_repsize(req, 2, size);
3143         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3144
3145         req->rq_interpret_reply = osc_statfs_interpret;
3146         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3147         aa = (struct osc_async_args *)&req->rq_async_args;
3148         aa->aa_oi = oinfo;
3149
3150         ptlrpc_set_add_req(rqset, req);
3151         RETURN(0);
3152 }
3153
3154 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3155                       __u64 max_age)
3156 {
3157         struct obd_statfs *msfs;
3158         struct ptlrpc_request *req;
3159         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3160         ENTRY;
3161
3162         /* We could possibly pass max_age in the request (as an absolute
3163          * timestamp or a "seconds.usec ago") so the target can avoid doing
3164          * extra calls into the filesystem if that isn't necessary (e.g.
3165          * during mount that would help a bit).  Having relative timestamps
3166          * is not so great if request processing is slow, while absolute
3167          * timestamps are not ideal because they need time synchronization. */
3168         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3169                               OST_STATFS, 1, NULL, NULL);
3170         if (!req)
3171                 RETURN(-ENOMEM);
3172
3173         ptlrpc_req_set_repsize(req, 2, size);
3174         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3175
3176         rc = ptlrpc_queue_wait(req);
3177         if (rc)
3178                 GOTO(out, rc);
3179
3180         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3181                                   lustre_swab_obd_statfs);
3182         if (msfs == NULL) {
3183                 CERROR("Can't unpack obd_statfs\n");
3184                 GOTO(out, rc = -EPROTO);
3185         }
3186
3187         memcpy(osfs, msfs, sizeof(*osfs));
3188
3189         EXIT;
3190  out:
3191         ptlrpc_req_finished(req);
3192         return rc;
3193 }
3194
3195 /* Retrieve object striping information.
3196  *
3197  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3198  * the maximum number of OST indices which will fit in the user buffer.
3199  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3200  */
3201 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3202 {
3203         struct lov_user_md lum, *lumk;
3204         int rc = 0, lum_size;
3205         ENTRY;
3206
3207         if (!lsm)
3208                 RETURN(-ENODATA);
3209
3210         if (copy_from_user(&lum, lump, sizeof(lum)))
3211                 RETURN(-EFAULT);
3212
3213         if (lum.lmm_magic != LOV_USER_MAGIC)
3214                 RETURN(-EINVAL);
3215
3216         if (lum.lmm_stripe_count > 0) {
3217                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3218                 OBD_ALLOC(lumk, lum_size);
3219                 if (!lumk)
3220                         RETURN(-ENOMEM);
3221
3222                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3223                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3224         } else {
3225                 lum_size = sizeof(lum);
3226                 lumk = &lum;
3227         }
3228
3229         lumk->lmm_object_id = lsm->lsm_object_id;
3230         lumk->lmm_object_gr = lsm->lsm_object_gr;
3231         lumk->lmm_stripe_count = 1;
3232
3233         if (copy_to_user(lump, lumk, lum_size))
3234                 rc = -EFAULT;
3235
3236         if (lumk != &lum)
3237                 OBD_FREE(lumk, lum_size);
3238
3239         RETURN(rc);
3240 }
3241
3242
3243 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3244                          void *karg, void *uarg)
3245 {
3246         struct obd_device *obd = exp->exp_obd;
3247         struct obd_ioctl_data *data = karg;
3248         int err = 0;
3249         ENTRY;
3250
3251 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3252         MOD_INC_USE_COUNT;
3253 #else
3254         if (!try_module_get(THIS_MODULE)) {
3255                 CERROR("Can't get module. Is it alive?");
3256                 return -EINVAL;
3257         }
3258 #endif
3259         switch (cmd) {
3260         case OBD_IOC_LOV_GET_CONFIG: {
3261                 char *buf;
3262                 struct lov_desc *desc;
3263                 struct obd_uuid uuid;
3264
3265                 buf = NULL;
3266                 len = 0;
3267                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3268                         GOTO(out, err = -EINVAL);
3269
3270                 data = (struct obd_ioctl_data *)buf;
3271
3272                 if (sizeof(*desc) > data->ioc_inllen1) {
3273                         obd_ioctl_freedata(buf, len);
3274                         GOTO(out, err = -EINVAL);
3275                 }
3276
3277                 if (data->ioc_inllen2 < sizeof(uuid)) {
3278                         obd_ioctl_freedata(buf, len);
3279                         GOTO(out, err = -EINVAL);
3280                 }
3281
3282                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3283                 desc->ld_tgt_count = 1;
3284                 desc->ld_active_tgt_count = 1;
3285                 desc->ld_default_stripe_count = 1;
3286                 desc->ld_default_stripe_size = 0;
3287                 desc->ld_default_stripe_offset = 0;
3288                 desc->ld_pattern = 0;
3289                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3290
3291                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3292
3293                 err = copy_to_user((void *)uarg, buf, len);
3294                 if (err)
3295                         err = -EFAULT;
3296                 obd_ioctl_freedata(buf, len);
3297                 GOTO(out, err);
3298         }
3299         case LL_IOC_LOV_SETSTRIPE:
3300                 err = obd_alloc_memmd(exp, karg);
3301                 if (err > 0)
3302                         err = 0;
3303                 GOTO(out, err);
3304         case LL_IOC_LOV_GETSTRIPE:
3305                 err = osc_getstripe(karg, uarg);
3306                 GOTO(out, err);
3307         case OBD_IOC_CLIENT_RECOVER:
3308                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3309                                             data->ioc_inlbuf1);
3310                 if (err > 0)
3311                         err = 0;
3312                 GOTO(out, err);
3313         case IOC_OSC_SET_ACTIVE:
3314                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3315                                                data->ioc_offset);
3316                 GOTO(out, err);
3317         case OBD_IOC_POLL_QUOTACHECK:
3318                 err = lquota_poll_check(quota_interface, exp,
3319                                         (struct if_quotacheck *)karg);
3320                 GOTO(out, err);
3321         default:
3322                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3323                        cmd, cfs_curproc_comm());
3324                 GOTO(out, err = -ENOTTY);
3325         }
3326 out:
3327 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3328         MOD_DEC_USE_COUNT;
3329 #else
3330         module_put(THIS_MODULE);
3331 #endif
3332         return err;
3333 }
3334
3335 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3336                         void *key, __u32 *vallen, void *val)
3337 {
3338         ENTRY;
3339         if (!vallen || !val)
3340                 RETURN(-EFAULT);
3341
3342         if (keylen > strlen("lock_to_stripe") &&
3343             strcmp(key, "lock_to_stripe") == 0) {
3344                 __u32 *stripe = val;
3345                 *vallen = sizeof(*stripe);
3346                 *stripe = 0;
3347                 RETURN(0);
3348         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3349                 struct ptlrpc_request *req;
3350                 obd_id *reply;
3351                 char *bufs[2] = { NULL, key };
3352                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3353
3354                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3355                                       OST_GET_INFO, 2, size, bufs);
3356                 if (req == NULL)
3357                         RETURN(-ENOMEM);
3358
3359                 size[REPLY_REC_OFF] = *vallen;
3360                 ptlrpc_req_set_repsize(req, 2, size);
3361                 rc = ptlrpc_queue_wait(req);
3362                 if (rc)
3363                         GOTO(out, rc);
3364
3365                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3366                                            lustre_swab_ost_last_id);
3367                 if (reply == NULL) {
3368                         CERROR("Can't unpack OST last ID\n");
3369                         GOTO(out, rc = -EPROTO);
3370                 }
3371                 *((obd_id *)val) = *reply;
3372         out:
3373                 ptlrpc_req_finished(req);
3374                 RETURN(rc);
3375         }
3376         RETURN(-EINVAL);
3377 }
3378
3379 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3380                                           void *aa, int rc)
3381 {
3382         struct llog_ctxt *ctxt;
3383         struct obd_import *imp = req->rq_import;
3384         ENTRY;
3385
3386         if (rc != 0)
3387                 RETURN(rc);
3388
3389         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3390         if (ctxt) {
3391                 if (rc == 0)
3392                         rc = llog_initiator_connect(ctxt);
3393                 else
3394                         CERROR("cannot establish connection for "
3395                                "ctxt %p: %d\n", ctxt, rc);
3396         }
3397
3398         spin_lock(&imp->imp_lock);
3399         imp->imp_server_timeout = 1;
3400         imp->imp_pingable = 1;
3401         spin_unlock(&imp->imp_lock);
3402         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3403
3404         RETURN(rc);
3405 }
3406
3407 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3408                               void *key, obd_count vallen, void *val,
3409                               struct ptlrpc_request_set *set)
3410 {
3411         struct ptlrpc_request *req;
3412         struct obd_device  *obd = exp->exp_obd;
3413         struct obd_import *imp = class_exp2cliimp(exp);
3414         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3415         char *bufs[3] = { NULL, key, val };
3416         ENTRY;
3417
3418         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3419
3420         if (KEY_IS(KEY_NEXT_ID)) {
3421                 if (vallen != sizeof(obd_id))
3422                         RETURN(-EINVAL);
3423                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3424                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3425                        exp->exp_obd->obd_name,
3426                        obd->u.cli.cl_oscc.oscc_next_id);
3427
3428                 RETURN(0);
3429         }
3430
3431         if (KEY_IS("unlinked")) {
3432                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3433                 spin_lock(&oscc->oscc_lock);
3434                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3435                 spin_unlock(&oscc->oscc_lock);
3436                 RETURN(0);
3437         }
3438
3439         if (KEY_IS(KEY_INIT_RECOV)) {
3440                 if (vallen != sizeof(int))
3441                         RETURN(-EINVAL);
3442                 spin_lock(&imp->imp_lock);
3443                 imp->imp_initial_recov = *(int *)val;
3444                 spin_unlock(&imp->imp_lock);
3445                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3446                        exp->exp_obd->obd_name,
3447                        imp->imp_initial_recov);
3448                 RETURN(0);
3449         }
3450
3451         if (KEY_IS("checksum")) {
3452                 if (vallen != sizeof(int))
3453                         RETURN(-EINVAL);
3454                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3455                 RETURN(0);
3456         }
3457
3458         if (KEY_IS(KEY_FLUSH_CTX)) {
3459                 sptlrpc_import_flush_my_ctx(imp);
3460                 RETURN(0);
3461         }
3462
3463         if (!set)
3464                 RETURN(-EINVAL);
3465
3466         /* We pass all other commands directly to OST. Since nobody calls osc
3467            methods directly and everybody is supposed to go through LOV, we
3468            assume lov checked invalid values for us.
3469            The only recognised values so far are evict_by_nid and mds_conn.
3470            Even if something bad goes through, we'd get a -EINVAL from OST
3471            anyway. */
3472
3473         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3474                               bufs);
3475         if (req == NULL)
3476                 RETURN(-ENOMEM);
3477
3478         if (KEY_IS("mds_conn")) {
3479                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3480
3481                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3482                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3483                 LASSERT(oscc->oscc_oa.o_gr > 0);
3484                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3485         }
3486
3487         ptlrpc_req_set_repsize(req, 1, NULL);
3488         ptlrpc_set_add_req(set, req);
3489         ptlrpc_check_set(set);
3490
3491         RETURN(0);
3492 }
3493
3494
3495 static struct llog_operations osc_size_repl_logops = {
3496         lop_cancel: llog_obd_repl_cancel
3497 };
3498
3499 static struct llog_operations osc_mds_ost_orig_logops;
3500 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3501                          struct obd_device *tgt, int count,
3502                          struct llog_catid *catid, struct obd_uuid *uuid)
3503 {
3504         int rc;
3505         ENTRY;
3506
3507         spin_lock(&obd->obd_dev_lock);
3508         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3509                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3510                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3511                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3512                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3513                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3514         }
3515         spin_unlock(&obd->obd_dev_lock);
3516
3517         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3518                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3519         if (rc) {
3520                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3521                 GOTO (out, rc);
3522         }
3523
3524         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3525                         &osc_size_repl_logops);
3526         if (rc)
3527                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3528 out:
3529         if (rc) {
3530                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3531                        obd->obd_name, tgt->obd_name, count, catid, rc);
3532                 CERROR("logid "LPX64":0x%x\n",
3533                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3534         }
3535         RETURN(rc);
3536 }
3537
3538 static int osc_llog_finish(struct obd_device *obd, int count)
3539 {
3540         struct llog_ctxt *ctxt;
3541         int rc = 0, rc2 = 0;
3542         ENTRY;
3543
3544         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3545         if (ctxt)
3546                 rc = llog_cleanup(ctxt);
3547
3548         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3549         if (ctxt)
3550                 rc2 = llog_cleanup(ctxt);
3551         if (!rc)
3552                 rc = rc2;
3553
3554         RETURN(rc);
3555 }
3556
3557 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3558                          struct obd_uuid *cluuid,
3559                          struct obd_connect_data *data)
3560 {
3561         struct client_obd *cli = &obd->u.cli;
3562
3563         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3564                 long lost_grant;
3565
3566                 client_obd_list_lock(&cli->cl_loi_list_lock);
3567                 data->ocd_grant = cli->cl_avail_grant ?:
3568                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3569                 lost_grant = cli->cl_lost_grant;
3570                 cli->cl_lost_grant = 0;
3571                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3572
3573                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3574                        "cl_lost_grant: %ld\n", data->ocd_grant,
3575                        cli->cl_avail_grant, lost_grant);
3576                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3577                        " ocd_grant: %d\n", data->ocd_connect_flags,
3578                        data->ocd_version, data->ocd_grant);
3579         }
3580
3581         RETURN(0);
3582 }
3583
3584 static int osc_disconnect(struct obd_export *exp)
3585 {
3586         struct obd_device *obd = class_exp2obd(exp);
3587         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3588         int rc;
3589
3590         if (obd->u.cli.cl_conn_count == 1)
3591                 /* flush any remaining cancel messages out to the target */
3592                 llog_sync(ctxt, exp);
3593
3594         rc = client_disconnect_export(exp);
3595         return rc;
3596 }
3597
3598 static int osc_import_event(struct obd_device *obd,
3599                             struct obd_import *imp,
3600                             enum obd_import_event event)
3601 {
3602         struct client_obd *cli;
3603         int rc = 0;
3604
3605         ENTRY;
3606         LASSERT(imp->imp_obd == obd);
3607
3608         switch (event) {
3609         case IMP_EVENT_DISCON: {
3610                 /* Only do this on the MDS OSC's */
3611                 if (imp->imp_server_timeout) {
3612                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3613
3614                         spin_lock(&oscc->oscc_lock);
3615                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3616                         spin_unlock(&oscc->oscc_lock);
3617                 }
3618                 cli = &obd->u.cli;
3619                 client_obd_list_lock(&cli->cl_loi_list_lock);
3620                 cli->cl_avail_grant = 0;
3621                 cli->cl_lost_grant = 0;
3622                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3623                 break;
3624         }
3625         case IMP_EVENT_INACTIVE: {
3626                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3627                 break;
3628         }
3629         case IMP_EVENT_INVALIDATE: {
3630                 struct ldlm_namespace *ns = obd->obd_namespace;
3631
3632                 /* Reset grants */
3633                 cli = &obd->u.cli;
3634                 client_obd_list_lock(&cli->cl_loi_list_lock);
3635                 /* all pages go to failing rpcs due to the invalid import */
3636                 osc_check_rpcs(cli);
3637                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3638
3639                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3640
3641                 break;
3642         }
3643         case IMP_EVENT_ACTIVE: {
3644                 /* Only do this on the MDS OSC's */
3645                 if (imp->imp_server_timeout) {
3646                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3647
3648                         spin_lock(&oscc->oscc_lock);
3649                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3650                         spin_unlock(&oscc->oscc_lock);
3651                 }
3652                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3653                 break;
3654         }
3655         case IMP_EVENT_OCD: {
3656                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3657
3658                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3659                         osc_init_grant(&obd->u.cli, ocd);
3660
3661                 /* See bug 7198 */
3662                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3663                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3664
3665                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3666                 break;
3667         }
3668         default:
3669                 CERROR("Unknown import event %d\n", event);
3670                 LBUG();
3671         }
3672         RETURN(rc);
3673 }
3674
3675 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3676 {
3677         int rc;
3678         ENTRY;
3679
3680         ENTRY;
3681         rc = ptlrpcd_addref();
3682         if (rc)
3683                 RETURN(rc);
3684
3685         rc = client_obd_setup(obd, lcfg);
3686         if (rc) {
3687                 ptlrpcd_decref();
3688         } else {
3689                 struct lprocfs_static_vars lvars;
3690                 struct