/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Peter Braam <braam@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table).
 *
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <libcfs/kp30.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

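/* Copy the capability (if any) into the request buffer at @offset and
 * flag its presence in the OST body so the server will verify it. */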
static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
}

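/* Reply handler for asynchronous getattr: unpack the returned ost_body
 * into the caller's obdo and invoke the obd_info up-call with the rc. */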
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);
        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
        struct osc_async_args *aa;
        ENTRY;

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        ptlrpc_req_set_repsize(req, 2, size);
        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

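/* Create an object on the OST, allocating a single-stripe LSM if the
 * caller did not supply one.  The OBD_FL_DELORPHAN case is the MDS-driven
 * orphan cleanup path and must not be resent or delayed. */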
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out_req, rc = -EPROTO);
        }

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

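/* Truncate an object to the extent in oi_policy: the start/end offsets
 * travel to the OST in the (otherwise unused) o_size/o_blocks fields. */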
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

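/* Flush dirty data in the [start, end] range of an object out to stable
 * storage on the OST; like OST_PUNCH, the range is carried in
 * o_size/o_blocks.  This call waits for the server reply. */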
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 3, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* overload the size and blocks fields in the oa with start/end */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally the locks that match @mode on the resource
 * identified by @oa.  Matched locks are added to the @cancels list.
 * Returns the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        int count;
        ENTRY;

        if (res == NULL)
                RETURN(0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
        RETURN(count);
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and synced to disk (i.e. the transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
        int count, bufcount = 2;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp) && count) {
                bufcount = 3;
                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count, OST_DESTROY);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
        else
                ldlm_lock_list_put(&cancels, l_bl_ast, count);

        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body->oa = *oa;

        ptlrpc_req_set_repsize(req, 2, size);

        ptlrpcd_add_req(req);
        RETURN(0);
}

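/* Report the client's dirty/grant state to the OST in an outgoing write:
 * o_dirty is what we have cached, o_undirty is how much more grant we
 * could use, and o_grant/o_dropped return unused and lost grant so the
 * server can rebalance its accounting. */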
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

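/* Validate a write reply: the server returns one rc per niobuf, and all
 * must be zero for a fully successful transfer of the requested bytes. */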
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CERROR("Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CERROR("rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

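/* Two brw_pages can share a single niobuf only if their flags are
 * identical and the second page starts exactly where the first ends;
 * flags differing in anything beyond OBD_BRW_FROM_GRANT also draw a
 * warning. */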
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

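/* Compute a CRC32 over the first @nob bytes of the page array.  Fault
 * injection can corrupt the first page (receive path) or just bump the
 * checksum (send path) to exercise the retry machinery. */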
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT(pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

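/* Build an OST_READ/OST_WRITE request for @page_count pages: count how
 * many niobufs are needed (contiguous pages are merged), size the request
 * buffers, attach the bulk descriptor, and stash the transfer bookkeeping
 * in rq_async_args for the reply handler. */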
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct lustre_capa      *capa;
        struct osc_brw_async_args *aa;

        ENTRY;
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                pool = cli->cl_import->imp_rq_pool;
        } else {
                opc = OST_READ;
                pool = NULL;
        }

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
        if (ocapa)
                size[REQ_REC_OFF + 3] = sizeof(*capa);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                   size, NULL, pool, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        if (ocapa) {
                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
                                      sizeof(*capa));
                capa_cpy(capa, ocapa);
                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        }

        LASSERT(page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_retries = 5;     /* retry for checksum errors; lprocfs? */
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

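/* A write checksum mismatch was reported by the OST.  Recompute the
 * checksum over the pages we still hold to decide where the corruption
 * happened: on the client after checksumming, in transit, or both. */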
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum,
                                int nob, obd_count page_count,
                                struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                             client_cksum &&
                             check_write_checksum(&body->oa, peer, client_cksum,
                                                  body->oa.o_cksum,
                                                  aa->aa_requested_nob,
                                                  aa->aa_page_count,
                                                  aa->aa_ppga)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("Unexpected rc %d (%d transferred)\n",
                       rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;

                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga);

                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please tell CFS.\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation : (__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}

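/* Synchronous BRW: build the request, queue it, and wait.  Bulk timeouts
 * are rebuilt and resent; checksum failures (-EAGAIN) are retried a
 * bounded number of times before turning into -EIO. */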
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc, retries = 5; /* lprocfs? */
        ENTRY;

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (rc == -EAGAIN) {
                if (retries-- > 0)
                        goto restart_bulk;
                rc = -EIO;
        }
        RETURN(rc);
}

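/* Rebuild and requeue a BRW request after a checksum failure: the new
 * request inherits the pga and pending oaps from the failed one, and each
 * oap's request reference is switched over to the replacement. */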
1314 int osc_brw_redo_request(struct ptlrpc_request *req,
1315                          struct osc_brw_async_args *aa)
1316 {
1317         struct ptlrpc_request *new_req;
1318         struct ptlrpc_request_set *set = req->rq_set;
1319         struct osc_brw_async_args *new_aa;
1320         struct osc_async_page *oap;
1321         int rc = 0;
1322         ENTRY;
1323
1324         if (aa->aa_retries-- <= 0) {
1325                 CERROR("too many checksum retries, returning error\n");
1326                 RETURN(-EIO);
1327         }
1328
1329         DEBUG_REQ(D_ERROR, req, "redo for checksum error");
1330         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1331                 if (oap->oap_request != NULL) {
1332                         LASSERTF(req == oap->oap_request,
1333                                  "request %p != oap_request %p\n",
1334                                  req, oap->oap_request);
1335                         if (oap->oap_interrupted) {
1336                                 ptlrpc_mark_interrupted(oap->oap_request);
1337                                 rc = -EINTR;
1338                                 break;
1339                         }
1340                 }
1341         }
1342         if (rc)
1343                 RETURN(rc);
1344         /* TODO-MERGE: and where to get ocapa?? */
1345         rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
1346                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1347                                   aa->aa_cli, aa->aa_oa,
1348                                   NULL /* lsm unused by osc currently */,
1349                                   aa->aa_page_count, aa->aa_ppga, &new_req,
1350                                   NULL /* ocapa */);
1351         if (rc)
1352                 RETURN(rc);
1353
1354         /* New request takes over pga and oaps from old request.
1355          * Note that copying a list_head doesn't work, need to move it... */
1356         new_req->rq_interpret_reply = req->rq_interpret_reply;
1357         new_req->rq_async_args = req->rq_async_args;
1358         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1359         INIT_LIST_HEAD(&new_aa->aa_oaps);
1360         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1361         INIT_LIST_HEAD(&aa->aa_oaps);
1362
1363         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1364                 if (oap->oap_request) {
1365                         ptlrpc_req_finished(oap->oap_request);
1366                         oap->oap_request = ptlrpc_request_addref(new_req);
1367                 }
1368         }
1369
1370         ptlrpc_set_add_req(set, new_req);
1371
1372         RETURN(0);
1373 }
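
/* illustrative sketch (comment only, not built): why aa_oaps is moved with
 * list_splice() above rather than struct-copied.  a plain assignment copies
 * the head's next/prev values, but the first and last members still point
 * back at the *old* head, corrupting the list:
 *
 *      new_head = old_head;                  // members still reference &old_head
 *
 * whereas the splice + re-init sequence relinks everything:
 *
 *      CFS_INIT_LIST_HEAD(&new_head);
 *      list_splice(&old_head, &new_head);    // members now hang off new_head
 *      CFS_INIT_LIST_HEAD(&old_head);        // leave the old head validly empty
 */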
1374
1375 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
1376 {
1377         struct osc_brw_async_args *aa = data;
1378         int                        i;
1379         int                        nob = rc;
1380         ENTRY;
1381
1382         rc = osc_brw_fini_request(req, rc);
1383         if (rc == -EAGAIN) {
1384                 rc = osc_brw_redo_request(req, aa);
1385                 if (rc == 0)
1386                         RETURN(0);
1387         }
1388         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
1389                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
1390
1391         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1392         for (i = 0; i < aa->aa_page_count; i++)
1393                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1394         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1395
1396         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1397
1398         RETURN(rc);
1399 }
1400
1401 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1402                           struct lov_stripe_md *lsm, obd_count page_count,
1403                           struct brw_page **pga, struct ptlrpc_request_set *set,
1404                           struct obd_capa *ocapa)
1405 {
1406         struct ptlrpc_request     *req;
1407         struct client_obd         *cli = &exp->exp_obd->u.cli;
1408         int                        rc, i;
1409         ENTRY;
1410
1411         /* Consume write credits even if doing a sync write - otherwise we
1412          * may run out of space on the OST, since its space is promised out as grant. */
1413         if (cmd == OBD_BRW_WRITE) {
1414                 spin_lock(&cli->cl_loi_list_lock);
1415                 for (i = 0; i < page_count; i++) {
1416                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1417                                 osc_consume_write_grant(cli, pga[i]);
1418                 }
1419                 spin_unlock(&cli->cl_loi_list_lock);
1420         }
1421
1422         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1423                                   &req, ocapa);
1424         if (rc == 0) {
1425                 req->rq_interpret_reply = brw_interpret;
1426                 ptlrpc_set_add_req(set, req);
1427         } else if (cmd == OBD_BRW_WRITE) {
1428                 spin_lock(&cli->cl_loi_list_lock);
1429                 for (i = 0; i < page_count; i++)
1430                         osc_release_write_grant(cli, pga[i], 0);
1431                 spin_unlock(&cli->cl_loi_list_lock);
1432         }
1433         RETURN (rc);
1434 }
1435
1436 /*
1437  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1438  * follow sedgewick's advice and stick to the dead simple shellsort -- it'll do
1439  * fine for our small page arrays and doesn't require allocation.  it's an
1440  * insertion sort that swaps elements that are strides apart, shrinking the
1441  * stride down until it's 1 and the array is sorted.
1442  */
1443 static void sort_brw_pages(struct brw_page **array, int num)
1444 {
1445         int stride, i, j;
1446         struct brw_page *tmp;
1447
1448         if (num == 1)
1449                 return;
1450         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1451                 ;
1452
1453         do {
1454                 stride /= 3;
1455                 for (i = stride ; i < num ; i++) {
1456                         tmp = array[i];
1457                         j = i;
1458                         while (j >= stride && array[j - stride]->off > tmp->off) {
1459                                 array[j] = array[j - stride];
1460                                 j -= stride;
1461                         }
1462                         array[j] = tmp;
1463                 }
1464         } while (stride > 1);
1465 }
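
/* example (comment only): for num == 100 the ascending loop above computes
 * strides 1, 4, 13, 40, 121 and stops; the do/while then sorts with strides
 * 40, 13 and 4, finishing with a plain insertion sort at stride 1:
 *
 *      struct brw_page *pages[100];
 *      ...
 *      sort_brw_pages(pages, 100);     // pages[i]->off now non-decreasing
 */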
1466
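/* count how many leading pages of @pg form one gap-free byte range that the
 * network can cover with a single RDMA.  the first page may start mid-page,
 * but every page except the last must end on a page boundary and every page
 * except the first must start on one; e.g. a half page in the middle of the
 * array ends the run at that page. */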
1467 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1468 {
1469         int count = 1;
1470         int offset;
1471         int i = 0;
1472
1473         LASSERT (pages > 0);
1474         offset = pg[i]->off & ~CFS_PAGE_MASK;
1475
1476         for (;;) {
1477                 pages--;
1478                 if (pages == 0)         /* that's all */
1479                         return count;
1480
1481                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1482                         return count;   /* doesn't end on page boundary */
1483
1484                 i++;
1485                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1486                 if (offset != 0)        /* doesn't start on page boundary */
1487                         return count;
1488
1489                 count++;
1490         }
1491 }
1492
1493 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1494 {
1495         struct brw_page **ppga;
1496         int i;
1497
1498         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1499         if (ppga == NULL)
1500                 return NULL;
1501
1502         for (i = 0; i < count; i++)
1503                 ppga[i] = pga + i;
1504         return ppga;
1505 }
1506
1507 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1508 {
1509         LASSERT(ppga != NULL);
1510         OBD_FREE(ppga, sizeof(*ppga) * count);
1511 }
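
/* typical usage (sketch, mirroring osc_brw() below): build the pointer
 * array, hand slices of it to the brw machinery, then free the array; the
 * brw_page structs themselves remain owned by the caller:
 *
 *      struct brw_page **ppga = osc_build_ppga(pga, count);
 *      if (ppga == NULL)
 *              return -ENOMEM;
 *      ... issue brw requests over ppga[0 .. count-1] ...
 *      osc_release_ppga(ppga, count);
 */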
1512
1513 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1514                    obd_count page_count, struct brw_page *pga,
1515                    struct obd_trans_info *oti)
1516 {
1517         struct obdo *saved_oa = NULL;
1518         struct brw_page **ppga, **orig;
1519         struct obd_import *imp = class_exp2cliimp(exp);
1520         struct client_obd *cli = &imp->imp_obd->u.cli;
1521         int rc, page_count_orig;
1522         ENTRY;
1523
1524         if (cmd & OBD_BRW_CHECK) {
1525                 /* The caller just wants to know if there's a chance that this
1526                  * I/O can succeed */
1527
1528                 if (imp == NULL || imp->imp_invalid)
1529                         RETURN(-EIO);
1530                 RETURN(0);
1531         }
1532
1533         /* test_brw with a failed create can trip this, maybe others. */
1534         LASSERT(cli->cl_max_pages_per_rpc);
1535
1536         rc = 0;
1537
1538         orig = ppga = osc_build_ppga(pga, page_count);
1539         if (ppga == NULL)
1540                 RETURN(-ENOMEM);
1541         page_count_orig = page_count;
1542
1543         sort_brw_pages(ppga, page_count);
1544         while (page_count) {
1545                 obd_count pages_per_brw;
1546
1547                 if (page_count > cli->cl_max_pages_per_rpc)
1548                         pages_per_brw = cli->cl_max_pages_per_rpc;
1549                 else
1550                         pages_per_brw = page_count;
1551
1552                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1553
1554                 if (saved_oa != NULL) {
1555                         /* restore previously saved oa */
1556                         *oinfo->oi_oa = *saved_oa;
1557                 } else if (page_count > pages_per_brw) {
1558                         /* save a copy of oa (brw will clobber it) */
1559                         OBDO_ALLOC(saved_oa);
1560                         if (saved_oa == NULL)
1561                                 GOTO(out, rc = -ENOMEM);
1562                         *saved_oa = *oinfo->oi_oa;
1563                 }
1564
1565                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1566                                       pages_per_brw, ppga, oinfo->oi_capa);
1567
1568                 if (rc != 0)
1569                         break;
1570
1571                 page_count -= pages_per_brw;
1572                 ppga += pages_per_brw;
1573         }
1574
1575 out:
1576         osc_release_ppga(orig, page_count_orig);
1577
1578         if (saved_oa != NULL)
1579                 OBDO_FREE(saved_oa);
1580
1581         RETURN(rc);
1582 }
1583
1584 static int osc_brw_async(int cmd, struct obd_export *exp,
1585                          struct obd_info *oinfo, obd_count page_count,
1586                          struct brw_page *pga, struct obd_trans_info *oti,
1587                          struct ptlrpc_request_set *set)
1588 {
1589         struct brw_page **ppga, **orig;
1590         struct client_obd *cli = &exp->exp_obd->u.cli;
1591         int page_count_orig;
1592         int rc = 0;
1593         ENTRY;
1594
1595         if (cmd & OBD_BRW_CHECK) {
1596                 struct obd_import *imp = class_exp2cliimp(exp);
1597                 /* The caller just wants to know if there's a chance that this
1598                  * I/O can succeed */
1599
1600                 if (imp == NULL || imp->imp_invalid)
1601                         RETURN(-EIO);
1602                 RETURN(0);
1603         }
1604
1605         orig = ppga = osc_build_ppga(pga, page_count);
1606         if (ppga == NULL)
1607                 RETURN(-ENOMEM);
1608         page_count_orig = page_count;
1609
1610         sort_brw_pages(ppga, page_count);
1611         while (page_count) {
1612                 struct brw_page **copy;
1613                 obd_count pages_per_brw;
1614
1615                 pages_per_brw = min_t(obd_count, page_count,
1616                                       cli->cl_max_pages_per_rpc);
1617
1618                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1619
1620                 /* use ppga only if single RPC is going to fly */
1621                 if (pages_per_brw != page_count_orig || ppga != orig) {
1622                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1623                         if (copy == NULL)
1624                                 GOTO(out, rc = -ENOMEM);
1625                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1626                 } else
1627                         copy = ppga;
1628
1629                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1630                                     pages_per_brw, copy, set, oinfo->oi_capa);
1631
1632                 if (rc != 0) {
1633                         if (copy != ppga)
1634                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1635                         break;
1636                 }
1637                 if (copy == orig) {
1638                         /* we passed it to async_internal() which is
1639                          * now responsible for releasing memory */
1640                         orig = NULL;
1641                 }
1642
1643                 page_count -= pages_per_brw;
1644                 ppga += pages_per_brw;
1645         }
1646 out:
1647         if (orig)
1648                 osc_release_ppga(orig, page_count_orig);
1649         RETURN(rc);
1650 }
1651
1652 static void osc_check_rpcs(struct client_obd *cli);
1653
1654 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1655  * the dirty accounting: either writeback completed, or truncate dropped the
1656  * page before writing started.  Must be called with the loi lock held. */
1657 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1658                            int sent)
1659 {
1660         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1661 }
1662
1663
1664 /* This maintains the lists of pending pages to read/write for a given object
1665  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1666  * to quickly find objects that are ready to send an RPC. */
1667 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1668                          int cmd)
1669 {
1670         int optimal;
1671         ENTRY;
1672
1673         if (lop->lop_num_pending == 0)
1674                 RETURN(0);
1675
1676         /* if we have an invalid import we want to drain the queued pages
1677          * by forcing them through rpcs that immediately fail and complete
1678          * the pages.  recovery relies on this to empty the queued pages
1679          * before canceling the locks and evicting down the llite pages */
1680         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1681                 RETURN(1);
1682
1683         /* stream rpcs in queue order as long as there is an urgent page
1684          * queued.  this is our cheap solution for good batching in the case
1685          * where writepage marks some random page in the middle of the file
1686          * as urgent because of, say, memory pressure */
1687         if (!list_empty(&lop->lop_urgent)) {
1688                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1689                 RETURN(1);
1690         }
1691         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1692         optimal = cli->cl_max_pages_per_rpc;
1693         if (cmd & OBD_BRW_WRITE) {
1694                 /* trigger a write rpc stream as long as there are dirtiers
1695                  * waiting for space.  as they're waiting, they're not going to
1696                  * create more pages to coalesce with what's waiting. */
1697                 if (!list_empty(&cli->cl_cache_waiters)) {
1698                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1699                         RETURN(1);
1700                 }
1701                 /* +16 to avoid triggering rpcs that would want to include pages
1702                  * that are being queued but which can't be made ready until
1703                  * the queuer finishes with the page. this is a wart for
1704                  * llite::commit_write() */
1705                 optimal += 16;
1706         }
1707         if (lop->lop_num_pending >= optimal)
1708                 RETURN(1);
1709
1710         RETURN(0);
1711 }
1712
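/* add @item to @list or remove it so that its membership matches the
 * boolean @should_be_on; a no-op when it is already in the right state */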
1713 static void on_list(struct list_head *item, struct list_head *list,
1714                     int should_be_on)
1715 {
1716         if (list_empty(item) && should_be_on)
1717                 list_add_tail(item, list);
1718         else if (!list_empty(item) && !should_be_on)
1719                 list_del_init(item);
1720 }
1721
1722 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1723  * can find pages to build into rpcs quickly */
1724 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1725 {
1726         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1727                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1728                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1729
1730         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1731                 loi->loi_write_lop.lop_num_pending);
1732
1733         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1734                 loi->loi_read_lop.lop_num_pending);
1735 }
1736
1737 static void lop_update_pending(struct client_obd *cli,
1738                                struct loi_oap_pages *lop, int cmd, int delta)
1739 {
1740         lop->lop_num_pending += delta;
1741         if (cmd & OBD_BRW_WRITE)
1742                 cli->cl_pending_w_pages += delta;
1743         else
1744                 cli->cl_pending_r_pages += delta;
1745 }
1746
1747 /* this is called when a sync waiter receives an interruption.  Its job is to
1748  * get the caller woken as soon as possible.  If its page hasn't been put in an
1749  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc
1750  * as interrupted, which will forcefully complete the rpc once it has
1751  * timed out */
1752 static void osc_occ_interrupted(struct oig_callback_context *occ)
1753 {
1754         struct osc_async_page *oap;
1755         struct loi_oap_pages *lop;
1756         struct lov_oinfo *loi;
1757         ENTRY;
1758
1759         /* XXX member_of() */
1760         oap = list_entry(occ, struct osc_async_page, oap_occ);
1761
1762         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1763
1764         oap->oap_interrupted = 1;
1765
1766         /* ok, it's been put in an rpc. only one oap gets a request reference */
1767         if (oap->oap_request != NULL) {
1768                 ptlrpc_mark_interrupted(oap->oap_request);
1769                 ptlrpcd_wake(oap->oap_request);
1770                 GOTO(unlock, 0);
1771         }
1772
1773         /* we don't get interruption callbacks until osc_trigger_group_io()
1774          * has been called and put the sync oaps in the pending/urgent lists. */
1775         if (!list_empty(&oap->oap_pending_item)) {
1776                 list_del_init(&oap->oap_pending_item);
1777                 list_del_init(&oap->oap_urgent_item);
1778
1779                 loi = oap->oap_loi;
1780                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1781                         &loi->loi_write_lop : &loi->loi_read_lop;
1782                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1783                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1784
1785                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1786                 oap->oap_oig = NULL;
1787         }
1788
1789 unlock:
1790         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1791 }
1792
1793 /* this is trying to propagate async writeback errors back up to the
1794  * application.  As an async write fails we record the error code for later if
1795  * the app does an fsync.  As long as errors persist we force future rpcs to be
1796  * sync so that the app can get a sync error and break the cycle of queueing
1797  * pages for which writeback will fail. */
1798 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1799                            int rc)
1800 {
1801         if (rc) {
1802                 if (!ar->ar_rc)
1803                         ar->ar_rc = rc;
1804
1805                 ar->ar_force_sync = 1;
1806                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1807                 return;
1809         }
1810
1811         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1812                 ar->ar_force_sync = 0;
1813 }
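
/* illustrative timeline (xids assumed): an async write fails, so ar_rc
 * records the error, ar_force_sync is set and ar_min_xid is sampled as,
 * say, 42.  in-flight requests with xids below 42 prove nothing; the first
 * request with xid >= 42 to complete cleanly clears ar_force_sync and lets
 * async writeback resume. */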
1814
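/* queue @oap on its object's pending list (and on the urgent list too when
 * ASYNC_URGENT is set) and bump the pending page accounting */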
1815 static void osc_oap_to_pending(struct osc_async_page *oap)
1816 {
1817         struct loi_oap_pages *lop;
1818
1819         if (oap->oap_cmd & OBD_BRW_WRITE)
1820                 lop = &oap->oap_loi->loi_write_lop;
1821         else
1822                 lop = &oap->oap_loi->loi_read_lop;
1823
1824         if (oap->oap_async_flags & ASYNC_URGENT)
1825                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1826         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1827         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1828 }
1829
1830 /* this must be called holding the loi list lock to give coverage to exit_cache,
1831  * async_flag maintenance, and oap_request */
1832 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1833                               struct osc_async_page *oap, int sent, int rc)
1834 {
1835         ENTRY;
1836         oap->oap_async_flags = 0;
1837         oap->oap_interrupted = 0;
1838
1839         if (oap->oap_cmd & OBD_BRW_WRITE) {
1840                 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1841                 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1842         }
1843
1844         if (oap->oap_request != NULL) {
1845                 ptlrpc_req_finished(oap->oap_request);
1846                 oap->oap_request = NULL;
1847         }
1848
1849         if (rc == 0 && oa != NULL) {
1850                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1851                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1852                 if (oa->o_valid & OBD_MD_FLMTIME)
1853                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1854                 if (oa->o_valid & OBD_MD_FLATIME)
1855                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1856                 if (oa->o_valid & OBD_MD_FLCTIME)
1857                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1858         }
1859
1860         if (oap->oap_oig) {
1861                 osc_exit_cache(cli, oap, sent);
1862                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1863                 oap->oap_oig = NULL;
1864                 EXIT;
1865                 return;
1866         }
1867
1868         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1869                                                 oap->oap_cmd, oa, rc);
1870
1871         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1872          * I/O on the page could start, but OSC calls it under lock
1873          * and thus we can add oap back to pending safely */
1874         if (rc)
1875                 /* upper layer wants to leave the page on pending queue */
1876                 osc_oap_to_pending(oap);
1877         else
1878                 osc_exit_cache(cli, oap, sent);
1879         EXIT;
1880 }
1881
1882 static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
1883 {
1884         struct osc_async_page *oap, *tmp;
1885         struct osc_brw_async_args *aa = data;
1886         struct client_obd *cli;
1887         ENTRY;
1888
1889         rc = osc_brw_fini_request(req, rc);
1890         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1891         if (rc == -EAGAIN) {
1892                 rc = osc_brw_redo_request(req, aa);
1893                 if (rc == 0)
1894                         RETURN(0);
1895                 GOTO(out, rc);
1896         }
1897
1898         cli = aa->aa_cli;
1899
1900         client_obd_list_lock(&cli->cl_loi_list_lock);
1901
1902         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1903          * is called so we know whether to go to sync BRWs or wait for more
1904          * RPCs to complete */
1905         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1906                 cli->cl_w_in_flight--;
1907         else
1908                 cli->cl_r_in_flight--;
1909
1910         /* the caller may re-use the oap after the completion call so
1911          * we need to clean it up a little */
1912         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1913                 list_del_init(&oap->oap_rpc_item);
1914                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1915         }
1916
1917         osc_wake_cache_waiters(cli);
1918         osc_check_rpcs(cli);
1919
1920         client_obd_list_unlock(&cli->cl_loi_list_lock);
1921
1922         OBDO_FREE(aa->aa_oa);
1923         rc = 0;
1924 out:
1925         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1926         RETURN(rc);
1927 }
1928
1929 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1930                                             struct list_head *rpc_list,
1931                                             int page_count, int cmd)
1932 {
1933         struct ptlrpc_request *req;
1934         struct brw_page **pga = NULL;
1935         struct osc_brw_async_args *aa;
1936         struct obdo *oa = NULL;
1937         struct obd_async_page_ops *ops = NULL;
1938         void *caller_data = NULL;
1939         struct obd_capa *ocapa;
1940         struct osc_async_page *oap;
1941         int i, rc;
1942
1943         ENTRY;
1944         LASSERT(!list_empty(rpc_list));
1945
1946         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1947         if (pga == NULL)
1948                 RETURN(ERR_PTR(-ENOMEM));
1949
1950         OBDO_ALLOC(oa);
1951         if (oa == NULL)
1952                 GOTO(out, req = ERR_PTR(-ENOMEM));
1953
1954         i = 0;
1955         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1956                 if (ops == NULL) {
1957                         ops = oap->oap_caller_ops;
1958                         caller_data = oap->oap_caller_data;
1959                 }
1960                 pga[i] = &oap->oap_brw_page;
1961                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1962                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1963                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1964                 i++;
1965         }
1966
1967         /* always get the data for the obdo for the rpc */
1968         LASSERT(ops != NULL);
1969         ops->ap_fill_obdo(caller_data, cmd, oa);
1970         ocapa = ops->ap_lookup_capa(caller_data, cmd);
1971
1972         sort_brw_pages(pga, page_count);
1973         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1974                                   pga, &req, ocapa);
1975         capa_put(ocapa);
1976         if (rc != 0) {
1977                 CERROR("prep_req failed: %d\n", rc);
1978                 GOTO(out, req = ERR_PTR(rc));
1979         }
1980
1981         /* Need to update the timestamps after the request is built in case
1982          * we race with setattr (locally or in queue at OST).  If OST gets
1983          * later setattr before earlier BRW (as determined by the request xid),
1984          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1985          * way to do this in a single call.  bug 10150 */
1986         ops->ap_update_obdo(caller_data, cmd, oa,
1987                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1988
1989         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1990         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1991         INIT_LIST_HEAD(&aa->aa_oaps);
1992         list_splice(rpc_list, &aa->aa_oaps);
1993         INIT_LIST_HEAD(rpc_list);
1994
1995 out:
1996         if (IS_ERR(req)) {
1997                 if (oa)
1998                         OBDO_FREE(oa);
1999                 if (pga)
2000                         OBD_FREE(pga, sizeof(*pga) * page_count);
2001         }
2002         RETURN(req);
2003 }
2004
2005 /* the loi lock is held across this function but it's allowed to release
2006  * and reacquire it during its work */
2007 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2008                             int cmd, struct loi_oap_pages *lop)
2009 {
2010         struct ptlrpc_request *req;
2011         obd_count page_count = 0;
2012         struct osc_async_page *oap = NULL, *tmp;
2013         struct osc_brw_async_args *aa;
2014         struct obd_async_page_ops *ops;
2015         CFS_LIST_HEAD(rpc_list);
2016         unsigned int ending_offset;
2017         unsigned int starting_offset = 0;
2018         ENTRY;
2019
2020         /* first we find the pages we're allowed to work with */
2021         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2022                                  oap_pending_item) {
2023                 ops = oap->oap_caller_ops;
2024
2025                 LASSERT(oap->oap_magic == OAP_MAGIC);
2026
2027                 /* in llite being 'ready' equates to the page being locked
2028                  * until completion unlocks it.  commit_write submits a page
2029                  * as not ready because its unlock will happen unconditionally
2030                  * as the call returns.  if we race with commit_write giving
2031                  * us that page, we don't want to create a hole in the page
2032                  * stream, so we stop and leave the rpc to be fired by
2033                  * another dirtier or kupdated interval (the not ready page
2034                  * will still be on the dirty list).  we could call in
2035                  * at the end of ll_file_write to process the queue again. */
2036                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2037                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2038                         if (rc < 0)
2039                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2040                                                 "instead of ready\n", oap,
2041                                                 oap->oap_page, rc);
2042                         switch (rc) {
2043                         case -EAGAIN:
2044                                 /* llite is telling us that the page is still
2045                                  * in commit_write and that we should try
2046                                  * and put it in an rpc again later.  we
2047                                  * break out of the loop so we don't create
2048                                  * a hole in the sequence of pages in the rpc
2049                                  * stream. */
2050                                 oap = NULL;
2051                                 break;
2052                         case -EINTR:
2053                                 /* the io isn't needed.  tell the checks
2054                                  * below to complete the rpc with EINTR */
2055                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2056                                 oap->oap_count = -EINTR;
2057                                 break;
2058                         case 0:
2059                                 oap->oap_async_flags |= ASYNC_READY;
2060                                 break;
2061                         default:
2062                                 LASSERTF(0, "oap %p page %p returned %d "
2063                                             "from make_ready\n", oap,
2064                                             oap->oap_page, rc);
2065                                 break;
2066                         }
2067                 }
2068                 if (oap == NULL)
2069                         break;
2070                 /*
2071                  * Page submitted for IO has to be locked. Either by
2072                  * ->ap_make_ready() or by higher layers.
2073                  *
2074                  * XXX nikita: this assertion should be adjusted when lustre
2075                  * starts using PG_writeback for pages being written out.
2076                  */
2077 #if defined(__KERNEL__) && defined(__LINUX__)
2078                 LASSERT(PageLocked(oap->oap_page));
2079 #endif
2080                 /* If there is a gap at the start of this page, it can't merge
2081                  * with any previous page, so we'll hand the network a
2082                  * "fragmented" page array that it can't transfer in 1 RDMA */
2083                 if (page_count != 0 && oap->oap_page_off != 0)
2084                         break;
2085
2086                 /* take the page out of our book-keeping */
2087                 list_del_init(&oap->oap_pending_item);
2088                 lop_update_pending(cli, lop, cmd, -1);
2089                 list_del_init(&oap->oap_urgent_item);
2090
2091                 if (page_count == 0)
2092                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2093                                           (PTLRPC_MAX_BRW_SIZE - 1);
2094
2095                 /* ask the caller for the size of the io as the rpc leaves. */
2096                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2097                         oap->oap_count =
2098                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2099                 if (oap->oap_count <= 0) {
2100                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2101                                oap->oap_count);
2102                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2103                         continue;
2104                 }
2105
2106                 /* now put the page back in our accounting */
2107                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2108                 if (++page_count >= cli->cl_max_pages_per_rpc)
2109                         break;
2110
2111                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2112                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2113                  * have the same alignment as the initial writes that allocated
2114                  * extents on the server. */
2115                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2116                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2117                 if (ending_offset == 0)
2118                         break;
2119
2120                 /* If there is a gap at the end of this page, it can't merge
2121                  * with any subsequent pages, so we'll hand the network a
2122                  * "fragmented" page array that it can't transfer in 1 RDMA */
2123                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2124                         break;
2125         }
2126
2127         osc_wake_cache_waiters(cli);
2128
2129         if (page_count == 0)
2130                 RETURN(0);
2131
2132         loi_list_maint(cli, loi);
2133
2134         client_obd_list_unlock(&cli->cl_loi_list_lock);
2135
2136         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2137         if (IS_ERR(req)) {
2138                 /* this should happen rarely and is pretty bad; it makes the
2139                  * pending list not follow the dirty order */
2140                 client_obd_list_lock(&cli->cl_loi_list_lock);
2141                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2142                         list_del_init(&oap->oap_rpc_item);
2143
2144                         /* queued sync pages can be torn down while the pages
2145                          * were between the pending list and the rpc */
2146                         if (oap->oap_interrupted) {
2147                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2148                                 osc_ap_completion(cli, NULL, oap, 0,
2149                                                   oap->oap_count);
2150                                 continue;
2151                         }
2152                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2153                 }
2154                 loi_list_maint(cli, loi);
2155                 RETURN(PTR_ERR(req));
2156         }
2157
2158         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2159
2160         if (cmd == OBD_BRW_READ) {
2161                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2162                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2163                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2164                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2165                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2166         } else {
2167                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2168                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2169                                  cli->cl_w_in_flight);
2170                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2171                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2172                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2173         }
2174
2175         client_obd_list_lock(&cli->cl_loi_list_lock);
2176
2177         if (cmd == OBD_BRW_READ)
2178                 cli->cl_r_in_flight++;
2179         else
2180                 cli->cl_w_in_flight++;
2181
2182         /* queued sync pages may already have been interrupted while they
2183          * were between the pending list and the rpc */
2184         tmp = NULL;
2185         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2186                 /* only one oap gets a request reference */
2187                 if (tmp == NULL)
2188                         tmp = oap;
2189                 if (oap->oap_interrupted && !req->rq_intr) {
2190                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2191                                oap, req);
2192                         ptlrpc_mark_interrupted(req);
2193                 }
2194         }
2195         if (tmp != NULL)
2196                 tmp->oap_request = ptlrpc_request_addref(req);
2197
2198         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2199                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2200
2201         req->rq_interpret_reply = brw_interpret_oap;
2202         ptlrpcd_add_req(req);
2203         RETURN(1);
2204 }
2205
2206 #define LOI_DEBUG(LOI, STR, args...)                                     \
2207         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2208                !list_empty(&(LOI)->loi_cli_item),                        \
2209                (LOI)->loi_write_lop.lop_num_pending,                     \
2210                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2211                (LOI)->loi_read_lop.lop_num_pending,                      \
2212                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2213                args)
2214
2215 /* This is called by osc_check_rpcs() to find which objects have pages that
2216  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2217 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2218 {
2219         ENTRY;
2220         /* first return all objects which we already know to have
2221          * pages ready to be stuffed into rpcs */
2222         if (!list_empty(&cli->cl_loi_ready_list))
2223                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2224                                   struct lov_oinfo, loi_cli_item));
2225
2226         /* then if we have cache waiters, return all objects with queued
2227          * writes.  This is especially important when many small files
2228          * have filled up the cache and not been fired into rpcs because
2229          * they don't pass the nr_pending/object threshold */
2230         if (!list_empty(&cli->cl_cache_waiters) &&
2231             !list_empty(&cli->cl_loi_write_list))
2232                 RETURN(list_entry(cli->cl_loi_write_list.next,
2233                                   struct lov_oinfo, loi_write_item));
2234
2235         /* then return all queued objects when we have an invalid import
2236          * so that they get flushed */
2237         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2238                 if (!list_empty(&cli->cl_loi_write_list))
2239                         RETURN(list_entry(cli->cl_loi_write_list.next,
2240                                           struct lov_oinfo, loi_write_item));
2241                 if (!list_empty(&cli->cl_loi_read_list))
2242                         RETURN(list_entry(cli->cl_loi_read_list.next,
2243                                           struct lov_oinfo, loi_read_item));
2244         }
2245         RETURN(NULL);
2246 }
2247
2248 /* called with the loi list lock held */
2249 static void osc_check_rpcs(struct client_obd *cli)
2250 {
2251         struct lov_oinfo *loi;
2252         int rc = 0, race_counter = 0;
2253         ENTRY;
2254
2255         while ((loi = osc_next_loi(cli)) != NULL) {
2256                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2257
2258                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2259                         break;
2260
2261                 /* attempt some read/write balancing by alternating between
2262                  * reads and writes in an object.  The makes_rpc checks here
2263                  * would be redundant if we were getting read/write work items
2264                  * instead of objects.  we don't want send_oap_rpc to drain a
2265                  * partial read pending queue when we're given this object
2266                  * for writes while there are cache waiters */
2267                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2268                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2269                                               &loi->loi_write_lop);
2270                         if (rc < 0)
2271                                 break;
2272                         if (rc > 0)
2273                                 race_counter = 0;
2274                         else
2275                                 race_counter++;
2276                 }
2277                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2278                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2279                                               &loi->loi_read_lop);
2280                         if (rc < 0)
2281                                 break;
2282                         if (rc > 0)
2283                                 race_counter = 0;
2284                         else
2285                                 race_counter++;
2286                 }
2287
2288                 /* attempt some inter-object balancing by issuing rpcs
2289                  * for each object in turn */
2290                 if (!list_empty(&loi->loi_cli_item))
2291                         list_del_init(&loi->loi_cli_item);
2292                 if (!list_empty(&loi->loi_write_item))
2293                         list_del_init(&loi->loi_write_item);
2294                 if (!list_empty(&loi->loi_read_item))
2295                         list_del_init(&loi->loi_read_item);
2296
2297                 loi_list_maint(cli, loi);
2298
2299                 /* send_oap_rpc returns 0 when make_ready tells it to
2300                  * back off.  llite's make_ready does this when it tries
2301                  * to lock a page queued for write that is already locked.
2302                  * we want to try sending rpcs from many objects, but we
2303                  * don't want to spin returning 0.  */
2304                 if (race_counter == 10)
2305                         break;
2306         }
2307         EXIT;
2308 }
2309
2310 /* we're trying to queue a page in the osc so we're subject to the
2311  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2312  * If the osc's queued pages are already at that limit, then we want to sleep
2313  * until there is space in the osc's queue for us.  We also may be waiting for
2314  * write credits from the OST if there are RPCs in flight that may return some
2315  * before we fall back to sync writes.
2316  *
2317  * We need this to know our allocation was granted in the presence of signals */
2318 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2319 {
2320         int rc;
2321         ENTRY;
2322         client_obd_list_lock(&cli->cl_loi_list_lock);
2323         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2324         client_obd_list_unlock(&cli->cl_loi_list_lock);
2325         RETURN(rc);
2326 }
2327
2328 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2329  * grant or cache space; returns -EDQUOT to force the caller to sync io. */
2330 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2331                            struct osc_async_page *oap)
2332 {
2333         struct osc_cache_waiter ocw;
2334         struct l_wait_info lwi = { 0 };
2335
2336         ENTRY;
2337
2338         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2339                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2340                cli->cl_dirty_max, obd_max_dirty_pages,
2341                cli->cl_lost_grant, cli->cl_avail_grant);
2342
2343         /* force the caller to try sync io.  this can jump the list
2344          * of queued writes and create a discontiguous rpc stream */
2345         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2346             loi->loi_ar.ar_force_sync)
2347                 RETURN(-EDQUOT);
2348
2349         /* Hopefully normal case - cache space and write credits available */
2350         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2351             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2352             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2353                 /* account for ourselves */
2354                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2355                 RETURN(0);
2356         }
2357
2358         /* Make sure that there are write rpcs in flight to wait for.  This
2359          * is a little silly as this object may not have any pending but
2360          * other objects sure might. */
2361         if (cli->cl_w_in_flight) {
2362                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2363                 cfs_waitq_init(&ocw.ocw_waitq);
2364                 ocw.ocw_oap = oap;
2365                 ocw.ocw_rc = 0;
2366
2367                 loi_list_maint(cli, loi);
2368                 osc_check_rpcs(cli);
2369                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2370
2371                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2372                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2373
2374                 client_obd_list_lock(&cli->cl_loi_list_lock);
2375                 if (!list_empty(&ocw.ocw_entry)) {
2376                         list_del(&ocw.ocw_entry);
2377                         RETURN(-EINTR);
2378                 }
2379                 RETURN(ocw.ocw_rc);
2380         }
2381
2382         RETURN(-EDQUOT);
2383 }
2384
2385 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2386                         struct lov_oinfo *loi, cfs_page_t *page,
2387                         obd_off offset, struct obd_async_page_ops *ops,
2388                         void *data, void **res)
2389 {
2390         struct osc_async_page *oap;
2391         ENTRY;
2392
2393         if (!page)
2394                 return size_round(sizeof(*oap));
2395
2396         oap = *res;
2397         oap->oap_magic = OAP_MAGIC;
2398         oap->oap_cli = &exp->exp_obd->u.cli;
2399         oap->oap_loi = loi;
2400
2401         oap->oap_caller_ops = ops;
2402         oap->oap_caller_data = data;
2403
2404         oap->oap_page = page;
2405         oap->oap_obj_off = offset;
2406
2407         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2408         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2409         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2410
2411         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2412
2413         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2414         RETURN(0);
2415 }
2416
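/* recover the osc_async_page from the opaque cookie handed out by
 * osc_prep_async_page(), validating it by its magic value */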
2417 struct osc_async_page *oap_from_cookie(void *cookie)
2418 {
2419         struct osc_async_page *oap = cookie;
2420         if (oap->oap_magic != OAP_MAGIC)
2421                 return ERR_PTR(-EINVAL);
2422         return oap;
2423 }
2424
2425 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2426                               struct lov_oinfo *loi, void *cookie,
2427                               int cmd, obd_off off, int count,
2428                               obd_flag brw_flags, enum async_flags async_flags)
2429 {
2430         struct client_obd *cli = &exp->exp_obd->u.cli;
2431         struct osc_async_page *oap;
2432         int rc = 0;
2433         ENTRY;
2434
2435         oap = oap_from_cookie(cookie);
2436         if (IS_ERR(oap))
2437                 RETURN(PTR_ERR(oap));
2438
2439         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2440                 RETURN(-EIO);
2441
2442         if (!list_empty(&oap->oap_pending_item) ||
2443             !list_empty(&oap->oap_urgent_item) ||
2444             !list_empty(&oap->oap_rpc_item))
2445                 RETURN(-EBUSY);
2446
2447         /* check if the file's owner/group is over quota */
2448 #ifdef HAVE_QUOTA_SUPPORT
2449         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2450                 struct obd_async_page_ops *ops;
2451                 struct obdo *oa;
2452
2453                 OBDO_ALLOC(oa);
2454                 if (oa == NULL)
2455                         RETURN(-ENOMEM);
2456
2457                 ops = oap->oap_caller_ops;
2458                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2459                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2460                     NO_QUOTA)
2461                         rc = -EDQUOT;
2462
2463                 OBDO_FREE(oa);
2464                 if (rc)
2465                         RETURN(rc);
2466         }
2467 #endif
2468
2469         if (loi == NULL)
2470                 loi = lsm->lsm_oinfo[0];
2471
2472         client_obd_list_lock(&cli->cl_loi_list_lock);
2473
2474         oap->oap_cmd = cmd;
2475         oap->oap_page_off = off;
2476         oap->oap_count = count;
2477         oap->oap_brw_flags = brw_flags;
2478         oap->oap_async_flags = async_flags;
2479
2480         if (cmd & OBD_BRW_WRITE) {
2481                 rc = osc_enter_cache(cli, loi, oap);
2482                 if (rc) {
2483                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2484                         RETURN(rc);
2485                 }
2486         }
2487
2488         osc_oap_to_pending(oap);
2489         loi_list_maint(cli, loi);
2490
2491         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2492                   cmd);
2493
2494         osc_check_rpcs(cli);
2495         client_obd_list_unlock(&cli->cl_loi_list_lock);
2496
2497         RETURN(0);
2498 }
2499
2500 /* aka (~was & now & flag), but this is more clear :) */
2501 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
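/* i.e. true only on the 0 -> 1 transition of @flag:
 *      SETTING(0,           ASYNC_READY, ASYNC_READY) == 1
 *      SETTING(ASYNC_READY, ASYNC_READY, ASYNC_READY) == 0 */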
2502
2503 static int osc_set_async_flags(struct obd_export *exp,
2504                                struct lov_stripe_md *lsm,
2505                                struct lov_oinfo *loi, void *cookie,
2506                                obd_flag async_flags)
2507 {
2508         struct client_obd *cli = &exp->exp_obd->u.cli;
2509         struct loi_oap_pages *lop;
2510         struct osc_async_page *oap;
2511         int rc = 0;
2512         ENTRY;
2513
2514         oap = oap_from_cookie(cookie);
2515         if (IS_ERR(oap))
2516                 RETURN(PTR_ERR(oap));
2517
2518         /*
2519          * bug 7311: OST-side locking is only supported for liblustre for now
2520          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2521          * implementation has to handle case where OST-locked page was picked
2522          * up by, e.g., ->writepage().
2523          */
2524         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2525         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2526                                      * tread here. */
2527
2528         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2529                 RETURN(-EIO);
2530
2531         if (loi == NULL)
2532                 loi = lsm->lsm_oinfo[0];
2533
2534         if (oap->oap_cmd & OBD_BRW_WRITE) {
2535                 lop = &loi->loi_write_lop;
2536         } else {
2537                 lop = &loi->loi_read_lop;
2538         }
2539
2540         client_obd_list_lock(&cli->cl_loi_list_lock);
2541
2542         if (list_empty(&oap->oap_pending_item))
2543                 GOTO(out, rc = -EINVAL);
2544
2545         if ((oap->oap_async_flags & async_flags) == async_flags)
2546                 GOTO(out, rc = 0);
2547
2548         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2549                 oap->oap_async_flags |= ASYNC_READY;
2550
2551         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2552                 if (list_empty(&oap->oap_rpc_item)) {
2553                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2554                         loi_list_maint(cli, loi);
2555                 }
2556         }
2557
2558         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2559                         oap->oap_async_flags);
2560 out:
2561         osc_check_rpcs(cli);
2562         client_obd_list_unlock(&cli->cl_loi_list_lock);
2563         RETURN(rc);
2564 }
2565
2566 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2567                              struct lov_oinfo *loi,
2568                              struct obd_io_group *oig, void *cookie,
2569                              int cmd, obd_off off, int count,
2570                              obd_flag brw_flags,
2571                              obd_flag async_flags)
2572 {
2573         struct client_obd *cli = &exp->exp_obd->u.cli;
2574         struct osc_async_page *oap;
2575         struct loi_oap_pages *lop;
2576         int rc = 0;
2577         ENTRY;
2578
2579         oap = oap_from_cookie(cookie);
2580         if (IS_ERR(oap))
2581                 RETURN(PTR_ERR(oap));
2582
2583         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2584                 RETURN(-EIO);
2585
2586         if (!list_empty(&oap->oap_pending_item) ||
2587             !list_empty(&oap->oap_urgent_item) ||
2588             !list_empty(&oap->oap_rpc_item))
2589                 RETURN(-EBUSY);
2590
2591         if (loi == NULL)
2592                 loi = lsm->lsm_oinfo[0];
2593
2594         client_obd_list_lock(&cli->cl_loi_list_lock);
2595
2596         oap->oap_cmd = cmd;
2597         oap->oap_page_off = off;
2598         oap->oap_count = count;
2599         oap->oap_brw_flags = brw_flags;
2600         oap->oap_async_flags = async_flags;
2601
2602         if (cmd & OBD_BRW_WRITE)
2603                 lop = &loi->loi_write_lop;
2604         else
2605                 lop = &loi->loi_read_lop;
2606
2607         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2608         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2609                 oap->oap_oig = oig;
2610                 rc = oig_add_one(oig, &oap->oap_occ);
2611         }
2612
2613         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2614                   oap, oap->oap_page, rc);
2615
2616         client_obd_list_unlock(&cli->cl_loi_list_lock);
2617
2618         RETURN(rc);
2619 }
2620
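/* move every page queued on the group's lop_pending_group list onto the
 * normal pending (and possibly urgent) lists so osc_check_rpcs() can build
 * rpcs from them */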
2621 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2622                                  struct loi_oap_pages *lop, int cmd)
2623 {
2624         struct list_head *pos, *tmp;
2625         struct osc_async_page *oap;
2626
2627         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2628                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2629                 list_del(&oap->oap_pending_item);
2630                 osc_oap_to_pending(oap);
2631         }
2632         loi_list_maint(cli, loi);
2633 }
2634
2635 static int osc_trigger_group_io(struct obd_export *exp,
2636                                 struct lov_stripe_md *lsm,
2637                                 struct lov_oinfo *loi,
2638                                 struct obd_io_group *oig)
2639 {
2640         struct client_obd *cli = &exp->exp_obd->u.cli;
2641         ENTRY;
2642
2643         if (loi == NULL)
2644                 loi = lsm->lsm_oinfo[0];
2645
2646         client_obd_list_lock(&cli->cl_loi_list_lock);
2647
2648         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2649         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2650
2651         osc_check_rpcs(cli);
2652         client_obd_list_unlock(&cli->cl_loi_list_lock);
2653
2654         RETURN(0);
2655 }
2656
2657 static int osc_teardown_async_page(struct obd_export *exp,
2658                                    struct lov_stripe_md *lsm,
2659                                    struct lov_oinfo *loi, void *cookie)
2660 {
2661         struct client_obd *cli = &exp->exp_obd->u.cli;
2662         struct loi_oap_pages *lop;
2663         struct osc_async_page *oap;
2664         int rc = 0;
2665         ENTRY;
2666
2667         oap = oap_from_cookie(cookie);
2668         if (IS_ERR(oap))
2669                 RETURN(PTR_ERR(oap));
2670
2671         if (loi == NULL)
2672                 loi = lsm->lsm_oinfo[0];
2673
2674         if (oap->oap_cmd & OBD_BRW_WRITE) {
2675                 lop = &loi->loi_write_lop;
2676         } else {
2677                 lop = &loi->loi_read_lop;
2678         }
2679
2680         client_obd_list_lock(&cli->cl_loi_list_lock);
2681
2682         if (!list_empty(&oap->oap_rpc_item))
2683                 GOTO(out, rc = -EBUSY);
2684
2685         osc_exit_cache(cli, oap, 0);
2686         osc_wake_cache_waiters(cli);
2687
2688         if (!list_empty(&oap->oap_urgent_item)) {
2689                 list_del_init(&oap->oap_urgent_item);
2690                 oap->oap_async_flags &= ~ASYNC_URGENT;
2691         }
2692         if (!list_empty(&oap->oap_pending_item)) {
2693                 list_del_init(&oap->oap_pending_item);
2694                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2695         }
2696         loi_list_maint(cli, loi);
2697
2698         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2699 out:
2700         client_obd_list_unlock(&cli->cl_loi_list_lock);
2701         RETURN(rc);
2702 }
2703
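/* Attach caller data (the inode) to a granted lock, warning if the lock
 * already carries different data for a still-live inode. */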
2704 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2705                                     int flags)
2706 {
2707         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2708
2709         if (lock == NULL) {
2710                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2711                 return;
2712         }
2713         lock_res_and_lock(lock);
2714 #ifdef __KERNEL__
2715 #ifdef __LINUX__
        /* Liang XXX: Darwin and WinNT checks should be added */
2717         if (lock->l_ast_data && lock->l_ast_data != data) {
2718                 struct inode *new_inode = data;
2719                 struct inode *old_inode = lock->l_ast_data;
2720                 if (!(old_inode->i_state & I_FREEING))
2721                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2722                 LASSERTF(old_inode->i_state & I_FREEING,
2723                          "Found existing inode %p/%lu/%u state %lu in lock: "
2724                          "setting data to %p/%lu/%u\n", old_inode,
2725                          old_inode->i_ino, old_inode->i_generation,
2726                          old_inode->i_state,
2727                          new_inode, new_inode->i_ino, new_inode->i_generation);
2728         }
2729 #endif
2730 #endif
2731         lock->l_ast_data = data;
2732         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2733         unlock_res_and_lock(lock);
2734         LDLM_LOCK_PUT(lock);
2735 }
2736
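/* Apply @replace to every lock on this object's resource. */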
2737 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2738                              ldlm_iterator_t replace, void *data)
2739 {
2740         struct ldlm_res_id res_id = { .name = {0} };
2741         struct obd_device *obd = class_exp2obd(exp);
2742
2743         res_id.name[0] = lsm->lsm_object_id;
2744         res_id.name[2] = lsm->lsm_object_gr;
2745
2746         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2747         return 0;
2748 }
2749
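/* Finish an enqueue: for intent requests, extract the server's verdict
 * from the lock reply, then run the caller's update callback. */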
2750 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2751                             int intent, int rc)
2752 {
2753         ENTRY;
2754
2755         if (intent) {
2756                 /* The request was created before ldlm_cli_enqueue call. */
2757                 if (rc == ELDLM_LOCK_ABORTED) {
2758                         struct ldlm_reply *rep;
2759
2760                         /* swabbed by ldlm_cli_enqueue() */
2761                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2762                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2763                                              sizeof(*rep));
2764                         LASSERT(rep != NULL);
2765                         if (rep->lock_policy_res1)
2766                                 rc = rep->lock_policy_res1;
2767                 }
2768         }
2769
2770         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2772                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
2773                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
2774                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
2775         }
2776
2777         /* Call the update callback. */
2778         rc = oinfo->oi_cb_up(oinfo, rc);
2779         RETURN(rc);
2780 }
2781
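/* Async enqueue completion: finish the DLM enqueue, run osc_enqueue_fini(),
 * then immediately drop the lock reference (see the comment above
 * osc_enqueue() below). */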
2782 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2783                                  struct osc_enqueue_args *aa, int rc)
2784 {
2785         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
2786         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2787         struct ldlm_lock *lock;
2788
2789         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2790          * be valid. */
2791         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2792
2793         /* Complete obtaining the lock procedure. */
2794         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2795                                    aa->oa_ei->ei_mode,
2796                                    &aa->oa_oi->oi_flags,
2797                                    &lsm->lsm_oinfo[0]->loi_lvb,
2798                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
2799                                    lustre_swab_ost_lvb,
2800                                    aa->oa_oi->oi_lockh, rc);
2801
2802         /* Complete osc stuff. */
2803         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2804
2805         /* Release the lock for async request. */
2806         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2807                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2808
2809         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2810                  aa->oa_oi->oi_lockh, req, aa);
2811         LDLM_LOCK_PUT(lock);
2812         return rc;
2813 }
2814
/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
 * with other synchronous requests, but holding some locks while trying to
 * obtain others may take a considerable amount of time in the case of OST
 * failure; and when other sync requests cannot get a lock released by a
 * client, that client is evicted from the cluster.  Such scenarios make
 * life difficult, so release locks just after they are obtained. */
2822 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2823                        struct ldlm_enqueue_info *einfo,
2824                        struct ptlrpc_request_set *rqset)
2825 {
2826         struct ldlm_res_id res_id = { .name = {0} };
2827         struct obd_device *obd = exp->exp_obd;
2828         struct ldlm_reply *rep;
2829         struct ptlrpc_request *req = NULL;
2830         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
2831         int rc;
2832         ENTRY;
2833
2834         res_id.name[0] = oinfo->oi_md->lsm_object_id;
2835         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
2836
2837         /* Filesystem lock extents are extended to page boundaries so that
2838          * dealing with the page cache is a little smoother.  */
2839         oinfo->oi_policy.l_extent.start -=
2840                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2841         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
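        /* For example, assuming 4 KiB pages (so ~CFS_PAGE_MASK == 4095):
         * an extent [5000, 6000] is widened to [4096, 8191] -- the start
         * is rounded down and the end up to page boundaries. */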
2842
2843         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
2844                 goto no_match;
2845
2846         /* Next, search for already existing extent locks that will cover us */
2847         rc = ldlm_lock_match(obd->obd_namespace,
2848                              oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
2849                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2850                              oinfo->oi_lockh);
2851         if (rc == 1) {
2852                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2853                                         oinfo->oi_flags);
2854                 if (intent) {
2855                         /* I would like to be able to ASSERT here that rss <=
2856                          * kms, but I can't, for reasons which are explained in
2857                          * lov_enqueue() */
2858                 }
2859
2860                 /* We already have a lock, and it's referenced */
2861                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2862
2863                 /* For async requests, decref the lock. */
2864                 if (rqset)
2865                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2866
2867                 RETURN(ELDLM_OK);
2868         }
2869
2870         /* If we're trying to read, we also search for an existing PW lock.  The
2871          * VFS and page cache already protect us locally, so lots of readers/
2872          * writers can share a single PW lock.
2873          *
2874          * There are problems with conversion deadlocks, so instead of
2875          * converting a read lock to a write lock, we'll just enqueue a new
2876          * one.
2877          *
2878          * At some point we should cancel the read lock instead of making them
2879          * send us a blocking callback, but there are problems with canceling
2880          * locks out from other users right now, too. */
2881
2882         if (einfo->ei_mode == LCK_PR) {
2883                 rc = ldlm_lock_match(obd->obd_namespace,
2884                                      oinfo->oi_flags | LDLM_FL_LVB_READY,
2885                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2886                                      LCK_PW, oinfo->oi_lockh);
2887                 if (rc == 1) {
2888                         /* FIXME: This is not incredibly elegant, but it might
2889                          * be more elegant than adding another parameter to
2890                          * lock_match.  I want a second opinion. */
2891                         /* addref the lock only if not async requests. */
2892                         if (!rqset)
2893                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2894                         osc_set_data_with_check(oinfo->oi_lockh,
2895                                                 einfo->ei_cbdata,
2896                                                 oinfo->oi_flags);
2897                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2898                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2899                         RETURN(ELDLM_OK);
2900                 }
2901         }
2902
2903  no_match:
2904         if (intent) {
2905                 int size[3] = {
2906                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2907                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
2908                         [DLM_LOCKREQ_OFF + 1] = 0 };
2909
2910                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
2911                 if (req == NULL)
2912                         RETURN(-ENOMEM);
2913
2914                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2915                 size[DLM_REPLY_REC_OFF] =
2916                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
2917                 ptlrpc_req_set_repsize(req, 3, size);
2918         }
2919
2920         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2921         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
2922
2923         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
2924                               &oinfo->oi_policy, &oinfo->oi_flags,
2925                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2926                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
2927                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2928                               rqset ? 1 : 0);
2929         if (rqset) {
2930                 if (!rc) {
2931                         struct osc_enqueue_args *aa;
2932                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2933                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2934                         aa->oa_oi = oinfo;
2935                         aa->oa_ei = einfo;
2936                         aa->oa_exp = exp;
2937
2938                         req->rq_interpret_reply = osc_enqueue_interpret;
2939                         ptlrpc_set_add_req(rqset, req);
2940                 } else if (intent) {
2941                         ptlrpc_req_finished(req);
2942                 }
2943                 RETURN(rc);
2944         }
2945
2946         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2947         if (intent)
2948                 ptlrpc_req_finished(req);
2949
2950         RETURN(rc);
2951 }
2952
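/* Like osc_enqueue(), but only matches locks already granted to this
 * client; it never sends an enqueue RPC to the server. */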
2953 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2954                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2955                      int *flags, void *data, struct lustre_handle *lockh)
2956 {
2957         struct ldlm_res_id res_id = { .name = {0} };
2958         struct obd_device *obd = exp->exp_obd;
2959         int rc;
2960         int lflags = *flags;
2961         ENTRY;
2962
2963         res_id.name[0] = lsm->lsm_object_id;
2964         res_id.name[2] = lsm->lsm_object_gr;
2965
2966         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2967
2968         /* Filesystem lock extents are extended to page boundaries so that
2969          * dealing with the page cache is a little smoother */
2970         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2971         policy->l_extent.end |= ~CFS_PAGE_MASK;
2972
2973         /* Next, search for already existing extent locks that will cover us */
2974         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
2975                              &res_id, type, policy, mode, lockh);
2976         if (rc) {
                /* if (!(*flags & LDLM_FL_TEST_LOCK)) */
                osc_set_data_with_check(lockh, data, lflags);
2979                 RETURN(rc);
2980         }
2981         /* If we're trying to read, we also search for an existing PW lock.  The
2982          * VFS and page cache already protect us locally, so lots of readers/
2983          * writers can share a single PW lock. */
2984         if (mode == LCK_PR) {
2985                 rc = ldlm_lock_match(obd->obd_namespace,
2986                                      lflags | LDLM_FL_LVB_READY, &res_id,
2987                                      type, policy, LCK_PW, lockh);
2988                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2989                         /* FIXME: This is not incredibly elegant, but it might
2990                          * be more elegant than adding another parameter to
2991                          * lock_match.  I want a second opinion. */
2992                         osc_set_data_with_check(lockh, data, lflags);
2993                         ldlm_lock_addref(lockh, LCK_PR);
2994                         ldlm_lock_decref(lockh, LCK_PW);
2995                 }
2996         }
2997         RETURN(rc);
2998 }
2999
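/* Drop a lock reference; group locks are cancelled outright instead of
 * being left around for later reuse. */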
3000 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3001                       __u32 mode, struct lustre_handle *lockh)
3002 {
3003         ENTRY;
3004
3005         if (unlikely(mode == LCK_GROUP))
3006                 ldlm_lock_decref_and_cancel(lockh, mode);
3007         else
3008                 ldlm_lock_decref(lockh, mode);
3009
3010         RETURN(0);
3011 }
3012
3013 static int osc_cancel_unused(struct obd_export *exp,
3014                              struct lov_stripe_md *lsm, int flags,
3015                              void *opaque)
3016 {
3017         struct obd_device *obd = class_exp2obd(exp);
3018         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3019
3020         if (lsm != NULL) {
3021                 res_id.name[0] = lsm->lsm_object_id;
3022                 res_id.name[2] = lsm->lsm_object_gr;
3023                 resp = &res_id;
3024         }
3025
3026         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3027 }
3028
3029 static int osc_join_lru(struct obd_export *exp,
3030                         struct lov_stripe_md *lsm, int join)
3031 {
3032         struct obd_device *obd = class_exp2obd(exp);
3033         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3034
3035         if (lsm != NULL) {
3036                 res_id.name[0] = lsm->lsm_object_id;
3037                 res_id.name[2] = lsm->lsm_object_gr;
3038                 resp = &res_id;
3039         }
3040
3041         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3042 }
3043
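/* Completion handler for an async OST_STATFS request: unpack the reply
 * into the caller's obd_statfs and run the update callback. */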
3044 static int osc_statfs_interpret(struct ptlrpc_request *req,
3045                                 struct osc_async_args *aa, int rc)
3046 {
3047         struct obd_statfs *msfs;
3048         ENTRY;
3049
3050         if (rc != 0)
3051                 GOTO(out, rc);
3052
3053         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3054                                   lustre_swab_obd_statfs);
3055         if (msfs == NULL) {
3056                 CERROR("Can't unpack obd_statfs\n");
3057                 GOTO(out, rc = -EPROTO);
3058         }
3059
3060         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3061 out:
3062         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3063         RETURN(rc);
3064 }
3065
3066 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3067                             __u64 max_age, struct ptlrpc_request_set *rqset)
3068 {
3069         struct ptlrpc_request *req;
3070         struct osc_async_args *aa;
3071         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3072         ENTRY;
3073
3074         /* We could possibly pass max_age in the request (as an absolute
3075          * timestamp or a "seconds.usec ago") so the target can avoid doing
3076          * extra calls into the filesystem if that isn't necessary (e.g.
3077          * during mount that would help a bit).  Having relative timestamps
3078          * is not so great if request processing is slow, while absolute
3079          * timestamps are not ideal because they need time synchronization. */
3080         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3081                               OST_STATFS, 1, NULL, NULL);
3082         if (!req)
3083                 RETURN(-ENOMEM);
3084
3085         ptlrpc_req_set_repsize(req, 2, size);
3086         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3087
3088         req->rq_interpret_reply = osc_statfs_interpret;
3089         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3090         aa = (struct osc_async_args *)&req->rq_async_args;
3091         aa->aa_oi = oinfo;
3092
3093         ptlrpc_set_add_req(rqset, req);
3094         RETURN(0);
3095 }
3096
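/* Synchronous statfs: issue OST_STATFS and wait for the reply in place. */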
3097 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3098                       __u64 max_age)
3099 {
3100         struct obd_statfs *msfs;
3101         struct ptlrpc_request *req;
3102         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3103         ENTRY;
3104
        /* See the comment in osc_statfs_async() above about possibly passing
         * max_age in the request. */
3111         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3112                               OST_STATFS, 1, NULL, NULL);
3113         if (!req)
3114                 RETURN(-ENOMEM);
3115
3116         ptlrpc_req_set_repsize(req, 2, size);
3117         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3118
3119         rc = ptlrpc_queue_wait(req);
3120         if (rc)
3121                 GOTO(out, rc);
3122
3123         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3124                                   lustre_swab_obd_statfs);
3125         if (msfs == NULL) {
3126                 CERROR("Can't unpack obd_statfs\n");
3127                 GOTO(out, rc = -EPROTO);
3128         }
3129
3130         memcpy(osfs, msfs, sizeof(*osfs));
3131
3132         EXIT;
3133  out:
3134         ptlrpc_req_finished(req);
3135         return rc;
3136 }
3137
/* Retrieve object striping information.
 *
 * @lump is a pointer to a userspace struct with lmm_stripe_count indicating
 * the maximum number of OST entries which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC (we only use one stripe here).
 */
3144 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3145 {
3146         struct lov_user_md lum, *lumk;
3147         int rc = 0, lum_size;
3148         ENTRY;
3149
3150         if (!lsm)
3151                 RETURN(-ENODATA);
3152
3153         if (copy_from_user(&lum, lump, sizeof(lum)))
3154                 RETURN(-EFAULT);
3155
3156         if (lum.lmm_magic != LOV_USER_MAGIC)
3157                 RETURN(-EINVAL);
3158
3159         if (lum.lmm_stripe_count > 0) {
3160                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3161                 OBD_ALLOC(lumk, lum_size);
3162                 if (!lumk)
3163                         RETURN(-ENOMEM);
3164
3165                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3166                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3167         } else {
3168                 lum_size = sizeof(lum);
3169                 lumk = &lum;
3170         }
3171
3172         lumk->lmm_object_id = lsm->lsm_object_id;
3173         lumk->lmm_object_gr = lsm->lsm_object_gr;
3174         lumk->lmm_stripe_count = 1;
3175
3176         if (copy_to_user(lump, lumk, lum_size))
3177                 rc = -EFAULT;
3178
3179         if (lumk != &lum)
3180                 OBD_FREE(lumk, lum_size);
3181
3182         RETURN(rc);
3183 }
3184
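/* Userspace sketch (hypothetical caller; assumes the declarations from
 * lustre/lustre_user.h are available):
 *
 *      struct {
 *              struct lov_user_md lum;
 *              struct lov_user_ost_data objects[1];
 *      } buf = { .lum = { .lmm_magic = LOV_USER_MAGIC,
 *                         .lmm_stripe_count = 1 } };
 *      if (ioctl(fd, LL_IOC_LOV_GETSTRIPE, &buf) == 0)
 *              printf("id %llu\n",
 *                     (unsigned long long)buf.lum.lmm_object_id);
 */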
3185
3186 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3187                          void *karg, void *uarg)
3188 {
3189         struct obd_device *obd = exp->exp_obd;
3190         struct obd_ioctl_data *data = karg;
3191         int err = 0;
3192         ENTRY;
3193
3194 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3195         MOD_INC_USE_COUNT;
3196 #else
3197         if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?\n");
3199                 return -EINVAL;
3200         }
3201 #endif
3202         switch (cmd) {
3203         case OBD_IOC_LOV_GET_CONFIG: {
3204                 char *buf;
3205                 struct lov_desc *desc;
3206                 struct obd_uuid uuid;
3207
3208                 buf = NULL;
3209                 len = 0;
3210                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3211                         GOTO(out, err = -EINVAL);
3212
3213                 data = (struct obd_ioctl_data *)buf;
3214
3215                 if (sizeof(*desc) > data->ioc_inllen1) {
3216                         obd_ioctl_freedata(buf, len);
3217                         GOTO(out, err = -EINVAL);
3218                 }
3219
3220                 if (data->ioc_inllen2 < sizeof(uuid)) {
3221                         obd_ioctl_freedata(buf, len);
3222                         GOTO(out, err = -EINVAL);
3223                 }
3224
3225                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3226                 desc->ld_tgt_count = 1;
3227                 desc->ld_active_tgt_count = 1;
3228                 desc->ld_default_stripe_count = 1;
3229                 desc->ld_default_stripe_size = 0;
3230                 desc->ld_default_stripe_offset = 0;
3231                 desc->ld_pattern = 0;
3232                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3233
3234                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3235
3236                 err = copy_to_user((void *)uarg, buf, len);
3237                 if (err)
3238                         err = -EFAULT;
3239                 obd_ioctl_freedata(buf, len);
3240                 GOTO(out, err);
3241         }
3242         case LL_IOC_LOV_SETSTRIPE:
3243                 err = obd_alloc_memmd(exp, karg);
3244                 if (err > 0)
3245                         err = 0;
3246                 GOTO(out, err);
3247         case LL_IOC_LOV_GETSTRIPE:
3248                 err = osc_getstripe(karg, uarg);
3249                 GOTO(out, err);
3250         case OBD_IOC_CLIENT_RECOVER:
3251                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3252                                             data->ioc_inlbuf1);
3253                 if (err > 0)
3254                         err = 0;
3255                 GOTO(out, err);
3256         case IOC_OSC_SET_ACTIVE:
3257                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3258                                                data->ioc_offset);
3259                 GOTO(out, err);
3260         case OBD_IOC_POLL_QUOTACHECK:
3261                 err = lquota_poll_check(quota_interface, exp,
3262                                         (struct if_quotacheck *)karg);
3263                 GOTO(out, err);
3264         default:
3265                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3266                        cmd, cfs_curproc_comm());
3267                 GOTO(out, err = -ENOTTY);
3268         }
3269 out:
3270 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3271         MOD_DEC_USE_COUNT;
3272 #else
3273         module_put(THIS_MODULE);
3274 #endif
3275         return err;
3276 }
3277
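/* Handle obd_get_info() keys: "lock_to_stripe" always maps to stripe 0
 * on an OSC, while "last_id" is fetched from the OST via OST_GET_INFO. */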
3278 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3279                         void *key, __u32 *vallen, void *val)
3280 {
3281         ENTRY;
3282         if (!vallen || !val)
3283                 RETURN(-EFAULT);
3284
3285         if (keylen > strlen("lock_to_stripe") &&
3286             strcmp(key, "lock_to_stripe") == 0) {
3287                 __u32 *stripe = val;
3288                 *vallen = sizeof(*stripe);
3289                 *stripe = 0;
3290                 RETURN(0);
3291         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3292                 struct ptlrpc_request *req;
3293                 obd_id *reply;
3294                 char *bufs[2] = { NULL, key };
3295                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3296
3297                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3298                                       OST_GET_INFO, 2, size, bufs);
3299                 if (req == NULL)
3300                         RETURN(-ENOMEM);
3301
3302                 size[REPLY_REC_OFF] = *vallen;
3303                 ptlrpc_req_set_repsize(req, 2, size);
3304                 rc = ptlrpc_queue_wait(req);
3305                 if (rc)
3306                         GOTO(out, rc);
3307
3308                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3309                                            lustre_swab_ost_last_id);
3310                 if (reply == NULL) {
3311                         CERROR("Can't unpack OST last ID\n");
3312                         GOTO(out, rc = -EPROTO);
3313                 }
3314                 *((obd_id *)val) = *reply;
3315         out:
3316                 ptlrpc_req_finished(req);
3317                 RETURN(rc);
3318         }
3319         RETURN(-EINVAL);
3320 }
3321
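/* Runs once the OST acknowledges the MDS connection: connect the llog
 * initiator and start pinging the OST. */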
3322 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3323                                           void *aa, int rc)
3324 {
3325         struct llog_ctxt *ctxt;
3326         struct obd_import *imp = req->rq_import;
3327         ENTRY;
3328
3329         if (rc != 0)
3330                 RETURN(rc);
3331
3332         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
        /* rc == 0 here (checked above), so just connect the llog initiator. */
        if (ctxt)
                rc = llog_initiator_connect(ctxt);
3340
3341         spin_lock(&imp->imp_lock);
3342         imp->imp_server_timeout = 1;
3343         imp->imp_pingable = 1;
3344         spin_unlock(&imp->imp_lock);
3345         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3346
3347         RETURN(rc);
3348 }
3349
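/* Handle obd_set_info_async() keys understood locally; anything else is
 * forwarded to the OST as an OST_SET_INFO request. */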
3350 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3351                               void *key, obd_count vallen, void *val,
3352                               struct ptlrpc_request_set *set)
3353 {
3354         struct ptlrpc_request *req;
3355         struct obd_device  *obd = exp->exp_obd;
3356         struct obd_import *imp = class_exp2cliimp(exp);
3357         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3358         char *bufs[3] = { NULL, key, val };
3359         ENTRY;
3360
3361         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3362
3363         if (KEY_IS(KEY_NEXT_ID)) {
3364                 if (vallen != sizeof(obd_id))
3365                         RETURN(-EINVAL);
3366                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3367                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3368                        exp->exp_obd->obd_name,
3369                        obd->u.cli.cl_oscc.oscc_next_id);
3370
3371                 RETURN(0);
3372         }
3373
3374         if (KEY_IS("unlinked")) {
3375                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3376                 spin_lock(&oscc->oscc_lock);
3377                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3378                 spin_unlock(&oscc->oscc_lock);
3379                 RETURN(0);
3380         }
3381
3382         if (KEY_IS(KEY_INIT_RECOV)) {
3383                 if (vallen != sizeof(int))
3384                         RETURN(-EINVAL);
3385                 spin_lock(&imp->imp_lock);
3386                 imp->imp_initial_recov = *(int *)val;
3387                 spin_unlock(&imp->imp_lock);
3388                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3389                        exp->exp_obd->obd_name,
3390                        imp->imp_initial_recov);
3391                 RETURN(0);
3392         }
3393
3394         if (KEY_IS("checksum")) {
3395                 if (vallen != sizeof(int))
3396                         RETURN(-EINVAL);
3397                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3398                 RETURN(0);
3399         }
3400
3401         if (KEY_IS(KEY_FLUSH_CTX)) {
3402                 sptlrpc_import_flush_my_ctx(imp);
3403                 RETURN(0);
3404         }
3405
3406         if (!set)
3407                 RETURN(-EINVAL);
3408
        /* We pass all other commands directly to the OST. Since nobody calls
           osc methods directly and everybody is supposed to go through LOV,
           we assume LOV checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from the
           OST anyway. */
3415
3416         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3417                               bufs);
3418         if (req == NULL)
3419                 RETURN(-ENOMEM);
3420
3421         if (KEY_IS("mds_conn")) {
3422                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3423
3424                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3425                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3426                 LASSERT(oscc->oscc_oa.o_gr > 0);
3427                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3428         }
3429
3430         ptlrpc_req_set_repsize(req, 1, NULL);
3431         ptlrpc_set_add_req(set, req);
3432         ptlrpc_check_set(set);
3433
3434         RETURN(0);
3435 }
3436
3437
3438 static struct llog_operations osc_size_repl_logops = {
3439         lop_cancel: llog_obd_repl_cancel
3440 };
3441
3442 static struct llog_operations osc_mds_ost_orig_logops;
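
/* Set up the MDS->OST originator and size-replication llog contexts for
 * this OSC; the originator logops are initialized from llog_lvfs_ops on
 * first use. */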
3443 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
3444                          struct obd_device *tgt, int count,
3445                          struct llog_catid *catid, struct obd_uuid *uuid)
3446 {
3447         int rc;
3448         ENTRY;
3449
3450         spin_lock(&obd->obd_dev_lock);
3451         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3452                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3453                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3454                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3455                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3456                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3457         }
3458         spin_unlock(&obd->obd_dev_lock);
3459
3460         rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3461                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3462         if (rc) {
3463                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
3465         }
3466
3467         rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3468                         &osc_size_repl_logops);
3469         if (rc)
3470                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3471 out:
3472         if (rc) {
3473                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3474                        obd->obd_name, tgt->obd_name, count, catid, rc);
3475                 CERROR("logid "LPX64":0x%x\n",
3476                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3477         }
3478         RETURN(rc);
3479 }
3480
3481 static int osc_llog_finish(struct obd_device *obd, int count)
3482 {
3483         struct llog_ctxt *ctxt;
3484         int rc = 0, rc2 = 0;
3485         ENTRY;
3486
3487         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3488         if (ctxt)
3489                 rc = llog_cleanup(ctxt);
3490
3491         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3492         if (ctxt)
3493                 rc2 = llog_cleanup(ctxt);
3494         if (!rc)
3495                 rc = rc2;
3496
3497         RETURN(rc);
3498 }
3499
3500 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3501                          struct obd_uuid *cluuid,
3502                          struct obd_connect_data *data)
3503 {
3504         struct client_obd *cli = &obd->u.cli;
3505
3506         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3507                 long lost_grant;
3508
3509                 client_obd_list_lock(&cli->cl_loi_list_lock);
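                /* Re-request whatever grant we still hold; if none remains,
                 * ask for two full RPCs worth of bytes (GNU "a ?: b" yields
                 * a when it is non-zero). */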
3510                 data->ocd_grant = cli->cl_avail_grant ?:
3511                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3512                 lost_grant = cli->cl_lost_grant;
3513                 cli->cl_lost_grant = 0;
3514                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3515
3516                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3517                        "cl_lost_grant: %ld\n", data->ocd_grant,
3518                        cli->cl_avail_grant, lost_grant);
3519                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3520                        " ocd_grant: %d\n", data->ocd_connect_flags,
3521                        data->ocd_version, data->ocd_grant);
3522         }
3523
3524         RETURN(0);
3525 }
3526
3527 static int osc_disconnect(struct obd_export *exp)
3528 {
3529         struct obd_device *obd = class_exp2obd(exp);
3530         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3531         int rc;
3532
3533         if (obd->u.cli.cl_conn_count == 1)
3534                 /* flush any remaining cancel messages out to the target */
3535                 llog_sync(ctxt, exp);
3536
3537         rc = client_disconnect_export(exp);
3538         return rc;
3539 }
3540
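/* React to import state changes: pause/resume object precreation on MDS
 * OSCs, and reset cached grant state across disconnect/invalidate. */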
3541 static int osc_import_event(struct obd_device *obd,
3542                             struct obd_import *imp,
3543                             enum obd_import_event event)
3544 {
3545         struct client_obd *cli;
3546         int rc = 0;
3547
3548         ENTRY;
3549         LASSERT(imp->imp_obd == obd);
3550
3551         switch (event) {
3552         case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs */
3554                 if (imp->imp_server_timeout) {
3555                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3556
3557                         spin_lock(&oscc->oscc_lock);
3558                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3559                         spin_unlock(&oscc->oscc_lock);
3560                 }
3561                 cli = &obd->u.cli;
3562                 client_obd_list_lock(&cli->cl_loi_list_lock);
3563                 cli->cl_avail_grant = 0;
3564                 cli->cl_lost_grant = 0;
3565                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3566                 break;
3567         }
3568         case IMP_EVENT_INACTIVE: {
3569                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3570                 break;
3571         }
3572         case IMP_EVENT_INVALIDATE: {
3573                 struct ldlm_namespace *ns = obd->obd_namespace;
3574
3575                 /* Reset grants */
3576                 cli = &obd->u.cli;
3577                 client_obd_list_lock(&cli->cl_loi_list_lock);
3578                 /* all pages go to failing rpcs due to the invalid import */
3579                 osc_check_rpcs(cli);
3580                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3581
3582                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3583
3584                 break;
3585         }
3586         case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs */
3588                 if (imp->imp_server_timeout) {
3589                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3590
3591                         spin_lock(&oscc->oscc_lock);
3592                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3593                         spin_unlock(&oscc->oscc_lock);
3594                 }
3595                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3596                 break;
3597         }
3598         case IMP_EVENT_OCD: {
3599                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3600
3601                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3602                         osc_init_grant(&obd->u.cli, ocd);
3603
3604                 /* See bug 7198 */
3605                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3607
3608                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3609                 break;
3610         }
3611         default:
3612                 CERROR("Unknown import event %d\n", event);
3613                 LBUG();
3614         }
3615         RETURN(rc);
3616 }
3617
3618 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3619 {
3620         int rc;
        ENTRY;

3624         rc = ptlrpcd_addref();
3625         if (rc)
3626                 RETURN(rc);
3627
3628         rc = client_obd_setup(obd, lcfg);
3629         if (rc) {
3630                 ptlrpcd_decref();
3631         } else {
3632                 struct lprocfs_static_vars lvars;
3633                 struct client_obd *cli = &obd->u.cli;
3634
3635                 lprocfs_init_vars(osc, &lvars);
3636                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3637                         lproc_osc_attach_seqstat(obd);
3638                         ptlrpc_lprocfs_register_obd(obd);
3639                 }
3640
3641                 oscc_init(obd);
                /* We need to allocate a few more requests, because
                   brw_interpret_oap tries to create new requests before
                   freeing previous ones.  Ideally we want 2x max_rpcs_in_flight
                   reserved, but that is probably too much wasted RAM in
                   practice, so adding 2 is just a guess that should still
                   work. */
3647                 cli->cl_import->imp_rq_pool =
3648                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3649                                             OST_MAXREQSIZE,
3650                                             ptlrpc_add_rqs_to_pool);
3651         }
3652
3653         RETURN(rc);
3654 }
3655
3656 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3657 {
3658         int rc = 0;
3659         ENTRY;
3660
3661         switch (stage) {
3662         case OBD_CLEANUP_EARLY: {
3663                 struct obd_import *imp;
3664                 imp = obd->u.cli.cl_import;
3665                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3666                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3667                 ptlrpc_deactivate_import(imp);
3668                 spin_lock(&imp->imp_lock);
3669                 imp->imp_pingable = 0;
3670                 spin_unlock(&imp->imp_lock);
3671                 break;
3672         }
3673         case OBD_CLEANUP_EXPORTS: {
3674                 /* If we set up but never connected, the
3675                    client import will not have been cleaned. */
3676                 if (obd->u.cli.cl_import) {
3677                         struct obd_import *imp;
3678                         imp = obd->u.cli.cl_import;
3679                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3680                                obd->obd_name);
3681                         ptlrpc_invalidate_import(imp);
3682                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3683                         class_destroy_import(imp);
3684                         obd->u.cli.cl_import = NULL;
3685                 }
3686                 break;
3687         }
3688         case OBD_CLEANUP_SELF_EXP:
3689                 rc = obd_llog_finish(obd, 0);
3690                 if (rc != 0)
3691                         CERROR("failed to cleanup llogging subsystems\n");
3692                 break;
3693         case OBD_CLEANUP_OBD:
3694                 break;
3695         }
3696         RETURN(rc);
3697 }
3698
3699 int osc_cleanup(struct obd_device *obd)
3700 {
3701         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3702         int rc;
3703
3704         ENTRY;
3705         ptlrpc_lprocfs_unregister_obd(obd);
3706         lprocfs_obd_cleanup(obd);
3707
3708         spin_lock(&oscc->oscc_lock);
3709         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3710         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3711         spin_unlock(&oscc->oscc_lock);
3712
3713         /* free memory of osc quota cache */
3714         lquota_cleanup(quota_interface, obd);
3715
3716         rc = client_obd_cleanup(obd);
3717
3718         ptlrpcd_decref();
3719         RETURN(rc);
3720 }
3721
3722 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3723 {
3724         struct lustre_cfg *lcfg = buf;
3725         struct lprocfs_static_vars lvars;
3726         int rc = 0;
3727
3728         lprocfs_init_vars(osc, &lvars);
3729
3730         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
        return rc;
3732 }
3733
3734 struct obd_ops osc_obd_ops = {
3735         .o_owner                = THIS_MODULE,
3736         .o_setup                = osc_setup,
3737         .o_precleanup           = osc_precleanup,
3738         .o_cleanup              = osc_cleanup,
3739         .o_add_conn             = client_import_add_conn,
3740         .o_del_conn             = client_import_del_conn,
3741         .o_connect              = client_connect_import,
3742         .o_reconnect            = osc_reconnect,
3743         .o_disconnect           = osc_disconnect,
3744         .o_statfs               = osc_statfs,
3745         .o_statfs_async         = osc_statfs_async,
3746         .o_packmd               = osc_packmd,
3747         .o_unpackmd             = osc_unpackmd,
3748         .o_precreate            = osc_precreate,
3749         .o_create               = osc_create,
3750         .o_destroy              = osc_destroy,
3751         .o_getattr              = osc_getattr,
3752         .o_getattr_async        = osc_getattr_async,
3753         .o_setattr              = osc_setattr,
3754         .o_setattr_async        = osc_setattr_async,
3755         .o_brw                  = osc_brw,
3756         .o_brw_async            = osc_brw_async,
3757         .o_prep_async_page      = osc_prep_async_page,
3758         .o_queue_async_io       = osc_queue_async_io,
3759         .o_set_async_flags      = osc_set_async_flags,
3760         .o_queue_group_io       = osc_queue_group_io,
3761         .o_trigger_group_io     = osc_trigger_group_io,
3762         .o_teardown_async_page  = osc_teardown_async_page,
3763         .o_punch                = osc_punch,
3764         .o_sync                 = osc_sync,
3765         .o_enqueue              = osc_enqueue,
3766         .o_match                = osc_match,
3767         .o_change_cbdata        = osc_change_cbdata,
3768         .o_cancel               = osc_cancel,
3769         .o_cancel_unused        = osc_cancel_unused,
3770         .o_join_lru             = osc_join_lru,
3771         .o_iocontrol            = osc_iocontrol,
3772         .o_get_info             = osc_get_info,
3773         .o_set_info_async       = osc_set_info_async,
3774         .o_import_event         = osc_import_event,
3775         .o_llog_init            = osc_llog_init,
3776         .o_llog_finish          = osc_llog_finish,
3777         .o_process_config       = osc_process_config,
3778 };
3779
3780 int __init osc_init(void)
3781 {
3782         struct lprocfs_static_vars lvars;
3783         int rc;
3784         ENTRY;
3785
3786         lprocfs_init_vars(osc, &lvars);
3787
3788         request_module("lquota");
3789         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3790         lquota_init(quota_interface);
3791         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3792
3793         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3794                                  LUSTRE_OSC_NAME, NULL);
3795         if (rc) {
3796                 if (quota_interface)
3797                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3798                 RETURN(rc);
3799         }
3800
3801         RETURN(rc);
3802 }
3803
3804 #ifdef __KERNEL__
3805 static void /*__exit*/ osc_exit(void)
3806 {
3807         lquota_exit(quota_interface);
3808         if (quota_interface)
3809                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3810
3811         class_unregister_type(LUSTRE_OSC_NAME);
3812 }
3813
3814 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3815 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3816 MODULE_LICENSE("GPL");
3817
3818 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3819 #endif