Whamcloud - gitweb
31a54adedb74bb6041d64dc5042c8d8d0797d7b6
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although * it does not export a full OBD method table (the
27  *  requests are coming * in over the wire, so object target modules
28  *  do not have a full * method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
95         }
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd():
 *   - lsmp == NULL:           just report the in-memory lsm size;
 *   - lmm == NULL, *lsmp set: free the existing lsm;
 *   - otherwise:              allocate *lsmp if needed and fill from lmm.
 * Returns the lsm size, 0 after a free, or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* reject a wire buffer too small to hold the struct */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* an OSC-level lsm always describes a single stripe */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* free path: release the oinfo first, then the lsm */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* unwind the partial allocation */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
154
/* Reply callback for osc_getattr_async(): unpack the ost_body from the
 * reply, copy the attributes into the caller's obdo, and invoke the
 * caller's completion callback with the final rc. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* reply buffer missing/short: invalidate the obdo */
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* hand the (possibly failed) result up to the caller */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
182
/* Queue an OST_GETATTR RPC on @set; the reply is processed asynchronously
 * by osc_getattr_interpret().  Returns 0 on queue, -ENOMEM otherwise. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size,NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* copy the caller's obdo into the request body */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_interpret_reply = osc_getattr_interpret;

        /* stash the obd_info in the request's async-args area for the
         * interpret callback */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN (0);
}
210
/* Synchronously fetch object attributes via OST_GETATTR and copy them
 * into oinfo->oi_oa.  Returns 0 or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* copy the caller's obdo into the request body */
        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
253
254 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
255                        struct obd_trans_info *oti)
256 {
257         struct ptlrpc_request *req;
258         struct ost_body *body;
259         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
260         ENTRY;
261
262         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
263                               OST_SETATTR, 2, size, NULL);
264         if (!req)
265                 RETURN(-ENOMEM);
266
267         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
268         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
269
270         ptlrpc_req_set_repsize(req, 2, size);
271
272         rc = ptlrpc_queue_wait(req);
273         if (rc)
274                 GOTO(out, rc);
275
276         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
277                                   lustre_swab_ost_body);
278         if (body == NULL)
279                 GOTO(out, rc = -EPROTO);
280
281         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
282
283         EXIT;
284 out:
285         ptlrpc_req_finished(req);
286         RETURN(rc);
287 }
288
/* Reply callback for osc_setattr_async(): unpack the ost_body, copy the
 * attributes back into the caller's obdo, then invoke the completion
 * callback with the final rc. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                /* reply buffer missing or too small */
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        /* hand the result up to the caller */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
310
/* Send an OST_SETATTR RPC asynchronously.  If @rqset is given the request
 * is added to it and osc_setattr_interpret() handles the reply; with a
 * NULL rqset the request is handed to ptlrpcd and the reply is ignored.
 * Returns 0 on queue, -ENOMEM otherwise. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* attach the llog cookie to the obdo before it is copied into
         * the request body */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
        ptlrpc_req_set_repsize(req, 2, size);
        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                /* stash the obd_info for the interpret callback */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
352
/* Create an object on the OST with a synchronous OST_CREATE RPC.
 *
 * If the caller did not supply stripe metadata in *ea, a single-stripe
 * lsm is allocated here (and freed again on failure).  On success the
 * new object id is stored in the lsm and *ea; if @oti is given, the
 * reply transno and (when OBD_MD_FLCOOKIE is set) the unlink llog
 * cookie are saved into it.  Returns 0 or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller gave no lsm; allocate a local one */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* orphan-deletion requests must not be replayed */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* save the unlink llog cookie for the MDS */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
                               sizeof(oti->oti_onecookie));
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
        EXIT;
out_req:
        ptlrpc_req_finished(req);
out:
        /* free the locally-allocated lsm on failure (only when the
         * caller did not pass one in, i.e. *ea is still NULL) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        return rc;
}
435
/* Reply callback for osc_punch(): unpack the ost_body, copy the updated
 * attributes back to the caller's obdo, and invoke the completion
 * callback with the final rc. */
static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                /* reply buffer missing or too small */
                CERROR ("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        /* hand the result up to the caller */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
457
/* Queue an asynchronous OST_PUNCH (truncate an extent) on @rqset.  The
 * extent start/end from oinfo->oi_policy is carried in the oa size and
 * blocks fields; the reply is handled by osc_punch_interpret().
 * Returns 0 on queue or a negative errno. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        /* stash the obd_info for the interpret callback */
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
501
/* Synchronously flush the [start, end] range of an object to stable
 * storage via OST_SYNC.  The range is carried in the oa size/blocks
 * fields (same overload as OST_PUNCH); the reply attributes are copied
 * back into @oa.  Returns 0 or a negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
548
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_DESTROY, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* carry the llog cookie so the OST can cancel the MDS's unlink
         * record once the destroy commits */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        /* fire and forget: ptlrpcd sends the request, reply is ignored */
        ptlrpcd_add_req(req);
        RETURN(0);
}
596
/* Fill the dirty/undirty/grant fields of @oa so the OST learns how much
 * this client has cached and how much more grant it wants.  o_undirty is
 * forced to 0 when any of the dirty limits look violated (the CERRORs
 * flag accounting bugs rather than normal conditions). */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* ask for enough grant to keep the pipeline of RPCs full */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        /* report grant we lost (e.g. via short writes) and reset it */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
631
/* Charge one page of dirty cache against the client's write grant and
 * the global dirty-page count, and mark the page as grant-backed.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* remember that this page consumed grant, so the grant can be
         * returned by osc_release_write_grant() later */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}
643
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * Undoes the dirty accounting for one page.  If the page was never sent
 * (@sent == 0) the whole page's grant is counted as lost; if it was a
 * short write on an OST whose block size differs from the page size,
 * only the unused remainder is counted as lost. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* fall back to 4096 when the OST hasn't reported a block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* pages that never consumed grant have nothing to release */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                /* the block-rounded count was consumed; the rest is lost */
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
682
683 static unsigned long rpcs_in_flight(struct client_obd *cli)
684 {
685         return cli->cl_r_in_flight + cli->cl_w_in_flight;
686 }
687
/* Wake threads waiting for dirty-cache room or write grant, consuming
 * grant for each waiter that can proceed.  A waiter woken with
 * ocw_rc == -EDQUOT must fall back to synchronous IO.
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* grant available: charge it to this waiter's page */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
729
/* Initialize the client's available grant from the amount the server
 * offered in the connect reply. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
740
/* Add the extra grant returned in a BRW reply to the client's available
 * grant. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
749
750 /* We assume that the reason this OSC got a short read is because it read
751  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
752  * via the LOV, and it _knows_ it's reading inside the file, it's just that
753  * this stripe never got written at or beyond this stripe offset yet. */
754 static void handle_short_read(int nob_read, obd_count page_count,
755                               struct brw_page **pga)
756 {
757         char *ptr;
758         int i = 0;
759
760         /* skip bytes read OK */
761         while (nob_read > 0) {
762                 LASSERT (page_count > 0);
763
764                 if (pga[i]->count > nob_read) {
765                         /* EOF inside this page */
766                         ptr = cfs_kmap(pga[i]->pg) + 
767                                 (pga[i]->off & ~CFS_PAGE_MASK);
768                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
769                         cfs_kunmap(pga[i]->pg);
770                         page_count--;
771                         i++;
772                         break;
773                 }
774
775                 nob_read -= pga[i]->count;
776                 page_count--;
777                 i++;
778         }
779
780         /* zero remaining pages */
781         while (page_count-- > 0) {
782                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
783                 memset(ptr, 0, pga[i]->count);
784                 cfs_kunmap(pga[i]->pg);
785                 i++;
786         }
787 }
788
789 static int check_write_rcs(struct ptlrpc_request *req,
790                            int requested_nob, int niocount,
791                            obd_count page_count, struct brw_page **pga)
792 {
793         int    *remote_rcs, i;
794
795         /* return error if any niobuf was in error */
796         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
797                                         sizeof(*remote_rcs) * niocount, NULL);
798         if (remote_rcs == NULL) {
799                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
800                 return(-EPROTO);
801         }
802         if (lustre_msg_swabbed(req->rq_repmsg))
803                 for (i = 0; i < niocount; i++)
804                         __swab32s(&remote_rcs[i]);
805
806         for (i = 0; i < niocount; i++) {
807                 if (remote_rcs[i] < 0)
808                         return(remote_rcs[i]);
809
810                 if (remote_rcs[i] != 0) {
811                         CERROR("rc[%d] invalid (%d) req %p\n",
812                                 i, remote_rcs[i], req);
813                         return(-EPROTO);
814                 }
815         }
816
817         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
818                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
819                        requested_nob, req->rq_bulk->bd_nob_transferred);
820                 return(-EPROTO);
821         }
822
823         return (0);
824 }
825
826 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
827 {
828         if (p1->flag != p2->flag) {
829                 unsigned mask = ~OBD_BRW_FROM_GRANT;
830
831                 /* warn if we try to combine flags that we don't know to be
832                  * safe to combine */
833                 if ((p1->flag & mask) != (p2->flag & mask))
834                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
835                                "same brw?\n", p1->flag, p2->flag);
836                 return 0;
837         }
838
839         return (p1->off + p1->count == p2->off);
840 }
841
/* Compute a CRC32 checksum over up to @nob bytes spread across the first
 * @pg_count entries of @pga.  Under the OBD_FAIL test hooks the first
 * page's data (receive path) or the final checksum (send path) is
 * deliberately corrupted to exercise the checksum-retry machinery. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga)
{
        __u32 cksum = ~0;
        int i = 0;

        LASSERT (pg_count > 0);
        while (nob > 0 && pg_count > 0) {
                char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* only checksum up to the remaining byte budget */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = crc32_le(cksum, ptr + off, count);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
874
/* Build (but do not send) a BRW read or write request for @page_count
 * pages, including the bulk descriptor, the ioobj/niobuf request
 * buffers, and the async args stashed in the request for the reply
 * handler.  Contiguous pages with identical flags are merged into a
 * single remote niobuf.  On success *reqp holds the prepared request
 * and 0 is returned; otherwise a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;

        ENTRY;
        /* writes draw requests from the import's emergency pool so that
         * writeout can make progress under memory pressure */
        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* count how many niobufs we need after merging adjacent pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        /* FIXME bug 249. Also see bug 7198 */
        if (cli->cl_import->imp_connect_data.ocd_connect_flags &
            OBD_CONNECT_REQPORTAL)
                req->rq_request_portal = OST_IO_PORTAL;

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* each brw page must fit entirely within one VM page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __LINUX__
                /* pages must already be sorted by ascending offset */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* SRVLOCK must be all-or-nothing across the whole brw */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* we must have filled exactly the niocount slots we reserved */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum)) {
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM;
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (unlikely(cli->cl_checksum))
                        body->oa.o_valid |= OBD_MD_FLCKSUM;
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* stash everything the reply handler needs in the request */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1020
/* Diagnose a BRW_WRITE checksum mismatch by recomputing the checksum
 * over the pages still held by the client and comparing it against the
 * value we originally sent and the value the server computed.
 * Returns 0 if client and server checksums actually agree, 1 on a real
 * mismatch (callers use this to trigger a resend). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                 __u32 client_cksum, __u32 server_cksum, int nob,
                                 obd_count page_count, struct brw_page **pga)
{
        __u32 new_cksum;
        char *msg;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute now to work out where along the path the data changed */
        new_cksum = osc_checksum_bulk(nob, page_count, pga);

        if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original";

        LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
                       " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
                       msg, libcfs_nid2str(peer->nid),
                       oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                       oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
                       oa->o_id,
                       oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                       pga[0]->off,
                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x, server csum %x, client csum now %x\n",
               client_cksum, server_cksum, new_cksum);

        return 1;
}
1056
1057 /* Note rc enters this function as number of bytes transferred */
1058 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1059 {
1060         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1061         const lnet_process_id_t *peer =
1062                         &req->rq_import->imp_connection->c_peer;
1063         struct client_obd *cli = aa->aa_cli;
1064         struct ost_body *body;
1065         __u32 client_cksum = 0;
1066         ENTRY;
1067
1068         if (rc < 0 && rc != -EDQUOT)
1069                 RETURN(rc);
1070
1071         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1072         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1073                                   lustre_swab_ost_body);
1074         if (body == NULL) {
1075                 CERROR ("Can't unpack body\n");
1076                 RETURN(-EPROTO);
1077         }
1078
1079         /* set/clear over quota flag for a uid/gid */
1080         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1081             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1082                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1083                              body->oa.o_gid, body->oa.o_valid,
1084                              body->oa.o_flags);
1085
1086         if (rc < 0)
1087                 RETURN(rc);
1088
1089         if (req->rq_set && req->rq_set->set_countp)
1090                 atomic_add(rc, (atomic_t *)req->rq_set->set_countp);
1091
1092         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1093                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1094
1095         osc_update_grant(cli, body);
1096
1097         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1098                 if (rc > 0) {
1099                         CERROR ("Unexpected +ve rc %d\n", rc);
1100                         RETURN(-EPROTO);
1101                 }
1102                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1103
1104                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1105                              client_cksum &&
1106                              check_write_checksum(&body->oa, peer, client_cksum,
1107                                                  body->oa.o_cksum,
1108                                                  aa->aa_requested_nob,
1109                                                  aa->aa_page_count,
1110                                                  aa->aa_ppga)))
1111                         RETURN(-EAGAIN);
1112
1113                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1114                                      aa->aa_page_count, aa->aa_ppga);
1115                 GOTO(out, rc);
1116         }
1117
1118         /* The rest of this function executes only for OST_READs */
1119         if (rc > aa->aa_requested_nob) {
1120                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1121                        aa->aa_requested_nob);
1122                 RETURN(-EPROTO);
1123         }
1124
1125         if (rc != req->rq_bulk->bd_nob_transferred) {
1126                 CERROR ("Unexpected rc %d (%d transferred)\n",
1127                         rc, req->rq_bulk->bd_nob_transferred);
1128                 return (-EPROTO);
1129         }
1130
1131         if (rc < aa->aa_requested_nob)
1132                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1133
1134         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1135                 static int cksum_counter;
1136                 __u32      server_cksum = body->oa.o_cksum;
1137                 char      *via;
1138                 char      *router;
1139
1140                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1141                                                  aa->aa_ppga);
1142
1143                 if (peer->nid == req->rq_bulk->bd_sender) {
1144                         via = router = "";
1145                 } else {
1146                         via = " via ";
1147                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1148                 }
1149                 
1150                 if (server_cksum == ~0 && rc > 0) {
1151                         CERROR("Protocol error: server %s set the 'checksum' "
1152                                "bit, but didn't send a checksum.  Not fatal, "
1153                                "but please tell CFS.\n",
1154                                libcfs_nid2str(peer->nid));
1155                 } else if (server_cksum != client_cksum) {
1156                         LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s%s%s inum "
1157                                        LPU64"/"LPU64" object "LPU64"/"LPU64
1158                                        " extent ["LPU64"-"LPU64"]\n",
1159                                        req->rq_import->imp_obd->obd_name,
1160                                        libcfs_nid2str(peer->nid),
1161                                        via, router,
1162                                        body->oa.o_valid & OBD_MD_FLFID ?
1163                                                 body->oa.o_fid : (__u64)0,
1164                                        body->oa.o_valid & OBD_MD_FLFID ?
1165                                                 body->oa.o_generation :(__u64)0,
1166                                        body->oa.o_id,
1167                                        body->oa.o_valid & OBD_MD_FLGROUP ?
1168                                                 body->oa.o_gr : (__u64)0,
1169                                        aa->aa_ppga[0]->off,
1170                                        aa->aa_ppga[aa->aa_page_count-1]->off +
1171                                        aa->aa_ppga[aa->aa_page_count-1]->count -
1172                                                                         1);
1173                         CERROR("client %x, server %x\n",
1174                                client_cksum, server_cksum);
1175                         cksum_counter = 0;
1176                         aa->aa_oa->o_cksum = client_cksum;
1177                         rc = -EAGAIN;
1178                 } else {
1179                         cksum_counter++;
1180                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1181                         rc = 0;
1182                 }
1183         } else if (unlikely(client_cksum)) {
1184                 static int cksum_missed;
1185
1186                 cksum_missed++;
1187                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1188                         CERROR("Checksum %u requested from %s but not sent\n",
1189                                cksum_missed, libcfs_nid2str(peer->nid));
1190         } else {
1191                 rc = 0;
1192         }
1193 out:
1194         if (rc >= 0)
1195                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1196
1197         RETURN(rc);
1198 }
1199
1200 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1201                             struct lov_stripe_md *lsm,
1202                             obd_count page_count, struct brw_page **pga)
1203 {
1204         struct ptlrpc_request *request;
1205         int                    rc, retries = 5; /* lprocfs? */
1206         ENTRY;
1207
1208 restart_bulk:
1209         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1210                                   page_count, pga, &request);
1211         if (rc != 0)
1212                 return (rc);
1213
1214         rc = ptlrpc_queue_wait(request);
1215
1216         if (rc == -ETIMEDOUT && request->rq_resend) {
1217                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1218                 ptlrpc_req_finished(request);
1219                 goto restart_bulk;
1220         }
1221
1222         rc = osc_brw_fini_request(request, rc);
1223
1224         ptlrpc_req_finished(request);
1225         if (rc == -EAGAIN) {
1226                 if (retries-- > 0)
1227                         goto restart_bulk;
1228                 rc = -EIO;
1229         }
1230         RETURN(rc);
1231 }
1232
/* Rebuild and requeue a BRW request that failed its checksum check.
 * The replacement request takes over the page array and the queued
 * osc_async_pages from the old one.  Returns 0 on success, -EIO once
 * aa_retries is exhausted, -EINTR if any waiting page was interrupted,
 * or a negative errno from request preparation. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (aa->aa_retries-- <= 0) {
                CERROR("too many checksum retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
        /* don't resend on behalf of a waiter that has been interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                ptlrpc_mark_interrupted(oap->oap_request);
                                rc = -EINTR;
                                break;
                        }
                }
        }
        if (rc)
                RETURN(rc);

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
        INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        INIT_LIST_HEAD(&aa->aa_oaps);

        /* swap each oap's request reference over to the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        ptlrpc_set_add_req(set, new_req);

        RETURN(0);
}
1292
1293 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1294 {
1295         struct osc_brw_async_args *aa = data;
1296         int                        i;
1297         ENTRY;
1298
1299         rc = osc_brw_fini_request(request, rc);
1300         if (rc == -EAGAIN) {
1301                 rc = osc_brw_redo_request(request, aa);
1302                 if (rc == 0)
1303                         RETURN(0);
1304         }
1305
1306         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1307         for (i = 0; i < aa->aa_page_count; i++)
1308                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1309         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1310
1311         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1312
1313         RETURN(rc);
1314 }
1315
1316 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1317                           struct lov_stripe_md *lsm, obd_count page_count,
1318                           struct brw_page **pga, struct ptlrpc_request_set *set)
1319 {
1320         struct ptlrpc_request     *request;
1321         struct client_obd         *cli = &exp->exp_obd->u.cli;
1322         int                        rc, i;
1323         ENTRY;
1324
1325         /* Consume write credits even if doing a sync write -
1326          * otherwise we may run out of space on OST due to grant. */
1327         if (cmd == OBD_BRW_WRITE) {
1328                 spin_lock(&cli->cl_loi_list_lock);
1329                 for (i = 0; i < page_count; i++) {
1330                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1331                                 osc_consume_write_grant(cli, pga[i]);
1332                 }
1333                 spin_unlock(&cli->cl_loi_list_lock);
1334         }
1335
1336         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1337                                   page_count, pga, &request);
1338
1339         if (rc == 0) {
1340                 request->rq_interpret_reply = brw_interpret;
1341                 ptlrpc_set_add_req(set, request);
1342         } else if (cmd == OBD_BRW_WRITE) {
1343                 spin_lock(&cli->cl_loi_list_lock);
1344                 for (i = 0; i < page_count; i++)
1345                         osc_release_write_grant(cli, pga[i], 0);
1346                 spin_unlock(&cli->cl_loi_list_lock);
1347         }
1348
1349         RETURN (rc);
1350 }
1351
/*
 * We want disk allocation on the target to happen in offset order, so
 * we'll follow Sedgewick's advice and stick to the dead-simple shellsort
 * -- it'll do fine for our small page arrays and doesn't require
 * allocation.  It's an insertion sort that swaps elements that are
 * strides apart, shrinking the stride down until it's '1' and the array
 * is sorted.
 */
1359 static void sort_brw_pages(struct brw_page **array, int num)
1360 {
1361         int stride, i, j;
1362         struct brw_page *tmp;
1363
1364         if (num == 1)
1365                 return;
1366         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1367                 ;
1368
1369         do {
1370                 stride /= 3;
1371                 for (i = stride ; i < num ; i++) {
1372                         tmp = array[i];
1373                         j = i;
1374                         while (j >= stride && array[j-stride]->off > tmp->off) {
1375                                 array[j] = array[j - stride];
1376                                 j -= stride;
1377                         }
1378                         array[j] = tmp;
1379                 }
1380         } while (stride > 1);
1381 }
1382
1383 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1384 {
1385         int count = 1;
1386         int offset;
1387         int i = 0;
1388
1389         LASSERT (pages > 0);
1390         offset = pg[i]->off & (~CFS_PAGE_MASK);
1391
1392         for (;;) {
1393                 pages--;
1394                 if (pages == 0)         /* that's all */
1395                         return count;
1396
1397                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1398                         return count;   /* doesn't end on page boundary */
1399
1400                 i++;
1401                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1402                 if (offset != 0)        /* doesn't start on page boundary */
1403                         return count;
1404
1405                 count++;
1406         }
1407 }
1408
1409 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1410 {
1411         struct brw_page **ppga;
1412         int i;
1413
1414         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1415         if (ppga == NULL)
1416                 return NULL;
1417
1418         for (i = 0; i < count; i++)
1419                 ppga[i] = pga + i;
1420         return ppga;
1421 }
1422
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * count the array was originally allocated with, not a chunk size. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1428
/* Synchronous bulk read/write entry point.  Builds a pointer array over
 * the caller's pages, sorts it by file offset, and issues one or more
 * size-capped unfragmented BRW RPCs until all pages are done.
 * With OBD_BRW_CHECK set it only reports whether the I/O could succeed.
 * Returns 0 on success or a negative errno. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                /* cap each RPC at the client's max page count */
                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        saved_oa = obdo_alloc();
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* free via orig/page_count_orig: ppga and page_count were advanced */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                obdo_free(saved_oa);

        RETURN(rc);
}
1499
/* Asynchronous counterpart of osc_brw(): queues each size-capped
 * unfragmented BRW RPC on @set instead of waiting for it; completion is
 * handled by brw_interpret().  Each chunk gets its own copy of the
 * page-pointer array, except when the whole I/O fits in a single RPC,
 * in which case ownership of the original array is handed over.
 * Returns 0 on success or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        /* the chunk copy was not handed off; free it here */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1567
1568 static void osc_check_rpcs(struct client_obd *cli);
1569
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is passed through to osc_release_write_grant(). */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1578
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when the pages queued on @lop for @cmd (OBD_BRW_READ or
 * OBD_BRW_WRITE) justify building an rpc right now, 0 otherwise.
 * Called with the loi list lock held. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }

        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }

                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1629
/* Keep @item's membership of @list in sync with the boolean @should_be_on:
 * link it at the tail when it should be listed but is not, unlink it when
 * it should not be listed but is.  Membership that is already correct is
 * left untouched. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on) {
                if (!currently_on)
                        list_add_tail(item, list);
        } else if (currently_on) {
                list_del_init(item);
        }
}
1638
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly.  Called with the loi list lock
 * held. */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: objects that lop_makes_rpc() says could fire an rpc
         * right now, for either reads or writes */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* write/read lists: objects with any pending pages at all; used by
         * osc_next_loi() to flush below the usual thresholds when there are
         * cache waiters or the import is invalid */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1653
1654 static void lop_update_pending(struct client_obd *cli,
1655                                struct loi_oap_pages *lop, int cmd, int delta)
1656 {
1657         lop->lop_num_pending += delta;
1658         if (cmd & OBD_BRW_WRITE)
1659                 cli->cl_pending_w_pages += delta;
1660         else
1661                 cli->cl_pending_r_pages += delta;
1662 }
1663
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        /* recover the oap embedding this callback context */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: pull the page out of the queues and
                 * complete its group io slot with -EINTR right away */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1709
1710 /* this is trying to propogate async writeback errors back up to the
1711  * application.  As an async write fails we record the error code for later if
1712  * the app does an fsync.  As long as errors persist we force future rpcs to be
1713  * sync so that the app can get a sync error and break the cycle of queueing
1714  * pages for which writeback will fail. */
1715 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1716                            int rc)
1717 {
1718         if (rc) {
1719                 if (!ar->ar_rc)
1720                         ar->ar_rc = rc;
1721
1722                 ar->ar_force_sync = 1;
1723                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1724                 return;
1725
1726         }
1727
1728         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1729                 ar->ar_force_sync = 0;
1730 }
1731
1732 static void osc_oap_to_pending(struct osc_async_page *oap)
1733 {
1734         struct loi_oap_pages *lop;
1735
1736         if (oap->oap_cmd & OBD_BRW_WRITE)
1737                 lop = &oap->oap_loi->loi_write_lop;
1738         else
1739                 lop = &oap->oap_loi->loi_read_lop;
1740
1741         if (oap->oap_async_flags & ASYNC_URGENT)
1742                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1743         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1744         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1745 }
1746
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 *
 * Completes io on @oap with result @rc: propagates write errors into the
 * async_rc state, drops the request reference, copies returned size/time
 * attributes from @oa into the loi's lvb, and either finishes the group io
 * slot (sync path) or calls the caller's ap_completion (async path). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        ENTRY;
        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear the force-sync state on both the client-wide
                 * and per-object async_rc */
                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
        }

        if (oap->oap_request != NULL) {
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /* mirror attributes the server returned into our cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* sync (group io) pages complete through the oig, not ap_completion */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1798
/* ptlrpcd interpret callback for an async brw rpc built by osc_send_oap_rpc.
 * Finishes the request, decrements the in-flight counter, completes every
 * oap that rode in the rpc, and kicks cache waiters / further rpcs. */
static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
        if (rc == -EAGAIN) {
                /* server asked for a resend; on success the redo request owns
                 * the pages and we are done here */
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
                /* NOTE(review): this path frees only the ppga below, not
                 * aa->aa_oa — presumably the redo machinery owns it; confirm
                 * against osc_brw_redo_request */
                GOTO(out, rc);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        obdo_free(aa->aa_oa);

        rc = 0;
out:
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1846
/* Build a brw ptlrpc request covering the oaps on @rpc_list (exactly
 * @page_count of them) for @cmd (read or write).  On success the oaps are
 * moved onto the request's async args and @rpc_list is emptied; on failure
 * an ERR_PTR is returned and @rpc_list is left intact for the caller to
 * unwind.  Called without the loi list lock held. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        oa = obdo_alloc();
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* flatten the oap list into the pga array the rpc layer wants */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        /* all oaps in one rpc share caller ops/data; grab
                         * them from the first */
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* transfer ownership of the oaps to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        INIT_LIST_HEAD(rpc_list);

out:
        /* on any error path, free what we allocated here; on success the
         * request owns oa and pga */
        if (IS_ERR(req)) {
                if (oa)
                        obdo_free(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
1918
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work.
 *
 * Walks @lop's pending list collecting pages that can be merged into one
 * brw rpc, builds and dispatches that rpc via ptlrpcd.  Returns 1 when an
 * rpc was sent, 0 when there was nothing sendable (or make_ready asked us
 * to back off), or a negative error from request building. */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                /* NULLed above on -EAGAIN: stop collecting, send what we have */
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__LINUX__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* remember where the rpc starts, for the lproc offset stats */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing left to transfer (or -EINTR from above):
                         * complete the page immediately and keep scanning */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the loi list lock while building/accounting the request;
         * reacquired below */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* update lproc read/write histograms while unlocked */
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2117
/* Dump an object's rpc-readiness state: whether the loi is on the ready
 * list, and the pending-count / urgent flag for its write and read queues.
 * Fix: dropped the stray trailing backslash after "args)" — it continued
 * the macro definition onto the following (blank) line, which would have
 * silently absorbed any statement later added immediately below. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2127 /* This is called by osc_check_rpcs() to find which objects have pages that
2128  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2129 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2130 {
2131         ENTRY;
2132         /* first return all objects which we already know to have
2133          * pages ready to be stuffed into rpcs */
2134         if (!list_empty(&cli->cl_loi_ready_list))
2135                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2136                                   struct lov_oinfo, loi_cli_item));
2137
2138         /* then if we have cache waiters, return all objects with queued
2139          * writes.  This is especially important when many small files
2140          * have filled up the cache and not been fired into rpcs because
2141          * they don't pass the nr_pending/object threshhold */
2142         if (!list_empty(&cli->cl_cache_waiters) &&
2143             !list_empty(&cli->cl_loi_write_list))
2144                 RETURN(list_entry(cli->cl_loi_write_list.next,
2145                                   struct lov_oinfo, loi_write_item));
2146
2147         /* then return all queued objects when we have an invalid import
2148          * so that they get flushed */
2149         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2150                 if (!list_empty(&cli->cl_loi_write_list))
2151                         RETURN(list_entry(cli->cl_loi_write_list.next,
2152                                           struct lov_oinfo, loi_write_item));
2153                 if (!list_empty(&cli->cl_loi_read_list))
2154                         RETURN(list_entry(cli->cl_loi_read_list.next,
2155                                           struct lov_oinfo, loi_read_item));
2156         }
2157         RETURN(NULL);
2158 }
2159
/* called with the loi list lock held.
 *
 * Pumps rpcs out of the queued objects until we hit the max-in-flight
 * limit, run out of candidates, or osc_send_oap_rpc() keeps backing off
 * (race_counter). */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object if it still has sendable pages */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2221
2222 /* we're trying to queue a page in the osc so we're subject to the
2223  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2224  * If the osc's queued pages are already at that limit, then we want to sleep
2225  * until there is space in the osc's queue for us.  We also may be waiting for
2226  * write credits from the OST if there are RPCs in flight that may return some
2227  * before we fall back to sync writes.
2228  *
2229  * We need this know our allocation was granted in the presence of signals */
2230 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2231 {
2232         int rc;
2233         ENTRY;
2234         client_obd_list_lock(&cli->cl_loi_list_lock);
2235         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2236         client_obd_list_unlock(&cli->cl_loi_list_lock);
2237         RETURN(rc);
2238 };
2239
2240 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2241  * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* queue ourselves as a cache waiter; osc_wake_cache_waiters()
                 * unlinks ocw_entry and wakes us when space becomes free */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                /* drop the list lock while sleeping; ocw_granted() retakes
                 * it to test our state */
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list: we gave up (e.g. no RPCs left in
                 * flight) rather than being granted space */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2295
2296 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2297                         struct lov_oinfo *loi, cfs_page_t *page,
2298                         obd_off offset, struct obd_async_page_ops *ops,
2299                         void *data, void **res)
2300 {
2301         struct osc_async_page *oap;
2302         ENTRY;
2303
2304         if (!page)
2305                 return size_round(sizeof(*oap));
2306
2307         oap = *res;
2308         oap->oap_magic = OAP_MAGIC;
2309         oap->oap_cli = &exp->exp_obd->u.cli;
2310         oap->oap_loi = loi;
2311
2312         oap->oap_caller_ops = ops;
2313         oap->oap_caller_data = data;
2314
2315         oap->oap_page = page;
2316         oap->oap_obj_off = offset;
2317
2318         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2319         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2320         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2321
2322         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2323
2324         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2325         RETURN(0);
2326 }
2327
2328 struct osc_async_page *oap_from_cookie(void *cookie)
2329 {
2330         struct osc_async_page *oap = cookie;
2331         if (oap->oap_magic != OAP_MAGIC)
2332                 return ERR_PTR(-EINVAL);
2333         return oap;
2334 };
2335
/* Queue an async page for read or write.  For writes this may block waiting
 * for cache space / grant (osc_enter_cache), and may fail with -EDQUOT to
 * tell the caller to fall back to sync io. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        /* validate the caller's cookie before touching anything */
        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                oa = obdo_alloc();
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* have the caller fill in uid/gid so quota can be checked */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                obdo_free(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* record the io parameters on the page */
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while waiting for
                 * cache space; fails with -EDQUOT or -EINTR */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* kick rpc generation now that this object has new pages queued */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2410
/* aka (~was & now & flag), but this is more clear :)
 * True iff @flag is being newly set: absent in @was, present in @now.
 * Arguments are fully parenthesized so expression arguments such as
 * (a | b) expand with the intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2413
/* Add async flags to a queued page.  Newly setting ASYNC_URGENT moves the
 * page onto its object's urgent list so an rpc is generated sooner. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write list based on how the page was queued */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* flags only make sense while the page is still pending */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* only promote to the urgent list if the page has not
                 * already been pulled into an rpc being built */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* a newly urgent page may make an rpc worth sending right away */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2476
/* Park a page on its object's group-pending list for later submission as a
 * batch via osc_trigger_group_io(). */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse pages that are already queued somewhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* record the io parameters on the page */
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        /* held on the group list until the group is triggered */
        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                /* register with the io group so completion can be waited on */
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2531
/* Move every page on @lop's group-pending list onto the normal pending list
 * so regular rpc generation will pick it up.  @cmd is currently unused here
 * (read vs. write is implied by which lop the caller passed in). */
static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
                                 struct loi_oap_pages *lop, int cmd)
{
        struct list_head *pos, *tmp;
        struct osc_async_page *oap;

        list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
                oap = list_entry(pos, struct osc_async_page, oap_pending_item);
                /* unlink from the group list before requeueing as pending */
                list_del(&oap->oap_pending_item);
                osc_oap_to_pending(oap);
        }
        loi_list_maint(cli, loi);
}
2545
/* Release a previously queued io group: move all of the object's grouped
 * read and write pages to the pending lists and kick rpc generation. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        /* now that the pages are pending, start building rpcs */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2567
/* Remove an async page from all osc queues and release any cache space or
 * grant it was consuming.  Fails with -EBUSY if the page is already part of
 * an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page that an rpc in flight still references */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* give back any cache/grant this page held and let blocked
         * writers retry */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2614
/* Attach @data (an inode pointer on Linux) to the lock behind @lockh as
 * l_ast_data, asserting we are not silently replacing a different, still
 * live inode.  Only the LDLM_FL_NO_LRU bit of @flags is applied. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#ifdef __KERNEL__
#ifdef __LINUX__
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* an inode being freed may legitimately linger on the lock;
                 * anything else indicates inconsistent l_ast_data */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2647
2648 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2649                              ldlm_iterator_t replace, void *data)
2650 {
2651         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2652         struct obd_device *obd = class_exp2obd(exp);
2653
2654         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2655         return 0;
2656 }
2657
/* Common completion for osc_enqueue(): for intent enqueues extract the
 * server's disposition from the ldlm reply, log the lvb attributes we got
 * back, then hand the result to the caller's update callback. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                            int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the intent result overrides the abort status */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2689
/* ptlrpc interpret callback for async enqueues issued by osc_enqueue():
 * finish the ldlm enqueue, run the osc completion path, then drop the lock
 * reference that was held for the async request. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_ei->ei_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2722
2723 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2724  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2725  * other synchronous requests, however keeping some locks and trying to obtain
2726  * others may take a considerable amount of time in a case of ost failure; and
2727  * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
2729  * release locks just after they are obtained. */
/* Enqueue an extent lock for this object, first trying to match an already
 * granted local lock (including a covering PW lock for PR requests) before
 * sending an enqueue rpc.  Async when einfo->ei_rqset is set, in which case
 * completion runs in osc_enqueue_interpret(). */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_enqueue_info *einfo)
{
        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* no valid known-minimum-size: skip matching and enqueue fresh */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                             oinfo->oi_lockh);
        if (rc == 1) {
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        einfo->ei_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (einfo->ei_mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
                                     LCK_PW, oinfo->oi_lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        /* addref the lock only if not async requests. */
                        if (!einfo->ei_rqset)
                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                        osc_set_data_with_check(oinfo->oi_lockh,
                                                einfo->ei_cbdata,
                                                einfo->ei_flags);
                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
                        /* drop the PW ref taken by ldlm_lock_match() */
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                        RETURN(ELDLM_OK);
                }
        }

 no_match:
        if (intent) {
                /* intent enqueues carry the ldlm request and expect a reply
                 * with both a lock reply and an lvb */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
                                      LDLM_ENQUEUE, 2, size, NULL);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] =
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
                              &oinfo->oi_policy, einfo->ei_mode,
                              &einfo->ei_flags, einfo->ei_cb_bl,
                              einfo->ei_cb_cp, einfo->ei_cb_gl,
                              einfo->ei_cbdata,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              einfo->ei_rqset ? 1 : 0);
        if (einfo->ei_rqset) {
                if (!rc) {
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        /* completion handled in osc_enqueue_interpret() */
                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(einfo->ei_rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        rc = osc_enqueue_fini(req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
2857
/* Match an already granted extent lock for this object without enqueueing;
 * for PR requests a covering PW lock is accepted too (the PW ref from
 * lock_match is traded for a PR ref). */
static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                     __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                     int *flags, void *data, struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        int rc;
        int lflags = *flags;
        ENTRY;

        OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother */
        policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
        policy->l_extent.end |= ~CFS_PAGE_MASK;

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
                             policy, mode, lockh);
        if (rc) {
                //if (!(*flags & LDLM_FL_TEST_LOCK))
                        osc_set_data_with_check(lockh, data, lflags);
                RETURN(rc);
        }
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock. */
        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
                                     &res_id, type,
                                     policy, LCK_PW, lockh);
                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        osc_set_data_with_check(lockh, data, lflags);
                        /* swap the PW ref from lock_match for a PR ref */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);
                }
        }
        RETURN(rc);
}
2901
2902 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2903                       __u32 mode, struct lustre_handle *lockh)
2904 {
2905         ENTRY;
2906
2907         if (unlikely(mode == LCK_GROUP))
2908                 ldlm_lock_decref_and_cancel(lockh, mode);
2909         else
2910                 ldlm_lock_decref(lockh, mode);
2911
2912         RETURN(0);
2913 }
2914
2915 static int osc_cancel_unused(struct obd_export *exp,
2916                              struct lov_stripe_md *lsm, int flags, void *opaque)
2917 {
2918         struct obd_device *obd = class_exp2obd(exp);
2919         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2920
2921         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2922                                       opaque);
2923 }
2924
2925 static int osc_join_lru(struct obd_export *exp,
2926                         struct lov_stripe_md *lsm, int join)
2927 {
2928         struct obd_device *obd = class_exp2obd(exp);
2929         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2930
2931         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2932 }
2933
/* ptlrpc interpret callback for osc_statfs_async(): unpack the statfs reply
 * into the caller's buffer and report completion upward. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* unpack (and byte-swap if needed) the statfs reply buffer */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* always notify the caller, passing success or failure along */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
2955
/* Fire off an asynchronous OST_STATFS request on @rqset; the reply is
 * delivered via osc_statfs_interpret() into oinfo->oi_osfs and then to the
 * oi_cb_up callback.  @max_age is currently unused here (see comment). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        /* reply is consumed in osc_statfs_interpret() */
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
2986
/* Synchronous OST_STATFS: send the request, wait for the reply, and copy
 * the unpacked statistics into @osfs.  @max_age is currently unused here
 * (see comment). */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* unpack (and byte-swap if needed) the statfs reply buffer */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        /* the request is finished on both the success and error paths */
        ptlrpc_req_finished(req);
        return rc;
}
3027
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
 * whether the user buffer has room for the trailing lmm_objects entry
 * (we only ever fill 1 slot here).
 * lmm_magic must be LOV_USER_MAGIC.
 */
3034 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3035 {
3036         struct lov_user_md lum, *lumk;
3037         int rc = 0, lum_size;
3038         ENTRY;
3039
3040         if (!lsm)
3041                 RETURN(-ENODATA);
3042
3043         if (copy_from_user(&lum, lump, sizeof(lum)))
3044                 RETURN(-EFAULT);
3045
3046         if (lum.lmm_magic != LOV_USER_MAGIC)
3047                 RETURN(-EINVAL);
3048
3049         if (lum.lmm_stripe_count > 0) {
3050                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3051                 OBD_ALLOC(lumk, lum_size);
3052                 if (!lumk)
3053                         RETURN(-ENOMEM);
3054
3055                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3056         } else {
3057                 lum_size = sizeof(lum);
3058                 lumk = &lum;
3059         }
3060
3061         lumk->lmm_object_id = lsm->lsm_object_id;
3062         lumk->lmm_stripe_count = 1;
3063
3064         if (copy_to_user(lump, lumk, lum_size))
3065                 rc = -EFAULT;
3066
3067         if (lumk != &lum)
3068                 OBD_FREE(lumk, lum_size);
3069
3070         RETURN(rc);
3071 }
3072
3073
/* OSC-level ioctl dispatcher.
 *
 * Pins the module for the duration of the call (MOD_INC_USE_COUNT on
 * pre-2.5 kernels, try_module_get() otherwise) so the OSC cannot be
 * unloaded while an ioctl is in flight.
 *
 * cmd:  ioctl command number
 * exp:  export the ioctl was issued on
 * len:  size of karg (reused as a scratch length in LOV_GET_CONFIG)
 * karg: kernel-space argument buffer
 * uarg: original user-space pointer, for copy_to_user paths
 *
 * Returns 0 on success, a negative errno on failure, -ENOTTY for
 * unrecognised commands. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* A single OSC behaves as a one-target "LOV": synthesize a
                 * lov_desc describing exactly one active target. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate that the user's inline buffers are big enough
                 * for the descriptor and the UUID before writing them. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user returns the number of bytes NOT copied;
                 * collapse any nonzero remainder to -EFAULT. */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success; callers
                 * of the ioctl only care about success/failure. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        /* Drop the module reference taken on entry. */
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3165
/* obd_get_info handler for the OSC.
 *
 * Supported keys:
 *   "lock_to_stripe" - always returns stripe 0 (an OSC is a single object).
 *   "last_id"        - fetched from the OST via an OST_GET_INFO RPC.
 *
 * NOTE(review): keylen is compared with '>' for "lock_to_stripe" but '>='
 * for "last_id" -- presumably keylen includes the NUL terminator; confirm
 * against callers.
 *
 * Returns 0 on success, -EFAULT for NULL output args, -EINVAL for unknown
 * keys, or an RPC/protocol error for the "last_id" path. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (keylen > strlen("lock_to_stripe") &&
            strcmp(key, "lock_to_stripe") == 0) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer must hold the caller-specified value size. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                /* Unpack (and byte-swap, if needed) the last object id. */
                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3209
3210 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3211                                           void *aa, int rc)
3212 {
3213         struct llog_ctxt *ctxt;
3214         struct obd_import *imp = req->rq_import;
3215         ENTRY;
3216
3217         if (rc != 0)
3218                 RETURN(rc);
3219
3220         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3221         if (ctxt) {
3222                 if (rc == 0)
3223                         rc = llog_initiator_connect(ctxt);
3224                 else
3225                         CERROR("cannot establish connection for "
3226                                "ctxt %p: %d\n", ctxt, rc);
3227         }
3228
3229         spin_lock(&imp->imp_lock);
3230         imp->imp_server_timeout = 1;
3231         imp->imp_pingable = 1;
3232         spin_unlock(&imp->imp_lock);
3233         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3234
3235         RETURN(rc);
3236 }
3237
/* obd_set_info_async handler for the OSC.
 *
 * A few keys are handled locally (KEY_NEXT_ID, "unlinked", KEY_INIT_RECOV,
 * "checksum"); everything else is forwarded to the OST as an OST_SET_INFO
 * RPC queued on @set.
 *
 * Returns 0 on success, -EINVAL for bad value sizes or a missing request
 * set, -ENOMEM if the RPC cannot be allocated. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the next object id to precreate from. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Objects were unlinked; the OST may have space again, so clear
         * the no-space flag on the object creator. */
        if (KEY_IS("unlinked")) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Toggle bulk-data checksumming for this client. */
        if (KEY_IS("checksum")) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* "mds_conn" gets extra post-processing in the interpret callback
         * (llog originator connect, import ping setup). */
        if (KEY_IS("mds_conn"))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3313
3314
3315 static struct llog_operations osc_size_repl_logops = {
3316         lop_cancel: llog_obd_repl_cancel
3317 };
3318
/* Originator llog operations; lazily initialized from llog_lvfs_ops on the
 * first osc_llog_init() call (under obd_dev_lock). */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts the OSC uses on the MDS:
 * LLOG_MDS_OST_ORIG_CTXT (unlink/setattr replay toward the OST) and
 * LLOG_SIZE_REPL_CTXT (size replication cancels).
 *
 * Returns 0 on success or the llog_setup() error. */
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                         int count, struct llog_catid *catid, 
                         struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        /* One-time initialization of the shared ops table; the lop_setup
         * pointer doubles as the "already initialized" flag. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc) 
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3357
3358 static int osc_llog_finish(struct obd_device *obd, int count)
3359 {
3360         struct llog_ctxt *ctxt;
3361         int rc = 0, rc2 = 0;
3362         ENTRY;
3363
3364         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3365         if (ctxt)
3366                 rc = llog_cleanup(ctxt);
3367
3368         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3369         if (ctxt)
3370                 rc2 = llog_cleanup(ctxt);
3371         if (!rc)
3372                 rc = rc2;
3373
3374         RETURN(rc);
3375 }
3376
3377 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3378                          struct obd_uuid *cluuid,
3379                          struct obd_connect_data *data)
3380 {
3381         struct client_obd *cli = &obd->u.cli;
3382
3383         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3384                 long lost_grant;
3385
3386                 client_obd_list_lock(&cli->cl_loi_list_lock);
3387                 data->ocd_grant = cli->cl_avail_grant ?:
3388                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3389                 lost_grant = cli->cl_lost_grant;
3390                 cli->cl_lost_grant = 0;
3391                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3392
3393                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3394                        "cl_lost_grant: %ld\n", data->ocd_grant,
3395                        cli->cl_avail_grant, lost_grant);
3396                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3397                        " ocd_grant: %d\n", data->ocd_connect_flags,
3398                        data->ocd_version, data->ocd_grant);
3399         }
3400
3401         RETURN(0);
3402 }
3403
3404 static int osc_disconnect(struct obd_export *exp)
3405 {
3406         struct obd_device *obd = class_exp2obd(exp);
3407         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3408         int rc;
3409
3410         if (obd->u.cli.cl_conn_count == 1)
3411                 /* flush any remaining cancel messages out to the target */
3412                 llog_sync(ctxt, exp);
3413
3414         rc = client_disconnect_export(exp);
3415         return rc;
3416 }
3417
/* React to import state changes (disconnect, invalidation, reconnect,
 * connect-data negotiation) and propagate relevant events to the
 * obd observer (the LOV above us, typically).
 *
 * Returns 0 or the observer-notification error; LBUGs on an unknown
 * event. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Suspend object precreation until reconnect. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all local DLM locks; the server's state is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Reconnected: the OST may have space again. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3492
3493 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3494 {
3495         int rc;
3496         ENTRY;
3497
3498         ENTRY;
3499         rc = ptlrpcd_addref();
3500         if (rc)
3501                 RETURN(rc);
3502
3503         rc = client_obd_setup(obd, len, buf);
3504         if (rc) {
3505                 ptlrpcd_decref();
3506         } else {
3507                 struct lprocfs_static_vars lvars;
3508                 struct client_obd *cli = &obd->u.cli;
3509
3510                 lprocfs_init_vars(osc, &lvars);
3511                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3512                         lproc_osc_attach_seqstat(obd);
3513                         ptlrpc_lprocfs_register_obd(obd);
3514                 }
3515
3516                 oscc_init(obd);
3517                 /* We need to allocate a few requests more, because
3518                    brw_interpret_oap tries to create new requests before freeing
3519                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3520                    reserved, but I afraid that might be too much wasted RAM
3521                    in fact, so 2 is just my guess and still should work. */
3522                 cli->cl_import->imp_rq_pool =
3523                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3524                                             OST_MAXREQSIZE,
3525                                             ptlrpc_add_rqs_to_pool);
3526         }
3527
3528         RETURN(rc);
3529 }
3530
/* Staged pre-cleanup for an OSC obd device.  Called once per stage as
 * the device is torn down:
 *   EARLY       - deactivate the import so in-flight RPCs abort.
 *   EXPORTS     - destroy a never-connected import (and its rq pool).
 *   SELF_EXP    - shut down the llog contexts.
 *   OBD         - nothing to do.
 *
 * Returns 0, or the obd_llog_finish() error at SELF_EXP. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
                        class_destroy_import(imp);
                        obd->u.cli.cl_import = NULL;
                }
                break;
        }
        case OBD_CLEANUP_SELF_EXP:
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        case OBD_CLEANUP_OBD:
                /* Nothing left to do at the final stage. */
                break;
        }
        RETURN(rc);
}
3570
3571 int osc_cleanup(struct obd_device *obd)
3572 {
3573         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3574         int rc;
3575
3576         ENTRY;
3577         ptlrpc_lprocfs_unregister_obd(obd);
3578         lprocfs_obd_cleanup(obd);
3579
3580         spin_lock(&oscc->oscc_lock);
3581         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3582         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3583         spin_unlock(&oscc->oscc_lock);
3584
3585         /* free memory of osc quota cache */
3586         lquota_cleanup(quota_interface, obd);
3587
3588         rc = client_obd_cleanup(obd);
3589
3590         ptlrpcd_decref();
3591         RETURN(rc);
3592 }
3593
3594 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3595 {
3596         struct lustre_cfg *lcfg = buf;
3597         struct lprocfs_static_vars lvars;
3598         int rc = 0;
3599
3600         lprocfs_init_vars(osc, &lvars);
3601
3602         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3603         return(rc);
3604 }
3605
/* OSC obd method table, registered with class_register_type() in
 * osc_init().  Lifecycle and connection methods come first, then object
 * operations, async page I/O, DLM locking, and finally management hooks. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client helpers where possible) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object operations */
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* management / configuration */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
3650
3651 int __init osc_init(void)
3652 {
3653         struct lprocfs_static_vars lvars;
3654         int rc;
3655         ENTRY;
3656
3657         lprocfs_init_vars(osc, &lvars);
3658
3659         request_module("lquota");
3660         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3661         lquota_init(quota_interface);
3662         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3663
3664         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3665                                  LUSTRE_OSC_NAME);
3666         if (rc) {
3667                 if (quota_interface)
3668                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3669                 RETURN(rc);
3670         }
3671
3672         RETURN(rc);
3673 }
3674
3675 #ifdef __KERNEL__
3676 static void /*__exit*/ osc_exit(void)
3677 {
3678         lquota_exit(quota_interface);
3679         if (quota_interface)
3680                 PORTAL_SYMBOL_PUT(osc_quota_interface);
3681
3682         class_unregister_type(LUSTRE_OSC_NAME);
3683 }
3684
3685 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3686 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3687 MODULE_LICENSE("GPL");
3688
3689 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
3690 #endif