Whamcloud - gitweb
3583f181c80ba0cba18d281981e89eaaa60db54e
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include "osc_internal.h"
60
61 static quota_interface_t *quota_interface = NULL;
62 extern quota_interface_t osc_quota_interface;
63
64 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
65
66 static quota_interface_t *quota_interface;
67 extern quota_interface_t osc_quota_interface;
68
69 /* Pack OSC object metadata for disk storage (LE byte order). */
70 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
71                       struct lov_stripe_md *lsm)
72 {
73         int lmm_size;
74         ENTRY;
75
76         lmm_size = sizeof(**lmmp);
77         if (!lmmp)
78                 RETURN(lmm_size);
79
80         if (*lmmp && !lsm) {
81                 OBD_FREE(*lmmp, lmm_size);
82                 *lmmp = NULL;
83                 RETURN(0);
84         }
85
86         if (!*lmmp) {
87                 OBD_ALLOC(*lmmp, lmm_size);
88                 if (!*lmmp)
89                         RETURN(-ENOMEM);
90         }
91
92         if (lsm) {
93                 LASSERT(lsm->lsm_object_id);
94                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
95         }
96
97         RETURN(lmm_size);
98 }
99
100 /* Unpack OSC object metadata from disk storage (LE byte order). */
101 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
102                         struct lov_mds_md *lmm, int lmm_bytes)
103 {
104         int lsm_size;
105         ENTRY;
106
107         if (lmm != NULL) {
108                 if (lmm_bytes < sizeof (*lmm)) {
109                         CERROR("lov_mds_md too small: %d, need %d\n",
110                                lmm_bytes, (int)sizeof(*lmm));
111                         RETURN(-EINVAL);
112                 }
113                 /* XXX LOV_MAGIC etc check? */
114
115                 if (lmm->lmm_object_id == 0) {
116                         CERROR("lov_mds_md: zero lmm_object_id\n");
117                         RETURN(-EINVAL);
118                 }
119         }
120
121         lsm_size = lov_stripe_md_size(1);
122         if (lsmp == NULL)
123                 RETURN(lsm_size);
124
125         if (*lsmp != NULL && lmm == NULL) {
126                 OBD_FREE(*lsmp, lsm_size);
127                 *lsmp = NULL;
128                 RETURN(0);
129         }
130
131         if (*lsmp == NULL) {
132                 OBD_ALLOC(*lsmp, lsm_size);
133                 if (*lsmp == NULL)
134                         RETURN(-ENOMEM);
135                 loi_init((*lsmp)->lsm_oinfo);
136         }
137
138         if (lmm != NULL) {
139                 /* XXX zero *lsmp? */
140                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
141                 LASSERT((*lsmp)->lsm_object_id);
142         }
143
144         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
145
146         RETURN(lsm_size);
147 }
148
149 static int osc_getattr_interpret(struct ptlrpc_request *req,
150                                  struct osc_async_args *aa, int rc)
151 {
152         struct ost_body *body;
153         ENTRY;
154
155         if (rc != 0)
156                 GOTO(out, rc);
157
158         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
159                                   lustre_swab_ost_body);
160         if (body) {
161                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
162                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
163
164                 /* This should really be sent by the OST */
165                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
166                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
167         } else {
168                 CERROR("can't unpack ost_body\n");
169                 rc = -EPROTO;
170                 aa->aa_oi->oi_oa->o_valid = 0;
171         }
172 out:
173         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
174         RETURN(rc);
175 }
176
177 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
178                              struct ptlrpc_request_set *set)
179 {
180         struct ptlrpc_request *req;
181         struct ost_body *body;
182         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
183         struct osc_async_args *aa;
184         ENTRY;
185
186         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
187                               OST_GETATTR, 2, size,NULL);
188         if (!req)
189                 RETURN(-ENOMEM);
190
191         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
192         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
193
194         ptlrpc_req_set_repsize(req, 2, size);
195         req->rq_interpret_reply = osc_getattr_interpret;
196
197         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
198         aa = (struct osc_async_args *)&req->rq_async_args;
199         aa->aa_oi = oinfo;
200
201         ptlrpc_set_add_req(set, req);
202         RETURN (0);
203 }
204
205 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
206 {
207         struct ptlrpc_request *req;
208         struct ost_body *body;
209         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
210         ENTRY;
211
212         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
213                               OST_GETATTR, 2, size, NULL);
214         if (!req)
215                 RETURN(-ENOMEM);
216
217         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
218         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
219
220         ptlrpc_req_set_repsize(req, 2, size);
221
222         rc = ptlrpc_queue_wait(req);
223         if (rc) {
224                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
225                 GOTO(out, rc);
226         }
227
228         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
229                                   lustre_swab_ost_body);
230         if (body == NULL) {
231                 CERROR ("can't unpack ost_body\n");
232                 GOTO (out, rc = -EPROTO);
233         }
234
235         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
236         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
237
238         /* This should really be sent by the OST */
239         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
240         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
241
242         EXIT;
243  out:
244         ptlrpc_req_finished(req);
245         return rc;
246 }
247
248 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
249                        struct obd_trans_info *oti)
250 {
251         struct ptlrpc_request *req;
252         struct ost_body *body;
253         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
254         ENTRY;
255
256         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
257                               OST_SETATTR, 2, size, NULL);
258         if (!req)
259                 RETURN(-ENOMEM);
260
261         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
262         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
263
264         ptlrpc_req_set_repsize(req, 2, size);
265
266         rc = ptlrpc_queue_wait(req);
267         if (rc)
268                 GOTO(out, rc);
269
270         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
271                                   lustre_swab_ost_body);
272         if (body == NULL)
273                 GOTO(out, rc = -EPROTO);
274
275         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
276
277         EXIT;
278 out:
279         ptlrpc_req_finished(req);
280         RETURN(rc);
281 }
282
283 static int osc_setattr_interpret(struct ptlrpc_request *req,
284                                  struct osc_async_args *aa, int rc)
285 {
286         struct ost_body *body;
287         ENTRY;
288
289         if (rc != 0)
290                 GOTO(out, rc);
291
292         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
293                                   lustre_swab_ost_body);
294         if (body == NULL) {
295                 CERROR("can't unpack ost_body\n");
296                 GOTO(out, rc = -EPROTO);
297         }
298
299         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
300 out:
301         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
302         RETURN(rc);
303 }
304
305 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
306                              struct obd_trans_info *oti,
307                              struct ptlrpc_request_set *rqset)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body *body;
311         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
312         struct osc_async_args *aa;
313         ENTRY;
314
315         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
316                               OST_SETATTR, 2, size, NULL);
317         if (!req)
318                 RETURN(-ENOMEM);
319
320         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
321
322         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
323                 LASSERT(oti);
324                 memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
325                        sizeof(*oti->oti_logcookies));
326         }
327
328         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
329         ptlrpc_req_set_repsize(req, 2, size);
330         /* do mds to ost setattr asynchronouly */
331         if (!rqset) {
332                 /* Do not wait for response. */
333                 ptlrpcd_add_req(req);
334         } else {
335                 req->rq_interpret_reply = osc_setattr_interpret;
336
337                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
338                 aa = (struct osc_async_args *)&req->rq_async_args;
339                 aa->aa_oi = oinfo;
340
341                 ptlrpc_set_add_req(rqset, req);
342         }
343
344         RETURN(0);
345 }
346
347 int osc_real_create(struct obd_export *exp, struct obdo *oa,
348                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
349 {
350         struct ptlrpc_request *req;
351         struct ost_body *body;
352         struct lov_stripe_md *lsm;
353         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
354         ENTRY;
355
356         LASSERT(oa);
357         LASSERT(ea);
358
359         lsm = *ea;
360         if (!lsm) {
361                 rc = obd_alloc_memmd(exp, &lsm);
362                 if (rc < 0)
363                         RETURN(rc);
364         }
365
366         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
367                               OST_CREATE, 2, size, NULL);
368         if (!req)
369                 GOTO(out, rc = -ENOMEM);
370
371         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
372         memcpy(&body->oa, oa, sizeof(body->oa));
373
374         ptlrpc_req_set_repsize(req, 2, size);
375         if (oa->o_valid & OBD_MD_FLINLINE) {
376                 LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
377                         oa->o_flags == OBD_FL_DELORPHAN);
378                 DEBUG_REQ(D_HA, req,
379                           "delorphan from OST integration");
380                 /* Don't resend the delorphan req */
381                 req->rq_no_resend = req->rq_no_delay = 1;
382         }
383
384         rc = ptlrpc_queue_wait(req);
385         if (rc)
386                 GOTO(out_req, rc);
387
388         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
389                                   lustre_swab_ost_body);
390         if (body == NULL) {
391                 CERROR ("can't unpack ost_body\n");
392                 GOTO (out_req, rc = -EPROTO);
393         }
394
395         memcpy(oa, &body->oa, sizeof(*oa));
396
397         /* This should really be sent by the OST */
398         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
399         oa->o_valid |= OBD_MD_FLBLKSZ;
400
401         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
402          * have valid lsm_oinfo data structs, so don't go touching that.
403          * This needs to be fixed in a big way.
404          */
405         lsm->lsm_object_id = oa->o_id;
406         *ea = lsm;
407
408         if (oti != NULL) {
409                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
410
411                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
412                         if (!oti->oti_logcookies)
413                                 oti_alloc_cookies(oti, 1);
414                         memcpy(oti->oti_logcookies, obdo_logcookie(oa),
415                                sizeof(oti->oti_onecookie));
416                 }
417         }
418
419         CDEBUG(D_HA, "transno: "LPD64"\n",
420                lustre_msg_get_transno(req->rq_repmsg));
421         EXIT;
422 out_req:
423         ptlrpc_req_finished(req);
424 out:
425         if (rc && !*ea)
426                 obd_free_memmd(exp, &lsm);
427         return rc;
428 }
429
430 static int osc_punch_interpret(struct ptlrpc_request *req,
431                                struct osc_async_args *aa, int rc)
432 {
433         struct ost_body *body;
434         ENTRY;
435
436         if (rc != 0)
437                 GOTO(out, rc);
438
439         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
440                                   lustre_swab_ost_body);
441         if (body == NULL) {
442                 CERROR ("can't unpack ost_body\n");
443                 GOTO(out, rc = -EPROTO);
444         }
445
446         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
447 out:
448         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
449         RETURN(rc);
450 }
451
452 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
453                      struct obd_trans_info *oti,
454                      struct ptlrpc_request_set *rqset)
455 {
456         struct ptlrpc_request *req;
457         struct osc_async_args *aa;
458         struct ost_body *body;
459         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
460         ENTRY;
461
462         if (!oinfo->oi_oa) {
463                 CERROR("oa NULL\n");
464                 RETURN(-EINVAL);
465         }
466
467         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
468                               OST_PUNCH, 2, size, NULL);
469         if (!req)
470                 RETURN(-ENOMEM);
471
472         /* FIXME bug 249. Also see bug 7198 */
473         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
474             OBD_CONNECT_REQPORTAL)
475                 req->rq_request_portal = OST_IO_PORTAL;
476
477         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
478         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
479
480         /* overload the size and blocks fields in the oa with start/end */
481         body->oa.o_size = oinfo->oi_policy.l_extent.start;
482         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
483         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
484
485         ptlrpc_req_set_repsize(req, 2, size);
486
487         req->rq_interpret_reply = osc_punch_interpret;
488         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
489         aa = (struct osc_async_args *)&req->rq_async_args;
490         aa->aa_oi = oinfo;
491         ptlrpc_set_add_req(rqset, req);
492
493         RETURN(0);
494 }
495
496 static int osc_sync(struct obd_export *exp, struct obdo *oa,
497                     struct lov_stripe_md *md, obd_size start, obd_size end)
498 {
499         struct ptlrpc_request *req;
500         struct ost_body *body;
501         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
502         ENTRY;
503
504         if (!oa) {
505                 CERROR("oa NULL\n");
506                 RETURN(-EINVAL);
507         }
508
509         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
510                               OST_SYNC, 2, size, NULL);
511         if (!req)
512                 RETURN(-ENOMEM);
513
514         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
515         memcpy(&body->oa, oa, sizeof(*oa));
516
517         /* overload the size and blocks fields in the oa with start/end */
518         body->oa.o_size = start;
519         body->oa.o_blocks = end;
520         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
521
522         ptlrpc_req_set_repsize(req, 2, size);
523
524         rc = ptlrpc_queue_wait(req);
525         if (rc)
526                 GOTO(out, rc);
527
528         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
529                                   lustre_swab_ost_body);
530         if (body == NULL) {
531                 CERROR ("can't unpack ost_body\n");
532                 GOTO (out, rc = -EPROTO);
533         }
534
535         memcpy(oa, &body->oa, sizeof(*oa));
536
537         EXIT;
538  out:
539         ptlrpc_req_finished(req);
540         return rc;
541 }
542
543 /* Destroy requests can be async always on the client, and we don't even really
544  * care about the return code since the client cannot do anything at all about
545  * a destroy failure.
546  * When the MDS is unlinking a filename, it saves the file objects into a
547  * recovery llog, and these object records are cancelled when the OST reports
548  * they were destroyed and sync'd to disk (i.e. transaction committed).
549  * If the client dies, or the OST is down when the object should be destroyed,
550  * the records are not cancelled, and when the OST reconnects to the MDS next,
551  * it will retrieve the llog unlink logs and then sends the log cancellation
552  * cookies to the MDS after committing destroy transactions. */
553 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
554                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
555                        struct obd_export *md_export)
556 {
557         struct ptlrpc_request *req;
558         struct ost_body *body;
559         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
560         ENTRY;
561
562         if (!oa) {
563                 CERROR("oa NULL\n");
564                 RETURN(-EINVAL);
565         }
566
567         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
568                               OST_DESTROY, 2, size, NULL);
569         if (!req)
570                 RETURN(-ENOMEM);
571
572         /* FIXME bug 249. Also see bug 7198 */
573         if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
574             OBD_CONNECT_REQPORTAL)
575                 req->rq_request_portal = OST_IO_PORTAL;
576
577         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
578
579         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
580                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
581                        sizeof(*oti->oti_logcookies));
582         }
583
584         memcpy(&body->oa, oa, sizeof(*oa));
585         ptlrpc_req_set_repsize(req, 2, size);
586
587         ptlrpcd_add_req(req);
588         RETURN(0);
589 }
590
591 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
592                                 long writing_bytes)
593 {
594         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
595
596         LASSERT(!(oa->o_valid & bits));
597
598         oa->o_valid |= bits;
599         client_obd_list_lock(&cli->cl_loi_list_lock);
600         oa->o_dirty = cli->cl_dirty;
601         if (cli->cl_dirty > cli->cl_dirty_max) {
602                 CERROR("dirty %lu > dirty_max %lu\n",
603                        cli->cl_dirty, cli->cl_dirty_max);
604                 oa->o_undirty = 0;
605         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
606                 CERROR("dirty %d > system dirty_max %d\n",
607                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
608                 oa->o_undirty = 0;
609         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
610                 CERROR("dirty %lu - dirty_max %lu too big???\n",
611                        cli->cl_dirty, cli->cl_dirty_max);
612                 oa->o_undirty = 0;
613         } else {
614                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
615                                 (cli->cl_max_rpcs_in_flight + 1);
616                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
617         }
618         oa->o_grant = cli->cl_avail_grant;
619         oa->o_dropped = cli->cl_lost_grant;
620         cli->cl_lost_grant = 0;
621         client_obd_list_unlock(&cli->cl_loi_list_lock);
622         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
623                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
624 }
625
626 /* caller must hold loi_list_lock */
627 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
628 {
629         atomic_inc(&obd_dirty_pages);
630         cli->cl_dirty += CFS_PAGE_SIZE;
631         cli->cl_avail_grant -= CFS_PAGE_SIZE;
632         pga->flag |= OBD_BRW_FROM_GRANT;
633         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
634                CFS_PAGE_SIZE, pga, pga->pg);
635         LASSERT(cli->cl_avail_grant >= 0);
636 }
637
638 /* the companion to osc_consume_write_grant, called when a brw has completed.
639  * must be called with the loi lock held. */
640 static void osc_release_write_grant(struct client_obd *cli,
641                                     struct brw_page *pga, int sent)
642 {
643         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
644         ENTRY;
645
646         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
647                 EXIT;
648                 return;
649         }
650
651         pga->flag &= ~OBD_BRW_FROM_GRANT;
652         atomic_dec(&obd_dirty_pages);
653         cli->cl_dirty -= CFS_PAGE_SIZE;
654         if (!sent) {
655                 cli->cl_lost_grant += CFS_PAGE_SIZE;
656                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
657                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
658         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
659                 /* For short writes we shouldn't count parts of pages that
660                  * span a whole block on the OST side, or our accounting goes
661                  * wrong.  Should match the code in filter_grant_check. */
662                 int offset = pga->off & ~CFS_PAGE_MASK;
663                 int count = pga->count + (offset & (blocksize - 1));
664                 int end = (offset + pga->count) & (blocksize - 1);
665                 if (end)
666                         count += blocksize - end;
667
668                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
669                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
670                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
671                        cli->cl_avail_grant, cli->cl_dirty);
672         }
673
674         EXIT;
675 }
676
677 static unsigned long rpcs_in_flight(struct client_obd *cli)
678 {
679         return cli->cl_r_in_flight + cli->cl_w_in_flight;
680 }
681
682 /* caller must hold loi_list_lock */
683 void osc_wake_cache_waiters(struct client_obd *cli)
684 {
685         struct list_head *l, *tmp;
686         struct osc_cache_waiter *ocw;
687
688         ENTRY;
689         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
690                 /* if we can't dirty more, we must wait until some is written */
691                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
692                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
693                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
694                                "osc max %ld, sys max %d\n", cli->cl_dirty,
695                                cli->cl_dirty_max, obd_max_dirty_pages);
696                         return;
697                 }
698
699                 /* if still dirty cache but no grant wait for pending RPCs that
700                  * may yet return us some grant before doing sync writes */
701                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
702                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
703                                cli->cl_w_in_flight);
704                         return;
705                 }
706
707                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
708                 list_del_init(&ocw->ocw_entry);
709                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
710                         /* no more RPCs in flight to return grant, do sync IO */
711                         ocw->ocw_rc = -EDQUOT;
712                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
713                 } else {
714                         osc_consume_write_grant(cli,
715                                                 &ocw->ocw_oap->oap_brw_page);
716                 }
717
718                 cfs_waitq_signal(&ocw->ocw_waitq);
719         }
720
721         EXIT;
722 }
723
724 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
725 {
726         client_obd_list_lock(&cli->cl_loi_list_lock);
727         cli->cl_avail_grant = ocd->ocd_grant;
728         client_obd_list_unlock(&cli->cl_loi_list_lock);
729
730         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
731                cli->cl_avail_grant, cli->cl_lost_grant);
732         LASSERT(cli->cl_avail_grant >= 0);
733 }
734
735 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
736 {
737         client_obd_list_lock(&cli->cl_loi_list_lock);
738         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
739         cli->cl_avail_grant += body->oa.o_grant;
740         /* waiters are woken in brw_interpret_oap */
741         client_obd_list_unlock(&cli->cl_loi_list_lock);
742 }
743
744 /* We assume that the reason this OSC got a short read is because it read
745  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
746  * via the LOV, and it _knows_ it's reading inside the file, it's just that
747  * this stripe never got written at or beyond this stripe offset yet. */
748 static void handle_short_read(int nob_read, obd_count page_count,
749                               struct brw_page **pga)
750 {
751         char *ptr;
752         int i = 0;
753
754         /* skip bytes read OK */
755         while (nob_read > 0) {
756                 LASSERT (page_count > 0);
757
758                 if (pga[i]->count > nob_read) {
759                         /* EOF inside this page */
760                         ptr = cfs_kmap(pga[i]->pg) + 
761                                 (pga[i]->off & ~CFS_PAGE_MASK);
762                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
763                         cfs_kunmap(pga[i]->pg);
764                         page_count--;
765                         i++;
766                         break;
767                 }
768
769                 nob_read -= pga[i]->count;
770                 page_count--;
771                 i++;
772         }
773
774         /* zero remaining pages */
775         while (page_count-- > 0) {
776                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
777                 memset(ptr, 0, pga[i]->count);
778                 cfs_kunmap(pga[i]->pg);
779                 i++;
780         }
781 }
782
783 static int check_write_rcs(struct ptlrpc_request *req,
784                            int requested_nob, int niocount,
785                            obd_count page_count, struct brw_page **pga)
786 {
787         int    *remote_rcs, i;
788
789         /* return error if any niobuf was in error */
790         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
791                                         sizeof(*remote_rcs) * niocount, NULL);
792         if (remote_rcs == NULL) {
793                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
794                 return(-EPROTO);
795         }
796         if (lustre_msg_swabbed(req->rq_repmsg))
797                 for (i = 0; i < niocount; i++)
798                         __swab32s(&remote_rcs[i]);
799
800         for (i = 0; i < niocount; i++) {
801                 if (remote_rcs[i] < 0)
802                         return(remote_rcs[i]);
803
804                 if (remote_rcs[i] != 0) {
805                         CERROR("rc[%d] invalid (%d) req %p\n",
806                                 i, remote_rcs[i], req);
807                         return(-EPROTO);
808                 }
809         }
810
811         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
812                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
813                        requested_nob, req->rq_bulk->bd_nob_transferred);
814                 return(-EPROTO);
815         }
816
817         return (0);
818 }
819
820 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
821 {
822         if (p1->flag != p2->flag) {
823                 unsigned mask = ~OBD_BRW_FROM_GRANT;
824
825                 /* warn if we try to combine flags that we don't know to be
826                  * safe to combine */
827                 if ((p1->flag & mask) != (p2->flag & mask))
828                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
829                                "same brw?\n", p1->flag, p2->flag);
830                 return 0;
831         }
832
833         return (p1->off + p1->count == p2->off);
834 }
835
836 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
837                                    struct brw_page **pga)
838 {
839         __u32 cksum = ~0;
840         int i = 0;
841
842         LASSERT (pg_count > 0);
843         while (nob > 0 && pg_count > 0) {
844                 char *ptr = cfs_kmap(pga[i]->pg);
845                 int off = pga[i]->off & ~CFS_PAGE_MASK;
846                 int count = pga[i]->count > nob ? nob : pga[i]->count;
847
848                 /* corrupt the data before we compute the checksum, to
849                  * simulate an OST->client data error */
850                 if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
851                         memcpy(ptr + off, "bad1", min(4, nob));
852                 cksum = crc32_le(cksum, ptr + off, count);
853                 cfs_kunmap(pga[i]->pg);
854                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
855                                off, cksum);
856
857                 nob -= pga[i]->count;
858                 pg_count--;
859                 i++;
860         }
861         /* For sending we only compute the wrong checksum instead
862          * of corrupting the data so it is still correct on a redo */
863         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
864                 cksum++;
865
866         return cksum;
867 }
868
869 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
870                                 struct lov_stripe_md *lsm, obd_count page_count,
871                                 struct brw_page **pga,
872                                 struct ptlrpc_request **reqp)
873 {
874         struct ptlrpc_request   *req;
875         struct ptlrpc_bulk_desc *desc;
876         struct ost_body         *body;
877         struct obd_ioobj        *ioobj;
878         struct niobuf_remote    *niobuf;
879         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
880         int niocount, i, requested_nob, opc, rc;
881         struct ptlrpc_request_pool *pool;
882         struct osc_brw_async_args *aa;
883
884         ENTRY;
885         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
886         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
887
888         for (niocount = i = 1; i < page_count; i++) {
889                 if (!can_merge_pages(pga[i - 1], pga[i]))
890                         niocount++;
891         }
892
893         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
894         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
895
896         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
897         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
898                                    NULL, pool);
899         if (req == NULL)
900                 RETURN (-ENOMEM);
901
902         /* FIXME bug 249. Also see bug 7198 */
903         if (cli->cl_import->imp_connect_data.ocd_connect_flags &
904             OBD_CONNECT_REQPORTAL)
905                 req->rq_request_portal = OST_IO_PORTAL;
906
907         if (opc == OST_WRITE)
908                 desc = ptlrpc_prep_bulk_imp (req, page_count,
909                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
910         else
911                 desc = ptlrpc_prep_bulk_imp (req, page_count,
912                                              BULK_PUT_SINK, OST_BULK_PORTAL);
913         if (desc == NULL)
914                 GOTO(out, rc = -ENOMEM);
915         /* NB request now owns desc and will free it when it gets freed */
916
917         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
918         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
919         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
920                                 niocount * sizeof(*niobuf));
921
922         memcpy(&body->oa, oa, sizeof(*oa));
923
924         obdo_to_ioobj(oa, ioobj);
925         ioobj->ioo_bufcnt = niocount;
926
927         LASSERT (page_count > 0);
928         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
929                 struct brw_page *pg = pga[i];
930                 struct brw_page *pg_prev = pga[i - 1];
931
932                 LASSERT(pg->count > 0);
933                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
934                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
935                          pg->off, pg->count);
936 #ifdef __LINUX__
937                 LASSERTF(i == 0 || pg->off > pg_prev->off,
938                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
939                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
940                          i, page_count,
941                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
942                          pg_prev->pg, page_private(pg_prev->pg),
943                          pg_prev->pg->index, pg_prev->off);
944 #else
945                 LASSERTF(i == 0 || pg->off > pg_prev->off,
946                          "i %d p_c %u\n", i, page_count);
947 #endif
948                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
949                         (pg->flag & OBD_BRW_SRVLOCK));
950
951                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
952                                       pg->count);
953                 requested_nob += pg->count;
954
955                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
956                         niobuf--;
957                         niobuf->len += pg->count;
958                 } else {
959                         niobuf->offset = pg->off;
960                         niobuf->len    = pg->count;
961                         niobuf->flags  = pg->flag;
962                 }
963         }
964
965         LASSERT((void *)(niobuf - niocount) ==
966                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
967                                niocount * sizeof(*niobuf)));
968         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
969
970         /* size[REQ_REC_OFF] still sizeof (*body) */
971         if (opc == OST_WRITE) {
972                 if (unlikely(cli->cl_checksum)) {
973                         body->oa.o_valid |= OBD_MD_FLCKSUM;
974                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
975                                                              page_count, pga);
976                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
977                                body->oa.o_cksum);
978                         /* save this in 'oa', too, for later checking */
979                         oa->o_valid |= OBD_MD_FLCKSUM;
980                 } else {
981                         /* clear out the checksum flag, in case this is a
982                          * resend but cl_checksum is no longer set. b=11238 */
983                         oa->o_valid &= ~OBD_MD_FLCKSUM;
984                 }
985                 oa->o_cksum = body->oa.o_cksum;
986                 /* 1 RC per niobuf */
987                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
988                 ptlrpc_req_set_repsize(req, 3, size);
989         } else {
990                 if (unlikely(cli->cl_checksum))
991                         body->oa.o_valid |= OBD_MD_FLCKSUM;
992                 /* 1 RC for the whole I/O */
993                 ptlrpc_req_set_repsize(req, 2, size);
994         }
995
996         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
997         aa = (struct osc_brw_async_args *)&req->rq_async_args;
998         aa->aa_oa = oa;
999         aa->aa_requested_nob = requested_nob;
1000         aa->aa_nio_count = niocount;
1001         aa->aa_page_count = page_count;
1002         aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
1003         aa->aa_ppga = pga;
1004         aa->aa_cli = cli;
1005         INIT_LIST_HEAD(&aa->aa_oaps);
1006
1007         *reqp = req;
1008         RETURN (0);
1009
1010  out:
1011         ptlrpc_req_finished (req);
1012         RETURN (rc);
1013 }
1014
1015 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1016                                  __u32 client_cksum, __u32 server_cksum, int nob,
1017                                  obd_count page_count, struct brw_page **pga)
1018 {
1019         __u32 new_cksum;
1020         char *msg;
1021
1022         if (server_cksum == client_cksum) {
1023                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1024                 return 0;
1025         }
1026
1027         new_cksum = osc_checksum_bulk(nob, page_count, pga);
1028
1029         if (new_cksum == server_cksum)
1030                 msg = "changed on the client after we checksummed it";
1031         else if (new_cksum == client_cksum)
1032                 msg = "changed in transit before arrival at OST";
1033         else
1034                 msg = "changed in transit AND doesn't match the original";
1035
1036         LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
1037                        " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1038                        msg, libcfs_nid2str(peer->nid),
1039                        oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1040                        oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
1041                        oa->o_id,
1042                        oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1043                        pga[0]->off,
1044                        pga[page_count-1]->off + pga[page_count-1]->count - 1);
1045         CERROR("original client csum %x, server csum %x, client csum now %x\n",
1046                client_cksum, server_cksum, new_cksum);
1047
1048         return 1;
1049 }
1050
1051 /* Note rc enters this function as number of bytes transferred */
1052 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1053 {
1054         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1055         const lnet_process_id_t *peer =
1056                         &req->rq_import->imp_connection->c_peer;
1057         struct client_obd *cli = aa->aa_cli;
1058         struct ost_body *body;
1059         __u32 client_cksum = 0;
1060         ENTRY;
1061
1062         if (rc < 0 && rc != -EDQUOT)
1063                 RETURN(rc);
1064
1065         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1066         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1067                                   lustre_swab_ost_body);
1068         if (body == NULL) {
1069                 CERROR ("Can't unpack body\n");
1070                 RETURN(-EPROTO);
1071         }
1072
1073         /* set/clear over quota flag for a uid/gid */
1074         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1075             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1076                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1077                              body->oa.o_gid, body->oa.o_valid,
1078                              body->oa.o_flags);
1079
1080         if (rc < 0)
1081                 RETURN(rc);
1082
1083         if (unlikely(aa->aa_oa->o_valid & OBD_MD_FLCKSUM))
1084                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1085
1086         osc_update_grant(cli, body);
1087
1088         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1089                 if (rc > 0) {
1090                         CERROR ("Unexpected +ve rc %d\n", rc);
1091                         RETURN(-EPROTO);
1092                 }
1093                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1094
1095                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
1096                              client_cksum &&
1097                              check_write_checksum(&body->oa, peer, client_cksum,
1098                                                  body->oa.o_cksum,
1099                                                  aa->aa_requested_nob,
1100                                                  aa->aa_page_count,
1101                                                  aa->aa_ppga)))
1102                         RETURN(-EAGAIN);
1103
1104                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1105                                      aa->aa_page_count, aa->aa_ppga);
1106                 GOTO(out, rc);
1107         }
1108
1109         /* The rest of this function executes only for OST_READs */
1110         if (rc > aa->aa_requested_nob) {
1111                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1112                        aa->aa_requested_nob);
1113                 RETURN(-EPROTO);
1114         }
1115
1116         if (rc != req->rq_bulk->bd_nob_transferred) {
1117                 CERROR ("Unexpected rc %d (%d transferred)\n",
1118                         rc, req->rq_bulk->bd_nob_transferred);
1119                 return (-EPROTO);
1120         }
1121
1122         if (rc < aa->aa_requested_nob)
1123                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1124
1125         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
1126                 static int cksum_counter;
1127                 __u32 server_cksum = body->oa.o_cksum;
1128                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1129                                                  aa->aa_ppga);
1130
1131                 if (server_cksum == ~0 && rc > 0) {
1132                         CERROR("Protocol error: server %s set the 'checksum' "
1133                                "bit, but didn't send a checksum.  Not fatal, "
1134                                "but please tell CFS.\n",
1135                                libcfs_nid2str(peer->nid));
1136                 } else if (server_cksum != client_cksum) {
1137                         LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
1138                                        LPU64"/"LPU64" object "LPU64"/"LPU64
1139                                        " extent ["LPU64"-"LPU64"]\n",
1140                                        req->rq_import->imp_obd->obd_name,
1141                                        libcfs_nid2str(peer->nid),
1142                                        body->oa.o_valid & OBD_MD_FLFID ?
1143                                                 body->oa.o_fid : (__u64)0,
1144                                        body->oa.o_valid & OBD_MD_FLFID ?
1145                                                 body->oa.o_generation :(__u64)0,
1146                                        body->oa.o_id,
1147                                        body->oa.o_valid & OBD_MD_FLGROUP ?
1148                                                 body->oa.o_gr : (__u64)0,
1149                                        aa->aa_ppga[0]->off,
1150                                        aa->aa_ppga[aa->aa_page_count-1]->off +
1151                                        aa->aa_ppga[aa->aa_page_count-1]->count -
1152                                                                         1);
1153                         CERROR("client %x, server %x\n",
1154                                client_cksum, server_cksum);
1155                         cksum_counter = 0;
1156                         aa->aa_oa->o_cksum = client_cksum;
1157                         rc = -EAGAIN;
1158                 } else {
1159                         cksum_counter++;
1160                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1161                         rc = 0;
1162                 }
1163         } else if (unlikely(client_cksum)) {
1164                 static int cksum_missed;
1165
1166                 cksum_missed++;
1167                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1168                         CERROR("Checksum %u requested from %s but not sent\n",
1169                                cksum_missed, libcfs_nid2str(peer->nid));
1170         } else {
1171                 rc = 0;
1172         }
1173 out:
1174         if (rc >= 0)
1175                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1176
1177         RETURN(rc);
1178 }
1179
1180 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1181                             struct lov_stripe_md *lsm,
1182                             obd_count page_count, struct brw_page **pga)
1183 {
1184         struct ptlrpc_request *request;
1185         int                    rc, retries = 5; /* lprocfs? */
1186         ENTRY;
1187
1188 restart_bulk:
1189         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1190                                   page_count, pga, &request);
1191         if (rc != 0)
1192                 return (rc);
1193
1194         rc = ptlrpc_queue_wait(request);
1195
1196         if (rc == -ETIMEDOUT && request->rq_resend) {
1197                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1198                 ptlrpc_req_finished(request);
1199                 goto restart_bulk;
1200         }
1201
1202         rc = osc_brw_fini_request(request, rc);
1203
1204         ptlrpc_req_finished(request);
1205         if (rc == -EAGAIN) {
1206                 if (retries-- > 0)
1207                         goto restart_bulk;
1208                 rc = -EIO;
1209         }
1210         RETURN(rc);
1211 }
1212
1213 int osc_brw_redo_request(struct ptlrpc_request *request,
1214                          struct osc_brw_async_args *aa)
1215 {
1216         struct ptlrpc_request *new_req;
1217         struct ptlrpc_request_set *set = request->rq_set;
1218         struct osc_brw_async_args *new_aa;
1219         struct osc_async_page *oap;
1220         int rc = 0;
1221         ENTRY;
1222
1223         if (aa->aa_retries-- <= 0) {
1224                 CERROR("too many checksum retries, returning error\n");
1225                 RETURN(-EIO);
1226         }
1227
1228         DEBUG_REQ(D_ERROR, request, "redo for checksum error");
1229         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1230                 if (oap->oap_request != NULL) {
1231                         LASSERTF(request == oap->oap_request,
1232                                  "request %p != oap_request %p\n",
1233                                  request, oap->oap_request);
1234                         if (oap->oap_interrupted) {
1235                                 ptlrpc_mark_interrupted(oap->oap_request);
1236                                 rc = -EINTR;
1237                                 break;
1238                         }
1239                 }
1240         }
1241         if (rc)
1242                 RETURN(rc);
1243
1244         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1245                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1246                                   aa->aa_cli, aa->aa_oa,
1247                                   NULL /* lsm unused by osc currently */,
1248                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1249         if (rc)
1250                 RETURN(rc);
1251
1252         /* New request takes over pga and oaps from old request.
1253          * Note that copying a list_head doesn't work, need to move it... */
1254         new_req->rq_interpret_reply = request->rq_interpret_reply;
1255         new_req->rq_async_args = request->rq_async_args;
1256         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
1257         INIT_LIST_HEAD(&new_aa->aa_oaps);
1258         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1259         INIT_LIST_HEAD(&aa->aa_oaps);
1260
1261         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1262                 if (oap->oap_request) {
1263                         ptlrpc_req_finished(oap->oap_request);
1264                         oap->oap_request = ptlrpc_request_addref(new_req);
1265                 }
1266         }
1267
1268         ptlrpc_set_add_req(set, new_req);
1269
1270         RETURN(0);
1271 }
1272
1273 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
1274 {
1275         struct osc_brw_async_args *aa = data;
1276         int                        i;
1277         ENTRY;
1278
1279         rc = osc_brw_fini_request(request, rc);
1280         if (rc == -EAGAIN) {
1281                 rc = osc_brw_redo_request(request, aa);
1282                 if (rc == 0)
1283                         RETURN(0);
1284         }
1285
1286         spin_lock(&aa->aa_cli->cl_loi_list_lock);
1287         for (i = 0; i < aa->aa_page_count; i++)
1288                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1289         spin_unlock(&aa->aa_cli->cl_loi_list_lock);
1290
1291         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1292
1293         RETURN(rc);
1294 }
1295
1296 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1297                           struct lov_stripe_md *lsm, obd_count page_count,
1298                           struct brw_page **pga, struct ptlrpc_request_set *set)
1299 {
1300         struct ptlrpc_request     *request;
1301         struct client_obd         *cli = &exp->exp_obd->u.cli;
1302         int                        rc, i;
1303         ENTRY;
1304
1305         /* Consume write credits even if doing a sync write -
1306          * otherwise we may run out of space on OST due to grant. */
1307         spin_lock(&cli->cl_loi_list_lock);
1308         for (i = 0; i < page_count; i++) {
1309                 if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1310                         osc_consume_write_grant(cli, pga[i]);
1311         }
1312         spin_unlock(&cli->cl_loi_list_lock);
1313
1314         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1315                                   page_count, pga, &request);
1316
1317         if (rc == 0) {
1318                 request->rq_interpret_reply = brw_interpret;
1319                 ptlrpc_set_add_req(set, request);
1320         } else {
1321                 spin_lock(&cli->cl_loi_list_lock);
1322                 for (i = 0; i < page_count; i++)
1323                         osc_release_write_grant(cli, pga[i], 0);
1324                 spin_unlock(&cli->cl_loi_list_lock);
1325         }
1326
1327         RETURN (rc);
1328 }
1329
1330 /*
1331  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1332  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1333  * fine for our small page arrays and doesn't require allocation.  its an
1334  * insertion sort that swaps elements that are strides apart, shrinking the
1335  * stride down until its '1' and the array is sorted.
1336  */
1337 static void sort_brw_pages(struct brw_page **array, int num)
1338 {
1339         int stride, i, j;
1340         struct brw_page *tmp;
1341
1342         if (num == 1)
1343                 return;
1344         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1345                 ;
1346
1347         do {
1348                 stride /= 3;
1349                 for (i = stride ; i < num ; i++) {
1350                         tmp = array[i];
1351                         j = i;
1352                         while (j >= stride && array[j-stride]->off > tmp->off) {
1353                                 array[j] = array[j - stride];
1354                                 j -= stride;
1355                         }
1356                         array[j] = tmp;
1357                 }
1358         } while (stride > 1);
1359 }
1360
1361 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1362 {
1363         int count = 1;
1364         int offset;
1365         int i = 0;
1366
1367         LASSERT (pages > 0);
1368         offset = pg[i]->off & (~CFS_PAGE_MASK);
1369
1370         for (;;) {
1371                 pages--;
1372                 if (pages == 0)         /* that's all */
1373                         return count;
1374
1375                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1376                         return count;   /* doesn't end on page boundary */
1377
1378                 i++;
1379                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1380                 if (offset != 0)        /* doesn't start on page boundary */
1381                         return count;
1382
1383                 count++;
1384         }
1385 }
1386
1387 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1388 {
1389         struct brw_page **ppga;
1390         int i;
1391
1392         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1393         if (ppga == NULL)
1394                 return NULL;
1395
1396         for (i = 0; i < count; i++)
1397                 ppga[i] = pga + i;
1398         return ppga;
1399 }
1400
1401 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1402 {
1403         LASSERT(ppga != NULL);
1404         OBD_FREE(ppga, sizeof(*ppga) * count);
1405 }
1406
1407 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1408                    obd_count page_count, struct brw_page *pga,
1409                    struct obd_trans_info *oti)
1410 {
1411         struct obdo *saved_oa = NULL;
1412         struct brw_page **ppga, **orig;
1413         struct obd_import *imp = class_exp2cliimp(exp);
1414         struct client_obd *cli = &imp->imp_obd->u.cli;
1415         int rc, page_count_orig;
1416         ENTRY;
1417
1418         if (cmd & OBD_BRW_CHECK) {
1419                 /* The caller just wants to know if there's a chance that this
1420                  * I/O can succeed */
1421
1422                 if (imp == NULL || imp->imp_invalid)
1423                         RETURN(-EIO);
1424                 RETURN(0);
1425         }
1426
1427         /* test_brw with a failed create can trip this, maybe others. */
1428         LASSERT(cli->cl_max_pages_per_rpc);
1429
1430         rc = 0;
1431
1432         orig = ppga = osc_build_ppga(pga, page_count);
1433         if (ppga == NULL)
1434                 RETURN(-ENOMEM);
1435         page_count_orig = page_count;
1436
1437         sort_brw_pages(ppga, page_count);
1438         while (page_count) {
1439                 obd_count pages_per_brw;
1440
1441                 if (page_count > cli->cl_max_pages_per_rpc)
1442                         pages_per_brw = cli->cl_max_pages_per_rpc;
1443                 else
1444                         pages_per_brw = page_count;
1445
1446                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1447
1448                 if (saved_oa != NULL) {
1449                         /* restore previously saved oa */
1450                         *oinfo->oi_oa = *saved_oa;
1451                 } else if (page_count > pages_per_brw) {
1452                         /* save a copy of oa (brw will clobber it) */
1453                         saved_oa = obdo_alloc();
1454                         if (saved_oa == NULL)
1455                                 GOTO(out, rc = -ENOMEM);
1456                         *saved_oa = *oinfo->oi_oa;
1457                 }
1458
1459                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1460                                       pages_per_brw, ppga);
1461
1462                 if (rc != 0)
1463                         break;
1464
1465                 page_count -= pages_per_brw;
1466                 ppga += pages_per_brw;
1467         }
1468
1469 out:
1470         osc_release_ppga(orig, page_count_orig);
1471
1472         if (saved_oa != NULL)
1473                 obdo_free(saved_oa);
1474
1475         RETURN(rc);
1476 }
1477
1478 static int osc_brw_async(int cmd, struct obd_export *exp,
1479                          struct obd_info *oinfo, obd_count page_count,
1480                          struct brw_page *pga, struct obd_trans_info *oti,
1481                          struct ptlrpc_request_set *set)
1482 {
1483         struct brw_page **ppga, **orig;
1484         int page_count_orig;
1485         int rc = 0;
1486         ENTRY;
1487
1488         if (cmd & OBD_BRW_CHECK) {
1489                 /* The caller just wants to know if there's a chance that this
1490                  * I/O can succeed */
1491                 struct obd_import *imp = class_exp2cliimp(exp);
1492
1493                 if (imp == NULL || imp->imp_invalid)
1494                         RETURN(-EIO);
1495                 RETURN(0);
1496         }
1497
1498         orig = ppga = osc_build_ppga(pga, page_count);
1499         if (ppga == NULL)
1500                 RETURN(-ENOMEM);
1501         page_count_orig = page_count;
1502
1503         sort_brw_pages(ppga, page_count);
1504         while (page_count) {
1505                 struct brw_page **copy;
1506                 obd_count pages_per_brw;
1507
1508                 pages_per_brw = min_t(obd_count, page_count,
1509                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1510
1511                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1512
1513                 /* use ppga only if single RPC is going to fly */
1514                 if (pages_per_brw != page_count_orig || ppga != orig) {
1515                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1516                         if (copy == NULL)
1517                                 GOTO(out, rc = -ENOMEM);
1518                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1519                 } else
1520                         copy = ppga;
1521
1522                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1523                                     pages_per_brw, copy, set);
1524
1525                 if (rc != 0) {
1526                         if (copy != ppga)
1527                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1528                         break;
1529                 }
1530
1531                 if (copy == orig) {
1532                         /* we passed it to async_internal() which is
1533                          * now responsible for releasing memory */
1534                         orig = NULL;
1535                 }
1536
1537                 page_count -= pages_per_brw;
1538                 ppga += pages_per_brw;
1539         }
1540 out:
1541         if (orig)
1542                 osc_release_ppga(orig, page_count_orig);
1543         RETURN(rc);
1544 }
1545
1546 static void osc_check_rpcs(struct client_obd *cli);
1547
1548 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1549  * the dirty accounting.  Writeback completes or truncate happens before
1550  * writing starts.  Must be called with the loi lock held. */
1551 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1552                            int sent)
1553 {
1554         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1555 }
1556
1557 /* This maintains the lists of pending pages to read/write for a given object
1558  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1559  * to quickly find objects that are ready to send an RPC. */
1560 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1561                          int cmd)
1562 {
1563         int optimal;
1564         ENTRY;
1565
1566         if (lop->lop_num_pending == 0)
1567                 RETURN(0);
1568
1569         /* if we have an invalid import we want to drain the queued pages
1570          * by forcing them through rpcs that immediately fail and complete
1571          * the pages.  recovery relies on this to empty the queued pages
1572          * before canceling the locks and evicting down the llite pages */
1573         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1574                 RETURN(1);
1575
1576         /* stream rpcs in queue order as long as as there is an urgent page
1577          * queued.  this is our cheap solution for good batching in the case
1578          * where writepage marks some random page in the middle of the file
1579          * as urgent because of, say, memory pressure */
1580         if (!list_empty(&lop->lop_urgent)) {
1581                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1582                 RETURN(1);
1583         }
1584
1585         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1586         optimal = cli->cl_max_pages_per_rpc;
1587         if (cmd & OBD_BRW_WRITE) {
1588                 /* trigger a write rpc stream as long as there are dirtiers
1589                  * waiting for space.  as they're waiting, they're not going to
1590                  * create more pages to coallesce with what's waiting.. */
1591                 if (!list_empty(&cli->cl_cache_waiters)) {
1592                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1593                         RETURN(1);
1594                 }
1595
1596                 /* +16 to avoid triggering rpcs that would want to include pages
1597                  * that are being queued but which can't be made ready until
1598                  * the queuer finishes with the page. this is a wart for
1599                  * llite::commit_write() */
1600                 optimal += 16;
1601         }
1602         if (lop->lop_num_pending >= optimal)
1603                 RETURN(1);
1604
1605         RETURN(0);
1606 }
1607
/* Make @item's membership match @should_be_on: append it to the tail of
 * @list if it is currently unlinked and should be linked, unlink (and
 * re-initialise) it if it is linked and should not be.  Nothing happens when
 * the current state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else {
                if (linked)
                        list_del_init(item);
        }
}
1616
1617 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1618  * can find pages to build into rpcs quickly */
1619 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1620 {
1621         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1622                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1623                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1624
1625         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1626                 loi->loi_write_lop.lop_num_pending);
1627
1628         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1629                 loi->loi_read_lop.lop_num_pending);
1630 }
1631
1632 static void lop_update_pending(struct client_obd *cli,
1633                                struct loi_oap_pages *lop, int cmd, int delta)
1634 {
1635         lop->lop_num_pending += delta;
1636         if (cmd & OBD_BRW_WRITE)
1637                 cli->cl_pending_w_pages += delta;
1638         else
1639                 cli->cl_pending_r_pages += delta;
1640 }
1641
1642 /* this is called when a sync waiter receives an interruption.  Its job is to
1643  * get the caller woken as soon as possible.  If its page hasn't been put in an
1644  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1645  * desiring interruption which will forcefully complete the rpc once the rpc
1646  * has timed out */
1647 static void osc_occ_interrupted(struct oig_callback_context *occ)
1648 {
1649         struct osc_async_page *oap;
1650         struct loi_oap_pages *lop;
1651         struct lov_oinfo *loi;
1652         ENTRY;
1653
1654         /* XXX member_of() */
1655         oap = list_entry(occ, struct osc_async_page, oap_occ);
1656
1657         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1658
1659         oap->oap_interrupted = 1;
1660
1661         /* ok, it's been put in an rpc. only one oap gets a request reference */
1662         if (oap->oap_request != NULL) {
1663                 ptlrpc_mark_interrupted(oap->oap_request);
1664                 ptlrpcd_wake(oap->oap_request);
1665                 GOTO(unlock, 0);
1666         }
1667
1668         /* we don't get interruption callbacks until osc_trigger_group_io()
1669          * has been called and put the sync oaps in the pending/urgent lists.*/
1670         if (!list_empty(&oap->oap_pending_item)) {
1671                 list_del_init(&oap->oap_pending_item);
1672                 list_del_init(&oap->oap_urgent_item);
1673
1674                 loi = oap->oap_loi;
1675                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1676                         &loi->loi_write_lop : &loi->loi_read_lop;
1677                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1678                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1679
1680                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1681                 oap->oap_oig = NULL;
1682         }
1683
1684 unlock:
1685         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1686 }
1687
1688 /* this is trying to propogate async writeback errors back up to the
1689  * application.  As an async write fails we record the error code for later if
1690  * the app does an fsync.  As long as errors persist we force future rpcs to be
1691  * sync so that the app can get a sync error and break the cycle of queueing
1692  * pages for which writeback will fail. */
1693 static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
1694                            int rc)
1695 {
1696         if (rc) {
1697                 if (!ar->ar_rc)
1698                         ar->ar_rc = rc;
1699
1700                 ar->ar_force_sync = 1;
1701                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1702                 return;
1703
1704         }
1705
1706         if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
1707                 ar->ar_force_sync = 0;
1708 }
1709
1710 static void osc_oap_to_pending(struct osc_async_page *oap)
1711 {
1712         struct loi_oap_pages *lop;
1713
1714         if (oap->oap_cmd & OBD_BRW_WRITE)
1715                 lop = &oap->oap_loi->loi_write_lop;
1716         else
1717                 lop = &oap->oap_loi->loi_read_lop;
1718
1719         if (oap->oap_async_flags & ASYNC_URGENT)
1720                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1721         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1722         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1723 }
1724
1725 /* this must be called holding the loi list lock to give coverage to exit_cache,
1726  * async_flag maintenance, and oap_request */
1727 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1728                               struct osc_async_page *oap, int sent, int rc)
1729 {
1730         ENTRY;
1731         oap->oap_async_flags = 0;
1732         oap->oap_interrupted = 0;
1733
1734         if (oap->oap_cmd & OBD_BRW_WRITE) {
1735                 osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
1736                 osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
1737         }
1738
1739         if (oap->oap_request != NULL) {
1740                 ptlrpc_req_finished(oap->oap_request);
1741                 oap->oap_request = NULL;
1742         }
1743
1744         if (rc == 0 && oa != NULL) {
1745                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1746                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1747                 if (oa->o_valid & OBD_MD_FLMTIME)
1748                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1749                 if (oa->o_valid & OBD_MD_FLATIME)
1750                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1751                 if (oa->o_valid & OBD_MD_FLCTIME)
1752                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1753         }
1754
1755         if (oap->oap_oig) {
1756                 osc_exit_cache(cli, oap, sent);
1757                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1758                 oap->oap_oig = NULL;
1759                 EXIT;
1760                 return;
1761         }
1762
1763         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1764                                                 oap->oap_cmd, oa, rc);
1765
1766         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1767          * I/O on the page could start, but OSC calls it under lock
1768          * and thus we can add oap back to pending safely */
1769         if (rc)
1770                 /* upper layer wants to leave the page on pending queue */
1771                 osc_oap_to_pending(oap);
1772         else
1773                 osc_exit_cache(cli, oap, sent);
1774         EXIT;
1775 }
1776
1777 static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
1778 {
1779         struct osc_brw_async_args *aa = data;
1780         struct osc_async_page *oap, *tmp;
1781         struct client_obd *cli;
1782         ENTRY;
1783
1784         rc = osc_brw_fini_request(request, rc);
1785         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
1786         if (rc == -EAGAIN) {
1787                 rc = osc_brw_redo_request(request, aa);
1788                 if (rc == 0)
1789                         RETURN(0);
1790                 GOTO(out, rc);
1791         }
1792
1793         cli = aa->aa_cli;
1794
1795         client_obd_list_lock(&cli->cl_loi_list_lock);
1796
1797         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1798          * is called so we know whether to go to sync BRWs or wait for more
1799          * RPCs to complete */
1800         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
1801                 cli->cl_w_in_flight--;
1802         else
1803                 cli->cl_r_in_flight--;
1804
1805         /* the caller may re-use the oap after the completion call so
1806          * we need to clean it up a little */
1807         list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1808                 list_del_init(&oap->oap_rpc_item);
1809                 osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
1810         }
1811
1812         osc_wake_cache_waiters(cli);
1813         osc_check_rpcs(cli);
1814
1815         client_obd_list_unlock(&cli->cl_loi_list_lock);
1816
1817         obdo_free(aa->aa_oa);
1818
1819         rc = 0;
1820 out:
1821         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1822         RETURN(rc);
1823 }
1824
1825 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
1826                                             struct list_head *rpc_list,
1827                                             int page_count, int cmd)
1828 {
1829         struct ptlrpc_request *req;
1830         struct brw_page **pga = NULL;
1831         struct osc_brw_async_args *aa;
1832         struct obdo *oa = NULL;
1833         struct obd_async_page_ops *ops = NULL;
1834         void *caller_data = NULL;
1835         struct osc_async_page *oap;
1836         int i, rc;
1837
1838         ENTRY;
1839         LASSERT(!list_empty(rpc_list));
1840
1841         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1842         if (pga == NULL)
1843                 RETURN(ERR_PTR(-ENOMEM));
1844
1845         oa = obdo_alloc();
1846         if (oa == NULL)
1847                 GOTO(out, req = ERR_PTR(-ENOMEM));
1848
1849         i = 0;
1850         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
1851                 if (ops == NULL) {
1852                         ops = oap->oap_caller_ops;
1853                         caller_data = oap->oap_caller_data;
1854                 }
1855                 pga[i] = &oap->oap_brw_page;
1856                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1857                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1858                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
1859                 i++;
1860         }
1861
1862         /* always get the data for the obdo for the rpc */
1863         LASSERT(ops != NULL);
1864         ops->ap_fill_obdo(caller_data, cmd, oa);
1865
1866         sort_brw_pages(pga, page_count);
1867         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
1868         if (rc != 0) {
1869                 CERROR("prep_req failed: %d\n", rc);
1870                 GOTO(out, req = ERR_PTR(rc));
1871         }
1872
1873         /* Need to update the timestamps after the request is built in case
1874          * we race with setattr (locally or in queue at OST).  If OST gets
1875          * later setattr before earlier BRW (as determined by the request xid),
1876          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1877          * way to do this in a single call.  bug 10150 */
1878         ops->ap_update_obdo(caller_data, cmd, oa,
1879                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
1880
1881         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1882         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1883         INIT_LIST_HEAD(&aa->aa_oaps);
1884         list_splice(rpc_list, &aa->aa_oaps);
1885         INIT_LIST_HEAD(rpc_list);
1886
1887 out:
1888         if (IS_ERR(req)) {
1889                 if (oa)
1890                         obdo_free(oa);
1891                 if (pga)
1892                         OBD_FREE(pga, sizeof(*pga) * page_count);
1893         }
1894         RETURN(req);
1895 }
1896
1897 /* the loi lock is held across this function but it's allowed to release
1898  * and reacquire it during its work */
1899 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
1900                             int cmd, struct loi_oap_pages *lop)
1901 {
1902         struct ptlrpc_request *req;
1903         obd_count page_count = 0;
1904         struct osc_async_page *oap = NULL, *tmp;
1905         struct osc_brw_async_args *aa;
1906         struct obd_async_page_ops *ops;
1907         CFS_LIST_HEAD(rpc_list);
1908         unsigned int ending_offset;
1909         unsigned  starting_offset = 0;
1910         ENTRY;
1911
1912         /* first we find the pages we're allowed to work with */
1913         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
1914                 ops = oap->oap_caller_ops;
1915
1916                 LASSERT(oap->oap_magic == OAP_MAGIC);
1917
1918                 /* in llite being 'ready' equates to the page being locked
1919                  * until completion unlocks it.  commit_write submits a page
1920                  * as not ready because its unlock will happen unconditionally
1921                  * as the call returns.  if we race with commit_write giving
1922                  * us that page we dont' want to create a hole in the page
1923                  * stream, so we stop and leave the rpc to be fired by
1924                  * another dirtier or kupdated interval (the not ready page
1925                  * will still be on the dirty list).  we could call in
1926                  * at the end of ll_file_write to process the queue again. */
1927                 if (!(oap->oap_async_flags & ASYNC_READY)) {
1928                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
1929                         if (rc < 0)
1930                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
1931                                                 "instead of ready\n", oap,
1932                                                 oap->oap_page, rc);
1933                         switch (rc) {
1934                         case -EAGAIN:
1935                                 /* llite is telling us that the page is still
1936                                  * in commit_write and that we should try
1937                                  * and put it in an rpc again later.  we
1938                                  * break out of the loop so we don't create
1939                                  * a hole in the sequence of pages in the rpc
1940                                  * stream.*/
1941                                 oap = NULL;
1942                                 break;
1943                         case -EINTR:
1944                                 /* the io isn't needed.. tell the checks
1945                                  * below to complete the rpc with EINTR */
1946                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1947                                 oap->oap_count = -EINTR;
1948                                 break;
1949                         case 0:
1950                                 oap->oap_async_flags |= ASYNC_READY;
1951                                 break;
1952                         default:
1953                                 LASSERTF(0, "oap %p page %p returned %d "
1954                                             "from make_ready\n", oap,
1955                                             oap->oap_page, rc);
1956                                 break;
1957                         }
1958                 }
1959                 if (oap == NULL)
1960                         break;
1961                 /*
1962                  * Page submitted for IO has to be locked. Either by
1963                  * ->ap_make_ready() or by higher layers.
1964                  *
1965                  * XXX nikita: this assertion should be adjusted when lustre
1966                  * starts using PG_writeback for pages being written out.
1967                  */
1968 #if defined(__KERNEL__) && defined(__LINUX__)
1969                 LASSERT(PageLocked(oap->oap_page));
1970 #endif
1971                 /* If there is a gap at the start of this page, it can't merge
1972                  * with any previous page, so we'll hand the network a
1973                  * "fragmented" page array that it can't transfer in 1 RDMA */
1974                 if (page_count != 0 && oap->oap_page_off != 0)
1975                         break;
1976
1977                 /* take the page out of our book-keeping */
1978                 list_del_init(&oap->oap_pending_item);
1979                 lop_update_pending(cli, lop, cmd, -1);
1980                 list_del_init(&oap->oap_urgent_item);
1981
1982                 if (page_count == 0)
1983                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
1984                                           (PTLRPC_MAX_BRW_SIZE - 1);
1985
1986                 /* ask the caller for the size of the io as the rpc leaves. */
1987                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
1988                         oap->oap_count =
1989                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
1990                 if (oap->oap_count <= 0) {
1991                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
1992                                oap->oap_count);
1993                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
1994                         continue;
1995                 }
1996
1997                 /* now put the page back in our accounting */
1998                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1999                 if (++page_count >= cli->cl_max_pages_per_rpc)
2000                         break;
2001
2002                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2003                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2004                  * have the same alignment as the initial writes that allocated
2005                  * extents on the server. */
2006                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2007                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2008                 if (ending_offset == 0)
2009                         break;
2010
2011                 /* If there is a gap at the end of this page, it can't merge
2012                  * with any subsequent pages, so we'll hand the network a
2013                  * "fragmented" page array that it can't transfer in 1 RDMA */
2014                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2015                         break;
2016         }
2017
2018         osc_wake_cache_waiters(cli);
2019
2020         if (page_count == 0)
2021                 RETURN(0);
2022
2023         loi_list_maint(cli, loi);
2024
2025         client_obd_list_unlock(&cli->cl_loi_list_lock);
2026
2027         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2028         if (IS_ERR(req)) {
2029                 /* this should happen rarely and is pretty bad, it makes the
2030                  * pending list not follow the dirty order */
2031                 client_obd_list_lock(&cli->cl_loi_list_lock);
2032                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2033                         list_del_init(&oap->oap_rpc_item);
2034
2035                         /* queued sync pages can be torn down while the pages
2036                          * were between the pending list and the rpc */
2037                         if (oap->oap_interrupted) {
2038                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2039                                 osc_ap_completion(cli, NULL, oap, 0,
2040                                                   oap->oap_count);
2041                                 continue;
2042                         }
2043                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2044                 }
2045                 loi_list_maint(cli, loi);
2046                 RETURN(PTR_ERR(req));
2047         }
2048
2049         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2050         if (cmd == OBD_BRW_READ) {
2051                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2052                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2053                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2054                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2055                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2056         } else {
2057                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2058                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2059                                  cli->cl_w_in_flight);
2060                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2061                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2062                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2063         }
2064
2065         client_obd_list_lock(&cli->cl_loi_list_lock);
2066
2067         if (cmd == OBD_BRW_READ)
2068                 cli->cl_r_in_flight++;
2069         else
2070                 cli->cl_w_in_flight++;
2071
2072         /* queued sync pages can be torn down while the pages
2073          * were between the pending list and the rpc */
2074         tmp = NULL;
2075         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2076                 /* only one oap gets a request reference */
2077                 if (tmp == NULL)
2078                         tmp = oap;
2079                 if (oap->oap_interrupted && !req->rq_intr) {
2080                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2081                                oap, req);
2082                         ptlrpc_mark_interrupted(req);
2083                 }
2084         }
2085         if (tmp != NULL)
2086                 tmp->oap_request = ptlrpc_request_addref(req);
2087
2088         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2089                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2090
2091         req->rq_interpret_reply = brw_interpret_oap;
2092         ptlrpcd_add_req(req);
2093         RETURN(1);
2094 }
2095
/* Log (at D_INODE) a one-line summary of @LOI's queue state followed by the
 * caller's own format @STR and arguments: whether the object is on the ready
 * list, then pending count and urgent-list-non-empty for writes and reads.
 * At least one argument must follow @STR. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2104
2105 /* This is called by osc_check_rpcs() to find which objects have pages that
2106  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2107 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2108 {
2109         ENTRY;
2110         /* first return all objects which we already know to have
2111          * pages ready to be stuffed into rpcs */
2112         if (!list_empty(&cli->cl_loi_ready_list))
2113                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2114                                   struct lov_oinfo, loi_cli_item));
2115
2116         /* then if we have cache waiters, return all objects with queued
2117          * writes.  This is especially important when many small files
2118          * have filled up the cache and not been fired into rpcs because
2119          * they don't pass the nr_pending/object threshhold */
2120         if (!list_empty(&cli->cl_cache_waiters) &&
2121             !list_empty(&cli->cl_loi_write_list))
2122                 RETURN(list_entry(cli->cl_loi_write_list.next,
2123                                   struct lov_oinfo, loi_write_item));
2124
2125         /* then return all queued objects when we have an invalid import
2126          * so that they get flushed */
2127         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2128                 if (!list_empty(&cli->cl_loi_write_list))
2129                         RETURN(list_entry(cli->cl_loi_write_list.next,
2130                                           struct lov_oinfo, loi_write_item));
2131                 if (!list_empty(&cli->cl_loi_read_list))
2132                         RETURN(list_entry(cli->cl_loi_read_list.next,
2133                                           struct lov_oinfo, loi_read_item));
2134         }
2135         RETURN(NULL);
2136 }
2137
2138 /* called with the loi list lock held */
2139 static void osc_check_rpcs(struct client_obd *cli)
2140 {
2141         struct lov_oinfo *loi;
2142         int rc = 0, race_counter = 0;
2143         ENTRY;
2144
2145         while ((loi = osc_next_loi(cli)) != NULL) {
2146                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2147
2148                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2149                         break;
2150
2151                 /* attempt some read/write balancing by alternating between
2152                  * reads and writes in an object.  The makes_rpc checks here
2153                  * would be redundant if we were getting read/write work items
2154                  * instead of objects.  we don't want send_oap_rpc to drain a
2155                  * partial read pending queue when we're given this object to
2156                  * do io on writes while there are cache waiters */
2157                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2158                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2159                                               &loi->loi_write_lop);
2160                         if (rc < 0)
2161                                 break;
2162                         if (rc > 0)
2163                                 race_counter = 0;
2164                         else
2165                                 race_counter++;
2166                 }
2167                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2168                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2169                                               &loi->loi_read_lop);
2170                         if (rc < 0)
2171                                 break;
2172                         if (rc > 0)
2173                                 race_counter = 0;
2174                         else
2175                                 race_counter++;
2176                 }
2177
2178                 /* attempt some inter-object balancing by issueing rpcs
2179                  * for each object in turn */
2180                 if (!list_empty(&loi->loi_cli_item))
2181                         list_del_init(&loi->loi_cli_item);
2182                 if (!list_empty(&loi->loi_write_item))
2183                         list_del_init(&loi->loi_write_item);
2184                 if (!list_empty(&loi->loi_read_item))
2185                         list_del_init(&loi->loi_read_item);
2186
2187                 loi_list_maint(cli, loi);
2188
2189                 /* send_oap_rpc fails with 0 when make_ready tells it to
2190                  * back off.  llite's make_ready does this when it tries
2191                  * to lock a page queued for write that is already locked.
2192                  * we want to try sending rpcs from many objects, but we
2193                  * don't want to spin failing with 0.  */
2194                 if (race_counter == 10)
2195                         break;
2196         }
2197         EXIT;
2198 }
2199
2200 /* we're trying to queue a page in the osc so we're subject to the
2201  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2202  * If the osc's queued pages are already at that limit, then we want to sleep
2203  * until there is space in the osc's queue for us.  We also may be waiting for
2204  * write credits from the OST if there are RPCs in flight that may return some
2205  * before we fall back to sync writes.
2206  *
2207  * We need this know our allocation was granted in the presence of signals */
2208 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2209 {
2210         int rc;
2211         ENTRY;
2212         client_obd_list_lock(&cli->cl_loi_list_lock);
2213         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2214         client_obd_list_unlock(&cli->cl_loi_list_lock);
2215         RETURN(rc);
2216 };
2217
2218 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2219  * grant or cache space. */
2220 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2221                            struct osc_async_page *oap)
2222 {
2223         struct osc_cache_waiter ocw;
2224         struct l_wait_info lwi = { 0 };
2225         ENTRY;
2226
2227         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2228                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2229                cli->cl_dirty_max, obd_max_dirty_pages,
2230                cli->cl_lost_grant, cli->cl_avail_grant);
2231
2232         /* force the caller to try sync io.  this can jump the list
2233          * of queued writes and create a discontiguous rpc stream */
2234         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2235             loi->loi_ar.ar_force_sync)
2236                 RETURN(-EDQUOT);
2237
2238         /* Hopefully normal case - cache space and write credits available */
2239         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2240             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2241             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2242                 /* account for ourselves */
2243                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2244                 RETURN(0);
2245         }
2246
2247         /* Make sure that there are write rpcs in flight to wait for.  This
2248          * is a little silly as this object may not have any pending but
2249          * other objects sure might. */
2250         if (cli->cl_w_in_flight) {
2251                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2252                 cfs_waitq_init(&ocw.ocw_waitq);
2253                 ocw.ocw_oap = oap;
2254                 ocw.ocw_rc = 0;
2255
2256                 loi_list_maint(cli, loi);
2257                 osc_check_rpcs(cli);
2258                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2259
2260                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2261                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2262
2263                 client_obd_list_lock(&cli->cl_loi_list_lock);
2264                 if (!list_empty(&ocw.ocw_entry)) {
2265                         list_del(&ocw.ocw_entry);
2266                         RETURN(-EINTR);
2267                 }
2268                 RETURN(ocw.ocw_rc);
2269         }
2270
2271         RETURN(-EDQUOT);
2272 }
2273
2274 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2275                         struct lov_oinfo *loi, cfs_page_t *page,
2276                         obd_off offset, struct obd_async_page_ops *ops,
2277                         void *data, void **res)
2278 {
2279         struct osc_async_page *oap;
2280         ENTRY;
2281
2282         if (!page)
2283                 return size_round(sizeof(*oap));
2284
2285         oap = *res;
2286         oap->oap_magic = OAP_MAGIC;
2287         oap->oap_cli = &exp->exp_obd->u.cli;
2288         oap->oap_loi = loi;
2289
2290         oap->oap_caller_ops = ops;
2291         oap->oap_caller_data = data;
2292
2293         oap->oap_page = page;
2294         oap->oap_obj_off = offset;
2295
2296         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2297         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2298         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2299
2300         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2301
2302         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2303         RETURN(0);
2304 }
2305
2306 struct osc_async_page *oap_from_cookie(void *cookie)
2307 {
2308         struct osc_async_page *oap = cookie;
2309         if (oap->oap_magic != OAP_MAGIC)
2310                 return ERR_PTR(-EINVAL);
2311         return oap;
2312 };
2313
2314 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2315                               struct lov_oinfo *loi, void *cookie,
2316                               int cmd, obd_off off, int count,
2317                               obd_flag brw_flags, enum async_flags async_flags)
2318 {
2319         struct client_obd *cli = &exp->exp_obd->u.cli;
2320         struct osc_async_page *oap;
2321         int rc = 0;
2322         ENTRY;
2323
2324         oap = oap_from_cookie(cookie);
2325         if (IS_ERR(oap))
2326                 RETURN(PTR_ERR(oap));
2327
2328         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2329                 RETURN(-EIO);
2330
2331         if (!list_empty(&oap->oap_pending_item) ||
2332             !list_empty(&oap->oap_urgent_item) ||
2333             !list_empty(&oap->oap_rpc_item))
2334                 RETURN(-EBUSY);
2335
2336         /* check if the file's owner/group is over quota */
2337 #ifdef HAVE_QUOTA_SUPPORT
2338         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2339                 struct obd_async_page_ops *ops;
2340                 struct obdo *oa;
2341
2342                 oa = obdo_alloc();
2343                 if (oa == NULL)
2344                         RETURN(-ENOMEM);
2345
2346                 ops = oap->oap_caller_ops;
2347                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2348                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2349                     NO_QUOTA)
2350                         rc = -EDQUOT;
2351
2352                 obdo_free(oa);
2353                 if (rc)
2354                         RETURN(rc);
2355         }
2356 #endif
2357
2358         if (loi == NULL)
2359                 loi = &lsm->lsm_oinfo[0];
2360
2361         client_obd_list_lock(&cli->cl_loi_list_lock);
2362
2363         oap->oap_cmd = cmd;
2364         oap->oap_page_off = off;
2365         oap->oap_count = count;
2366         oap->oap_brw_flags = brw_flags;
2367         oap->oap_async_flags = async_flags;
2368
2369         if (cmd & OBD_BRW_WRITE) {
2370                 rc = osc_enter_cache(cli, loi, oap);
2371                 if (rc) {
2372                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2373                         RETURN(rc);
2374                 }
2375         }
2376
2377         osc_oap_to_pending(oap);
2378         loi_list_maint(cli, loi);
2379
2380         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2381                   cmd);
2382
2383         osc_check_rpcs(cli);
2384         client_obd_list_unlock(&cli->cl_loi_list_lock);
2385
2386         RETURN(0);
2387 }
2388
/* aka (~was & now & flag), but this is more clear :)
 *
 * Non-zero when @flag is clear in @was and set in @now.  Every argument is
 * parenthesised so that compound expressions such as "A | B" expand with the
 * intended precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2391
2392 static int osc_set_async_flags(struct obd_export *exp,
2393                                struct lov_stripe_md *lsm,
2394                                struct lov_oinfo *loi, void *cookie,
2395                                obd_flag async_flags)
2396 {
2397         struct client_obd *cli = &exp->exp_obd->u.cli;
2398         struct loi_oap_pages *lop;
2399         struct osc_async_page *oap;
2400         int rc = 0;
2401         ENTRY;
2402
2403         oap = oap_from_cookie(cookie);
2404         if (IS_ERR(oap))
2405                 RETURN(PTR_ERR(oap));
2406
2407         /*
2408          * bug 7311: OST-side locking is only supported for liblustre for now
2409          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2410          * implementation has to handle case where OST-locked page was picked
2411          * up by, e.g., ->writepage().
2412          */
2413         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2414         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2415                                      * tread here. */
2416
2417         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2418                 RETURN(-EIO);
2419
2420         if (loi == NULL)
2421                 loi = &lsm->lsm_oinfo[0];
2422
2423         if (oap->oap_cmd & OBD_BRW_WRITE) {
2424                 lop = &loi->loi_write_lop;
2425         } else {
2426                 lop = &loi->loi_read_lop;
2427         }
2428
2429         client_obd_list_lock(&cli->cl_loi_list_lock);
2430
2431         if (list_empty(&oap->oap_pending_item))
2432                 GOTO(out, rc = -EINVAL);
2433
2434         if ((oap->oap_async_flags & async_flags) == async_flags)
2435                 GOTO(out, rc = 0);
2436
2437         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2438                 oap->oap_async_flags |= ASYNC_READY;
2439
2440         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2441                 if (list_empty(&oap->oap_rpc_item)) {
2442                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2443                         loi_list_maint(cli, loi);
2444                 }
2445         }
2446
2447         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2448                         oap->oap_async_flags);
2449 out:
2450         osc_check_rpcs(cli);
2451         client_obd_list_unlock(&cli->cl_loi_list_lock);
2452         RETURN(rc);
2453 }
2454
2455 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2456                              struct lov_oinfo *loi,
2457                              struct obd_io_group *oig, void *cookie,
2458                              int cmd, obd_off off, int count,
2459                              obd_flag brw_flags,
2460                              obd_flag async_flags)
2461 {
2462         struct client_obd *cli = &exp->exp_obd->u.cli;
2463         struct osc_async_page *oap;
2464         struct loi_oap_pages *lop;
2465         int rc = 0;
2466         ENTRY;
2467
2468         oap = oap_from_cookie(cookie);
2469         if (IS_ERR(oap))
2470                 RETURN(PTR_ERR(oap));
2471
2472         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2473                 RETURN(-EIO);
2474
2475         if (!list_empty(&oap->oap_pending_item) ||
2476             !list_empty(&oap->oap_urgent_item) ||
2477             !list_empty(&oap->oap_rpc_item))
2478                 RETURN(-EBUSY);
2479
2480         if (loi == NULL)
2481                 loi = &lsm->lsm_oinfo[0];
2482
2483         client_obd_list_lock(&cli->cl_loi_list_lock);
2484
2485         oap->oap_cmd = cmd;
2486         oap->oap_page_off = off;
2487         oap->oap_count = count;
2488         oap->oap_brw_flags = brw_flags;
2489         oap->oap_async_flags = async_flags;
2490
2491         if (cmd & OBD_BRW_WRITE)
2492                 lop = &loi->loi_write_lop;
2493         else
2494                 lop = &loi->loi_read_lop;
2495
2496         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2497         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2498                 oap->oap_oig = oig;
2499                 rc = oig_add_one(oig, &oap->oap_occ);
2500         }
2501
2502         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2503                   oap, oap->oap_page, rc);
2504
2505         client_obd_list_unlock(&cli->cl_loi_list_lock);
2506
2507         RETURN(rc);
2508 }
2509
2510 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2511                                  struct loi_oap_pages *lop, int cmd)
2512 {
2513         struct list_head *pos, *tmp;
2514         struct osc_async_page *oap;
2515
2516         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2517                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2518                 list_del(&oap->oap_pending_item);
2519                 osc_oap_to_pending(oap);
2520         }
2521         loi_list_maint(cli, loi);
2522 }
2523
2524 static int osc_trigger_group_io(struct obd_export *exp,
2525                                 struct lov_stripe_md *lsm,
2526                                 struct lov_oinfo *loi,
2527                                 struct obd_io_group *oig)
2528 {
2529         struct client_obd *cli = &exp->exp_obd->u.cli;
2530         ENTRY;
2531
2532         if (loi == NULL)
2533                 loi = &lsm->lsm_oinfo[0];
2534
2535         client_obd_list_lock(&cli->cl_loi_list_lock);
2536
2537         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2538         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2539
2540         osc_check_rpcs(cli);
2541         client_obd_list_unlock(&cli->cl_loi_list_lock);
2542
2543         RETURN(0);
2544 }
2545
2546 static int osc_teardown_async_page(struct obd_export *exp,
2547                                    struct lov_stripe_md *lsm,
2548                                    struct lov_oinfo *loi, void *cookie)
2549 {
2550         struct client_obd *cli = &exp->exp_obd->u.cli;
2551         struct loi_oap_pages *lop;
2552         struct osc_async_page *oap;
2553         int rc = 0;
2554         ENTRY;
2555
2556         oap = oap_from_cookie(cookie);
2557         if (IS_ERR(oap))
2558                 RETURN(PTR_ERR(oap));
2559
2560         if (loi == NULL)
2561                 loi = &lsm->lsm_oinfo[0];
2562
2563         if (oap->oap_cmd & OBD_BRW_WRITE) {
2564                 lop = &loi->loi_write_lop;
2565         } else {
2566                 lop = &loi->loi_read_lop;
2567         }
2568
2569         client_obd_list_lock(&cli->cl_loi_list_lock);
2570
2571         if (!list_empty(&oap->oap_rpc_item))
2572                 GOTO(out, rc = -EBUSY);
2573
2574         osc_exit_cache(cli, oap, 0);
2575         osc_wake_cache_waiters(cli);
2576
2577         if (!list_empty(&oap->oap_urgent_item)) {
2578                 list_del_init(&oap->oap_urgent_item);
2579                 oap->oap_async_flags &= ~ASYNC_URGENT;
2580         }
2581         if (!list_empty(&oap->oap_pending_item)) {
2582                 list_del_init(&oap->oap_pending_item);
2583                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2584         }
2585         loi_list_maint(cli, loi);
2586
2587         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2588 out:
2589         client_obd_list_unlock(&cli->cl_loi_list_lock);
2590         RETURN(rc);
2591 }
2592
2593 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2594                                     int flags)
2595 {
2596         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2597
2598         if (lock == NULL) {
2599                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2600                 return;
2601         }
2602         lock_res_and_lock(lock);
2603 #ifdef __KERNEL__
2604 #ifdef __LINUX__
2605         /* Liang XXX: Darwin and Winnt checking should be added */
2606         if (lock->l_ast_data && lock->l_ast_data != data) {
2607                 struct inode *new_inode = data;
2608                 struct inode *old_inode = lock->l_ast_data;
2609                 if (!(old_inode->i_state & I_FREEING))
2610                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
2611                 LASSERTF(old_inode->i_state & I_FREEING,
2612                          "Found existing inode %p/%lu/%u state %lu in lock: "
2613                          "setting data to %p/%lu/%u\n", old_inode,
2614                          old_inode->i_ino, old_inode->i_generation,
2615                          old_inode->i_state,
2616                          new_inode, new_inode->i_ino, new_inode->i_generation);
2617         }
2618 #endif
2619 #endif
2620         lock->l_ast_data = data;
2621         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
2622         unlock_res_and_lock(lock);
2623         LDLM_LOCK_PUT(lock);
2624 }
2625
2626 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2627                              ldlm_iterator_t replace, void *data)
2628 {
2629         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2630         struct obd_device *obd = class_exp2obd(exp);
2631
2632         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2633         return 0;
2634 }
2635
2636 static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
2637                             int intent, int rc)
2638 {
2639         ENTRY;
2640
2641         if (intent) {
2642                 /* The request was created before ldlm_cli_enqueue call. */
2643                 if (rc == ELDLM_LOCK_ABORTED) {
2644                         struct ldlm_reply *rep;
2645
2646                         /* swabbed by ldlm_cli_enqueue() */
2647                         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
2648                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
2649                                              sizeof(*rep));
2650                         LASSERT(rep != NULL);
2651                         if (rep->lock_policy_res1)
2652                                 rc = rep->lock_policy_res1;
2653                 }
2654         }
2655
2656         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2657                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2658                        oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_size,
2659                        oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_blocks,
2660                        oinfo->oi_md->lsm_oinfo->loi_lvb.lvb_mtime);
2661         }
2662
2663         /* Call the update callback. */
2664         rc = oinfo->oi_cb_up(oinfo, rc);
2665         RETURN(rc);
2666 }
2667
2668 static int osc_enqueue_interpret(struct ptlrpc_request *req,
2669                                  struct osc_enqueue_args *aa, int rc)
2670 {
2671         int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
2672         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
2673         struct ldlm_lock *lock;
2674
2675         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2676          * be valid. */
2677         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
2678
2679         /* Complete obtaining the lock procedure. */
2680         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2681                                    aa->oa_ei->ei_mode,
2682                                    &aa->oa_ei->ei_flags,
2683                                    &lsm->lsm_oinfo->loi_lvb,
2684                                    sizeof(lsm->lsm_oinfo->loi_lvb),
2685                                    lustre_swab_ost_lvb,
2686                                    aa->oa_oi->oi_lockh, rc);
2687
2688         /* Complete osc stuff. */
2689         rc = osc_enqueue_fini(req, aa->oa_oi, intent, rc);
2690
2691         /* Release the lock for async request. */
2692         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
2693                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
2694
2695         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2696                  aa->oa_oi->oi_lockh, req, aa);
2697         LDLM_LOCK_PUT(lock);
2698         return rc;
2699 }
2700
2701 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2702  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2703  * other synchronous requests, however keeping some locks and trying to obtain
2704  * others may take a considerable amount of time in a case of ost failure; and
2705  * when other sync requests do not get released lock from a client, the client
2706  * is excluded from the cluster -- such scenarious make the life difficult, so
2707  * release locks just after they are obtained. */
2708 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2709                        struct obd_enqueue_info *einfo)
2710 {
2711         struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
2712         struct obd_device *obd = exp->exp_obd;
2713         struct ldlm_reply *rep;
2714         struct ptlrpc_request *req = NULL;
2715         int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
2716         int rc;
2717         ENTRY;
2718
2719         /* Filesystem lock extents are extended to page boundaries so that
2720          * dealing with the page cache is a little smoother.  */
2721         oinfo->oi_policy.l_extent.start -=
2722                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
2723         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
2724
2725         if (oinfo->oi_md->lsm_oinfo->loi_kms_valid == 0)
2726                 goto no_match;
2727
2728         /* Next, search for already existing extent locks that will cover us */
2729         rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
2730                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
2731                              oinfo->oi_lockh);
2732         if (rc == 1) {
2733                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
2734                                         einfo->ei_flags);
2735                 if (intent) {
2736                         /* I would like to be able to ASSERT here that rss <=
2737                          * kms, but I can't, for reasons which are explained in
2738                          * lov_enqueue() */
2739                 }
2740
2741                 /* We already have a lock, and it's referenced */
2742                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
2743
2744                 /* For async requests, decref the lock. */
2745                 if (einfo->ei_rqset)
2746                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
2747
2748                 RETURN(ELDLM_OK);
2749         }
2750
2751         /* If we're trying to read, we also search for an existing PW lock.  The
2752          * VFS and page cache already protect us locally, so lots of readers/
2753          * writers can share a single PW lock.
2754          *
2755          * There are problems with conversion deadlocks, so instead of
2756          * converting a read lock to a write lock, we'll just enqueue a new
2757          * one.
2758          *
2759          * At some point we should cancel the read lock instead of making them
2760          * send us a blocking callback, but there are problems with canceling
2761          * locks out from other users right now, too. */
2762
2763         if (einfo->ei_mode == LCK_PR) {
2764                 rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
2765                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
2766                                      LCK_PW, oinfo->oi_lockh);
2767                 if (rc == 1) {
2768                         /* FIXME: This is not incredibly elegant, but it might
2769                          * be more elegant than adding another parameter to
2770                          * lock_match.  I want a second opinion. */
2771                         /* addref the lock only if not async requests. */
2772                         if (!einfo->ei_rqset)
2773                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
2774                         osc_set_data_with_check(oinfo->oi_lockh,
2775                                                 einfo->ei_cbdata,
2776                                                 einfo->ei_flags);
2777                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
2778                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
2779                         RETURN(ELDLM_OK);
2780                 }
2781         }
2782
2783  no_match:
2784         if (intent) {
2785                 int size[3] = {
2786                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2787                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
2788
2789                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
2790                                       LDLM_ENQUEUE, 2, size, NULL);
2791                 if (req == NULL)
2792                         RETURN(-ENOMEM);
2793
2794                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
2795                 size[DLM_REPLY_REC_OFF] = 
2796                         sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb);
2797                 ptlrpc_req_set_repsize(req, 3, size);
2798         }
2799
2800         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2801         einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
2802
2803         rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
2804                               &oinfo->oi_policy, einfo->ei_mode,
2805                               &einfo->ei_flags, einfo->ei_cb_bl,
2806                               einfo->ei_cb_cp, einfo->ei_cb_gl,
2807                               einfo->ei_cbdata,
2808                               &oinfo->oi_md->lsm_oinfo->loi_lvb,
2809                               sizeof(oinfo->oi_md->lsm_oinfo->loi_lvb),
2810                               lustre_swab_ost_lvb, oinfo->oi_lockh,
2811                               einfo->ei_rqset ? 1 : 0);
2812         if (einfo->ei_rqset) {
2813                 if (!rc) {
2814                         struct osc_enqueue_args *aa;
2815                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2816                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
2817                         aa->oa_oi = oinfo;
2818                         aa->oa_ei = einfo;
2819                         aa->oa_exp = exp;
2820
2821                         req->rq_interpret_reply = osc_enqueue_interpret;
2822                         ptlrpc_set_add_req(einfo->ei_rqset, req);
2823                 } else if (intent) {
2824                         ptlrpc_req_finished(req);
2825                 }
2826                 RETURN(rc);
2827         }
2828
2829         rc = osc_enqueue_fini(req, oinfo, intent, rc);
2830         if (intent)
2831                 ptlrpc_req_finished(req);
2832
2833         RETURN(rc);
2834 }
2835
2836 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
2837                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2838                      int *flags, void *data, struct lustre_handle *lockh)
2839 {
2840         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2841         struct obd_device *obd = exp->exp_obd;
2842         int rc;
2843         int lflags = *flags;
2844         ENTRY;
2845
2846         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
2847
2848         /* Filesystem lock extents are extended to page boundaries so that
2849          * dealing with the page cache is a little smoother */
2850         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2851         policy->l_extent.end |= ~CFS_PAGE_MASK;
2852
2853         /* Next, search for already existing extent locks that will cover us */
2854         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
2855                              policy, mode, lockh);
2856         if (rc) {
2857                 //if (!(*flags & LDLM_FL_TEST_LOCK))
2858                         osc_set_data_with_check(lockh, data, lflags);
2859                 RETURN(rc);
2860         }
2861         /* If we're trying to read, we also search for an existing PW lock.  The
2862          * VFS and page cache already protect us locally, so lots of readers/
2863          * writers can share a single PW lock. */
2864         if (mode == LCK_PR) {
2865                 rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
2866                                      &res_id, type,
2867                                      policy, LCK_PW, lockh);
2868                 if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
2869                         /* FIXME: This is not incredibly elegant, but it might
2870                          * be more elegant than adding another parameter to
2871                          * lock_match.  I want a second opinion. */
2872                         osc_set_data_with_check(lockh, data, lflags);
2873                         ldlm_lock_addref(lockh, LCK_PR);
2874                         ldlm_lock_decref(lockh, LCK_PW);
2875                 }
2876         }
2877         RETURN(rc);
2878 }
2879
2880 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2881                       __u32 mode, struct lustre_handle *lockh)
2882 {
2883         ENTRY;
2884
2885         if (unlikely(mode == LCK_GROUP))
2886                 ldlm_lock_decref_and_cancel(lockh, mode);
2887         else
2888                 ldlm_lock_decref(lockh, mode);
2889
2890         RETURN(0);
2891 }
2892
2893 static int osc_cancel_unused(struct obd_export *exp,
2894                              struct lov_stripe_md *lsm, int flags, void *opaque)
2895 {
2896         struct obd_device *obd = class_exp2obd(exp);
2897         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2898
2899         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
2900                                       opaque);
2901 }
2902
2903 static int osc_join_lru(struct obd_export *exp,
2904                         struct lov_stripe_md *lsm, int join)
2905 {
2906         struct obd_device *obd = class_exp2obd(exp);
2907         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2908
2909         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
2910 }
2911
2912 static int osc_statfs_interpret(struct ptlrpc_request *req,
2913                                 struct osc_async_args *aa, int rc)
2914 {
2915         struct obd_statfs *msfs;
2916         ENTRY;
2917
2918         if (rc != 0)
2919                 GOTO(out, rc);
2920
2921         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2922                                   lustre_swab_obd_statfs);
2923         if (msfs == NULL) {
2924                 CERROR("Can't unpack obd_statfs\n");
2925                 GOTO(out, rc = -EPROTO);
2926         }
2927
2928         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
2929 out:
2930         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2931         RETURN(rc);
2932 }
2933
2934 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
2935                             __u64 max_age, struct ptlrpc_request_set *rqset)
2936 {
2937         struct ptlrpc_request *req;
2938         struct osc_async_args *aa;
2939         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
2940         ENTRY;
2941
2942         /* We could possibly pass max_age in the request (as an absolute
2943          * timestamp or a "seconds.usec ago") so the target can avoid doing
2944          * extra calls into the filesystem if that isn't necessary (e.g.
2945          * during mount that would help a bit).  Having relative timestamps
2946          * is not so great if request processing is slow, while absolute
2947          * timestamps are not ideal because they need time synchronization. */
2948         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2949                               OST_STATFS, 1, NULL, NULL);
2950         if (!req)
2951                 RETURN(-ENOMEM);
2952
2953         ptlrpc_req_set_repsize(req, 2, size);
2954         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2955
2956         req->rq_interpret_reply = osc_statfs_interpret;
2957         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2958         aa = (struct osc_async_args *)&req->rq_async_args;
2959         aa->aa_oi = oinfo;
2960
2961         ptlrpc_set_add_req(rqset, req);
2962         RETURN(0);
2963 }
2964
2965 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
2966                       __u64 max_age)
2967 {
2968         struct obd_statfs *msfs;
2969         struct ptlrpc_request *req;
2970         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
2971         ENTRY;
2972
2973         /* We could possibly pass max_age in the request (as an absolute
2974          * timestamp or a "seconds.usec ago") so the target can avoid doing
2975          * extra calls into the filesystem if that isn't necessary (e.g.
2976          * during mount that would help a bit).  Having relative timestamps
2977          * is not so great if request processing is slow, while absolute
2978          * timestamps are not ideal because they need time synchronization. */
2979         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
2980                               OST_STATFS, 1, NULL, NULL);
2981         if (!req)
2982                 RETURN(-ENOMEM);
2983
2984         ptlrpc_req_set_repsize(req, 2, size);
2985         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
2986
2987         rc = ptlrpc_queue_wait(req);
2988         if (rc)
2989                 GOTO(out, rc);
2990
2991         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
2992                                   lustre_swab_obd_statfs);
2993         if (msfs == NULL) {
2994                 CERROR("Can't unpack obd_statfs\n");
2995                 GOTO(out, rc = -EPROTO);
2996         }
2997
2998         memcpy(osfs, msfs, sizeof(*osfs));
2999
3000         EXIT;
3001  out:
3002         ptlrpc_req_finished(req);
3003         return rc;
3004 }
3005
3006 /* Retrieve object striping information.
3007  *
3008  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3009  * the maximum number of OST indices which will fit in the user buffer.
3010  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3011  */
3012 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3013 {
3014         struct lov_user_md lum, *lumk;
3015         int rc = 0, lum_size;
3016         ENTRY;
3017
3018         if (!lsm)
3019                 RETURN(-ENODATA);
3020
3021         if (copy_from_user(&lum, lump, sizeof(lum)))
3022                 RETURN(-EFAULT);
3023
3024         if (lum.lmm_magic != LOV_USER_MAGIC)
3025                 RETURN(-EINVAL);
3026
3027         if (lum.lmm_stripe_count > 0) {
3028                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3029                 OBD_ALLOC(lumk, lum_size);
3030                 if (!lumk)
3031                         RETURN(-ENOMEM);
3032
3033                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3034         } else {
3035                 lum_size = sizeof(lum);
3036                 lumk = &lum;
3037         }
3038
3039         lumk->lmm_object_id = lsm->lsm_object_id;
3040         lumk->lmm_stripe_count = 1;
3041
3042         if (copy_to_user(lump, lumk, lum_size))
3043                 rc = -EFAULT;
3044
3045         if (lumk != &lum)
3046                 OBD_FREE(lumk, lum_size);
3047
3048         RETURN(rc);
3049 }
3050
3051
3052 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3053                          void *karg, void *uarg)
3054 {
3055         struct obd_device *obd = exp->exp_obd;
3056         struct obd_ioctl_data *data = karg;
3057         int err = 0;
3058         ENTRY;
3059
3060 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3061         MOD_INC_USE_COUNT;
3062 #else
3063         if (!try_module_get(THIS_MODULE)) {
3064                 CERROR("Can't get module. Is it alive?");
3065                 return -EINVAL;
3066         }
3067 #endif
3068         switch (cmd) {
3069         case OBD_IOC_LOV_GET_CONFIG: {
3070                 char *buf;
3071                 struct lov_desc *desc;
3072                 struct obd_uuid uuid;
3073
3074                 buf = NULL;
3075                 len = 0;
3076                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3077                         GOTO(out, err = -EINVAL);
3078
3079                 data = (struct obd_ioctl_data *)buf;
3080
3081                 if (sizeof(*desc) > data->ioc_inllen1) {
3082                         obd_ioctl_freedata(buf, len);
3083                         GOTO(out, err = -EINVAL);
3084                 }
3085
3086                 if (data->ioc_inllen2 < sizeof(uuid)) {
3087                         obd_ioctl_freedata(buf, len);
3088                         GOTO(out, err = -EINVAL);
3089                 }
3090
3091                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3092                 desc->ld_tgt_count = 1;
3093                 desc->ld_active_tgt_count = 1;
3094                 desc->ld_default_stripe_count = 1;
3095                 desc->ld_default_stripe_size = 0;
3096                 desc->ld_default_stripe_offset = 0;
3097                 desc->ld_pattern = 0;
3098                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3099
3100                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3101
3102                 err = copy_to_user((void *)uarg, buf, len);
3103                 if (err)
3104                         err = -EFAULT;
3105                 obd_ioctl_freedata(buf, len);
3106                 GOTO(out, err);
3107         }
3108         case LL_IOC_LOV_SETSTRIPE:
3109                 err = obd_alloc_memmd(exp, karg);
3110                 if (err > 0)
3111                         err = 0;
3112                 GOTO(out, err);
3113         case LL_IOC_LOV_GETSTRIPE:
3114                 err = osc_getstripe(karg, uarg);
3115                 GOTO(out, err);
3116         case OBD_IOC_CLIENT_RECOVER:
3117                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3118                                             data->ioc_inlbuf1);
3119                 if (err > 0)
3120                         err = 0;
3121                 GOTO(out, err);
3122         case IOC_OSC_SET_ACTIVE:
3123                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3124                                                data->ioc_offset);
3125                 GOTO(out, err);
3126         case OBD_IOC_POLL_QUOTACHECK:
3127                 err = lquota_poll_check(quota_interface, exp,
3128                                         (struct if_quotacheck *)karg);
3129                 GOTO(out, err);
3130         default:
3131                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3132                        cmd, cfs_curproc_comm());
3133                 GOTO(out, err = -ENOTTY);
3134         }
3135 out:
3136 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
3137         MOD_DEC_USE_COUNT;
3138 #else
3139         module_put(THIS_MODULE);
3140 #endif
3141         return err;
3142 }
3143
3144 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3145                         void *key, __u32 *vallen, void *val)
3146 {
3147         ENTRY;
3148         if (!vallen || !val)
3149                 RETURN(-EFAULT);
3150
3151         if (keylen > strlen("lock_to_stripe") &&
3152             strcmp(key, "lock_to_stripe") == 0) {
3153                 __u32 *stripe = val;
3154                 *vallen = sizeof(*stripe);
3155                 *stripe = 0;
3156                 RETURN(0);
3157         } else if (keylen >= strlen("last_id") && strcmp(key, "last_id") == 0) {
3158                 struct ptlrpc_request *req;
3159                 obd_id *reply;
3160                 char *bufs[2] = { NULL, key };
3161                 int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
3162
3163                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3164                                       OST_GET_INFO, 2, size, bufs);
3165                 if (req == NULL)
3166                         RETURN(-ENOMEM);
3167
3168                 size[REPLY_REC_OFF] = *vallen;
3169                 ptlrpc_req_set_repsize(req, 2, size);
3170                 rc = ptlrpc_queue_wait(req);
3171                 if (rc)
3172                         GOTO(out, rc);
3173
3174                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3175                                            lustre_swab_ost_last_id);
3176                 if (reply == NULL) {
3177                         CERROR("Can't unpack OST last ID\n");
3178                         GOTO(out, rc = -EPROTO);
3179                 }
3180                 *((obd_id *)val) = *reply;
3181         out:
3182                 ptlrpc_req_finished(req);
3183                 RETURN(rc);
3184         }
3185         RETURN(-EINVAL);
3186 }
3187
3188 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3189                                           void *aa, int rc)
3190 {
3191         struct llog_ctxt *ctxt;
3192         struct obd_import *imp = req->rq_import;
3193         ENTRY;
3194
3195         if (rc != 0)
3196                 RETURN(rc);
3197
3198         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3199         if (ctxt) {
3200                 if (rc == 0)
3201                         rc = llog_initiator_connect(ctxt);
3202                 else
3203                         CERROR("cannot establish connection for "
3204                                "ctxt %p: %d\n", ctxt, rc);
3205         }
3206
3207         imp->imp_server_timeout = 1;
3208         CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3209         imp->imp_pingable = 1;
3210
3211         RETURN(rc);
3212 }
3213
3214 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3215                               void *key, obd_count vallen, void *val,
3216                               struct ptlrpc_request_set *set)
3217 {
3218         struct ptlrpc_request *req;
3219         struct obd_device  *obd = exp->exp_obd;
3220         struct obd_import *imp = class_exp2cliimp(exp);
3221         int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3222         char *bufs[3] = { NULL, key, val };
3223         ENTRY;
3224
3225         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3226
3227         if (KEY_IS(KEY_NEXT_ID)) {
3228                 if (vallen != sizeof(obd_id))
3229                         RETURN(-EINVAL);
3230                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3231                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3232                        exp->exp_obd->obd_name,
3233                        obd->u.cli.cl_oscc.oscc_next_id);
3234
3235                 RETURN(0);
3236         }
3237
3238         if (KEY_IS("unlinked")) {
3239                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3240                 spin_lock(&oscc->oscc_lock);
3241                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3242                 spin_unlock(&oscc->oscc_lock);
3243                 RETURN(0);
3244         }
3245
3246         if (KEY_IS(KEY_INIT_RECOV)) {
3247                 if (vallen != sizeof(int))
3248                         RETURN(-EINVAL);
3249                 imp->imp_initial_recov = *(int *)val;
3250                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3251                        exp->exp_obd->obd_name,
3252                        imp->imp_initial_recov);
3253                 RETURN(0);
3254         }
3255
3256         if (KEY_IS("checksum")) {
3257                 if (vallen != sizeof(int))
3258                         RETURN(-EINVAL);
3259                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3260                 RETURN(0);
3261         }
3262
3263         if (!set)
3264                 RETURN(-EINVAL);
3265
3266         /* We pass all other commands directly to OST. Since nobody calls osc
3267            methods directly and everybody is supposed to go through LOV, we
3268            assume lov checked invalid values for us.
3269            The only recognised values so far are evict_by_nid and mds_conn.
3270            Even if something bad goes through, we'd get a -EINVAL from OST
3271            anyway. */
3272
3273         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3274                               bufs);
3275         if (req == NULL)
3276                 RETURN(-ENOMEM);
3277
3278         if (KEY_IS("mds_conn"))
3279                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3280
3281         ptlrpc_req_set_repsize(req, 1, NULL);
3282         ptlrpc_set_add_req(set, req);
3283         ptlrpc_check_set(set);
3284
3285         RETURN(0);
3286 }
3287
3288
3289 static struct llog_operations osc_size_repl_logops = {
3290         lop_cancel: llog_obd_repl_cancel
3291 };
3292
3293 static struct llog_operations osc_mds_ost_orig_logops;
3294 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3295                          int count, struct llog_catid *catid, 
3296                          struct obd_uuid *uuid)
3297 {
3298         int rc;
3299         ENTRY;
3300
3301         spin_lock(&obd->obd_dev_lock);
3302         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3303                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3304                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3305                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3306                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3307                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3308         }
3309         spin_unlock(&obd->obd_dev_lock);
3310
3311         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3312                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3313         if (rc) {
3314                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3315                 GOTO (out, rc);
3316         }
3317
3318         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3319                         &osc_size_repl_logops);
3320         if (rc) 
3321                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3322 out:
3323         if (rc) {
3324                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
3325                        obd->obd_name, tgt->obd_name, count, catid, rc);
3326                 CERROR("logid "LPX64":0x%x\n",
3327                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3328         }
3329         RETURN(rc);
3330 }
3331
3332 static int osc_llog_finish(struct obd_device *obd, int count)
3333 {
3334         struct llog_ctxt *ctxt;
3335         int rc = 0, rc2 = 0;
3336         ENTRY;
3337
3338         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3339         if (ctxt)
3340                 rc = llog_cleanup(ctxt);
3341
3342         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3343         if (ctxt)
3344                 rc2 = llog_cleanup(ctxt);
3345         if (!rc)
3346                 rc = rc2;
3347
3348         RETURN(rc);
3349 }
3350
3351 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3352                          struct obd_uuid *cluuid,
3353                          struct obd_connect_data *data)
3354 {
3355         struct client_obd *cli = &obd->u.cli;
3356
3357         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3358                 long lost_grant;
3359
3360                 client_obd_list_lock(&cli->cl_loi_list_lock);
3361                 data->ocd_grant = cli->cl_avail_grant ?:
3362                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3363                 lost_grant = cli->cl_lost_grant;
3364                 cli->cl_lost_grant = 0;
3365                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3366
3367                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3368                        "cl_lost_grant: %ld\n", data->ocd_grant,
3369                        cli->cl_avail_grant, lost_grant);
3370                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3371                        " ocd_grant: %d\n", data->ocd_connect_flags,
3372                        data->ocd_version, data->ocd_grant);
3373         }
3374
3375         RETURN(0);
3376 }
3377
3378 static int osc_disconnect(struct obd_export *exp)
3379 {
3380         struct obd_device *obd = class_exp2obd(exp);
3381         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3382         int rc;
3383
3384         if (obd->u.cli.cl_conn_count == 1)
3385                 /* flush any remaining cancel messages out to the target */
3386                 llog_sync(ctxt, exp);
3387
3388         rc = client_disconnect_export(exp);
3389         return rc;
3390 }
3391
3392 static int osc_import_event(struct obd_device *obd,
3393                             struct obd_import *imp,
3394                             enum obd_import_event event)
3395 {
3396         struct client_obd *cli;
3397         int rc = 0;
3398
3399         ENTRY;
3400         LASSERT(imp->imp_obd == obd);
3401
3402         switch (event) {
3403         case IMP_EVENT_DISCON: {
3404                 /* Only do this on the MDS OSC's */
3405                 if (imp->imp_server_timeout) {
3406                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3407
3408                         spin_lock(&oscc->oscc_lock);
3409                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3410                         spin_unlock(&oscc->oscc_lock);
3411                 }
3412
3413                 break;
3414         }
3415         case IMP_EVENT_INACTIVE: {
3416                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3417                 break;
3418         }
3419         case IMP_EVENT_INVALIDATE: {
3420                 struct ldlm_namespace *ns = obd->obd_namespace;
3421
3422                 /* Reset grants */
3423                 cli = &obd->u.cli;
3424                 client_obd_list_lock(&cli->cl_loi_list_lock);
3425                 cli->cl_avail_grant = 0;
3426                 cli->cl_lost_grant = 0;
3427                 /* all pages go to failing rpcs due to the invalid import */
3428                 osc_check_rpcs(cli);
3429                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3430
3431                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3432
3433                 break;
3434         }
3435         case IMP_EVENT_ACTIVE: {
3436                 /* Only do this on the MDS OSC's */
3437                 if (imp->imp_server_timeout) {
3438                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3439
3440                         spin_lock(&oscc->oscc_lock);
3441                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3442                         spin_unlock(&oscc->oscc_lock);
3443                 }
3444                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3445                 break;
3446         }
3447         case IMP_EVENT_OCD: {
3448                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3449
3450                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3451                         osc_init_grant(&obd->u.cli, ocd);
3452
3453                 /* See bug 7198 */
3454                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3455                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3456
3457                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3458                 break;
3459         }
3460         default:
3461                 CERROR("Unknown import event %d\n", event);
3462                 LBUG();
3463         }
3464         RETURN(rc);
3465 }
3466
3467 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3468 {
3469         int rc;
3470         ENTRY;
3471
3472         ENTRY;
3473         rc = ptlrpcd_addref();
3474         if (rc)
3475                 RETURN(rc);
3476
3477         rc = client_obd_setup(obd, len, buf);
3478         if (rc) {
3479                 ptlrpcd_decref();
3480         } else {
3481                 struct lprocfs_static_vars lvars;
3482                 struct client_obd *cli = &obd->u.cli;
3483
3484                 lprocfs_init_vars(osc, &lvars);
3485                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3486                         lproc_osc_attach_seqstat(obd);
3487                         ptlrpc_lprocfs_register_obd(obd);
3488                 }
3489
3490                 oscc_init(obd);
3491                 /* We need to allocate a few requests more, because
3492                    brw_interpret_oap tries to create new requests before freeing
3493                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3494                    reserved, but I afraid that might be too much wasted RAM
3495                    in fact, so 2 is just my guess and still should work. */
3496                 cli->cl_import->imp_rq_pool =
3497                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3498                                             OST_MAXREQSIZE,
3499                                             ptlrpc_add_rqs_to_pool);
3500         }
3501
3502         RETURN(rc);
3503 }
3504
3505 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3506 {
3507         int rc = 0;
3508         ENTRY;
3509
3510         switch (stage) {
3511         case OBD_CLEANUP_EARLY: {
3512                 struct obd_import *imp;
3513                 imp = obd->u.cli.cl_import;
3514                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3515                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3516                 ptlrpc_deactivate_import(imp);
3517                 break;
3518         }
3519         case OBD_CLEANUP_EXPORTS: {
3520                 /* If we set up but never connected, the
3521                    client import will not have been cleaned. */
3522                 if (obd->u.cli.cl_import) {
3523                         struct obd_import *imp;
3524                         imp = obd->u.cli.cl_import;
3525                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3526                                obd->obd_name);
3527                         ptlrpc_invalidate_import(imp);
3528                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3529                         class_destroy_import(imp);
3530                         obd->u.cli.cl_import = NULL;
3531                 }
3532                 break;
3533         }
3534         case OBD_CLEANUP_SELF_EXP:
3535                 rc = obd_llog_finish(obd, 0);
3536                 if (rc != 0)
3537                         CERROR("failed to cleanup llogging subsystems\n");
3538                 break;
3539         case OBD_CLEANUP_OBD:
3540                 break;
3541         }
3542         RETURN(rc);
3543 }
3544
3545 int osc_cleanup(struct obd_device *obd)
3546 {
3547         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3548         int rc;
3549
3550         ENTRY;
3551         ptlrpc_lprocfs_unregister_obd(obd);
3552         lprocfs_obd_cleanup(obd);
3553
3554         spin_lock(&oscc->oscc_lock);
3555         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3556         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3557         spin_unlock(&oscc->oscc_lock);
3558
3559         /* free memory of osc quota cache */
3560         lquota_cleanup(quota_interface, obd);
3561
3562         rc = client_obd_cleanup(obd);
3563
3564         ptlrpcd_decref();
3565         RETURN(rc);
3566 }
3567
3568 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3569 {
3570         struct lustre_cfg *lcfg = buf;
3571         struct lprocfs_static_vars lvars;
3572         int rc = 0;
3573
3574         lprocfs_init_vars(osc, &lvars);
3575
3576         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3577         return(rc);
3578 }
3579
3580 struct obd_ops osc_obd_ops = {
3581         .o_owner                = THIS_MODULE,
3582         .o_setup                = osc_setup,
3583         .o_precleanup           = osc_precleanup,
3584         .o_cleanup              = osc_cleanup,
3585         .o_add_conn             = client_import_add_conn,
3586         .o_del_conn             = client_import_del_conn,
3587         .o_connect              = client_connect_import,
3588         .o_reconnect            = osc_reconnect,
3589         .o_disconnect           = osc_disconnect,
3590         .o_statfs               = osc_statfs,
3591         .o_statfs_async         = osc_statfs_async,
3592         .o_packmd               = osc_packmd,
3593         .o_unpackmd             = osc_unpackmd,
3594         .o_create               = osc_create,
3595         .o_destroy              = osc_destroy,
3596         .o_getattr              = osc_getattr,
3597         .o_getattr_async        = osc_getattr_async,
3598         .o_setattr              = osc_setattr,
3599         .o_setattr_async        = osc_setattr_async,
3600         .o_brw                  = osc_brw,
3601         .o_brw_async            = osc_brw_async,
3602         .o_prep_async_page      = osc_prep_async_page,
3603         .o_queue_async_io       = osc_queue_async_io,
3604         .o_set_async_flags      = osc_set_async_flags,
3605         .o_queue_group_io       = osc_queue_group_io,
3606         .o_trigger_group_io     = osc_trigger_group_io,
3607         .o_teardown_async_page  = osc_teardown_async_page,
3608         .o_punch                = osc_punch,
3609         .o_sync                 = osc_sync,
3610         .o_enqueue              = osc_enqueue,
3611         .o_match                = osc_match,
3612         .o_change_cbdata        = osc_change_cbdata,
3613         .o_cancel               = osc_cancel,
3614         .o_cancel_unused        = osc_cancel_unused,
3615         .o_join_lru             = osc_join_lru,
3616         .o_iocontrol            = osc_iocontrol,
3617         .o_get_info             = osc_get_info,
3618         .o_set_info_async       = osc_set_info_async,
3619         .o_import_event         = osc_import_event,
3620         .o_llog_init            = osc_llog_init,
3621         .o_llog_finish          = osc_llog_finish,
3622         .o_process_config       = osc_process_config,
3623 };
3624
3625 int __init osc_init(void)
3626 {
3627         struct lprocfs_static_vars lvars;
3628         int rc;
3629         ENTRY;
3630
3631         lprocfs_init_vars(osc, &lvars);
3632
3633         request_module("lquota");
3634         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3635         lquota_init(quota_interface);
3636         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3637
3638         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3639                                  LUSTRE_OSC_NAME);
3640         if (rc) {
3641                 if (quota_interface)
3642                         PORTAL_SYMBOL_PUT(osc_quota_interface);
3643                 RETURN(rc);
3644         }
3645
3646         RETURN(rc);
3647 }
3648
#ifdef __KERNEL__
/* Module exit: shut down the quota interface, release its symbol reference
 * if one is held, and unregister the OSC obd type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface != NULL)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */