Whamcloud - gitweb
cleanup in ptlrpc code, related to ppc platform
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 # include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_ost.h>
48 #include <obd_lov.h>
49
50 #ifdef  __CYGWIN__
51 # include <ctype.h>
52 #endif
53
54 #include <lustre_ha.h>
55 #include <lprocfs_status.h>
56 #include <lustre_log.h>
57 #include <lustre_debug.h>
58 #include <lustre_param.h>
59 #include <lustre_cache.h>
60 #include "osc_internal.h"
61
62 static quota_interface_t *quota_interface = NULL;
63 extern quota_interface_t osc_quota_interface;
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 int osc_cleanup(struct obd_device *obd);
67
68 static quota_interface_t *quota_interface;
69 extern quota_interface_t osc_quota_interface;
70
71 /* by default 10s */
72 atomic_t osc_resend_time; 
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100         }
101
102         RETURN(lmm_size);
103 }
104
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * obd_unpackmd() contract:
 *   @lsmp NULL                  -> return the in-memory lsm size only;
 *   *lsmp set and @lmm NULL     -> free the previously unpacked lsm;
 *   otherwise                   -> allocate *lsmp if needed and fill it
 *                                  from the little-endian @lmm.
 * Returns the lsm size on success or a negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* OSC handles exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free both the single oinfo slot and the lsm itself. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Unwind the lsm allocation on partial failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                LASSERT((*lsmp)->lsm_object_id);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
159
/* Reply handler for async OST_GETATTR: copy the attributes from the
 * reply body into the caller's obd_info, then run its completion
 * callback with the final status. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CERROR("can't unpack ost_body\n");
                rc = -EPROTO;
                /* Reply unusable: make sure no attribute looks valid. */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* The callback always runs, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
187
188 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
189                              struct ptlrpc_request_set *set)
190 {
191         struct ptlrpc_request *req;
192         struct ost_body *body;
193         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
194         struct osc_async_args *aa;
195         ENTRY;
196
197         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
198                               OST_GETATTR, 2, size,NULL);
199         if (!req)
200                 RETURN(-ENOMEM);
201
202         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
203         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
204
205         ptlrpc_req_set_repsize(req, 2, size);
206         req->rq_interpret_reply = osc_getattr_interpret;
207
208         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
209         aa = (struct osc_async_args *)&req->rq_async_args;
210         aa->aa_oi = oinfo;
211
212         ptlrpc_set_add_req(set, req);
213         RETURN (0);
214 }
215
/* Synchronous OST_GETATTR: fetch object attributes into oinfo->oi_oa.
 * Returns 0 on success, -ENOMEM if the request cannot be built, or a
 * negative errno from the RPC / reply unpacking. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_GETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
258
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and copy the server's view of the object back on success.
 * @oti is not consulted here; see osc_setattr_async() for the llog
 * cookie handling. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* Reflect the server's post-setattr attributes back to caller. */
        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
293
/* Reply handler for async OST_SETATTR: copy the returned attributes
 * into the caller's obd_info and invoke its completion callback. */
static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
out:
        /* The callback always runs, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
315
/* Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd fire-and-forget; otherwise it is added to @rqset and the
 * reply is processed by osc_setattr_interpret(). */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        struct osc_async_args *aa;
        ENTRY;

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SETATTR, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                /* Carry the llog cookie from the transaction info into
                 * the obdo so it travels with the RPC. */
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
        ptlrpc_req_set_repsize(req, 2, size);
        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = (struct osc_async_args *)&req->rq_async_args;
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
356
/* Create an object on the OST (synchronous OST_CREATE).
 *
 * @oa:  creation attributes; overwritten on success with the server's
 *       reply, including the new object id.
 * @ea:  in/out stripe metadata; allocated here when *ea is NULL and
 *       freed again on failure in that case.
 * @oti: if non-NULL, receives the transno and any llog cookie.
 * Returns 0 on success or a negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_CREATE, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(body->oa));

        ptlrpc_req_set_repsize(req, 2, size);
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out_req, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* Allocate cookie storage lazily on first use. */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it here (*ea still unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
436
437 static int osc_punch_interpret(struct ptlrpc_request *req,
438                                struct osc_async_args *aa, int rc)
439 {
440         struct ost_body *body;
441         ENTRY;
442
443         if (rc != 0)
444                 GOTO(out, rc);
445
446         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
447                                   lustre_swab_ost_body);
448         if (body == NULL) {
449                 CERROR ("can't unpack ost_body\n");
450                 GOTO(out, rc = -EPROTO);
451         }
452
453         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
454 out:
455         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
456         RETURN(rc);
457 }
458
/* Asynchronous OST_PUNCH: truncate the byte range described by
 * oinfo->oi_policy on the OST.  The reply is handled by
 * osc_punch_interpret() once @rqset is run. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body *body;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oinfo->oi_oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_PUNCH, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
500
/* Synchronous OST_SYNC: ask the OST to flush the [start, end] range of
 * the object described by @oa to stable storage.  On success @oa is
 * overwritten with the server's reply. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                              OST_SYNC, 2, size, NULL);
        if (!req)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CERROR ("can't unpack ost_body\n");
                GOTO (out, rc = -EPROTO);
        }

        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
547
548 /* Find and cancel locally locks matched by @mode in the resource found by
549  * @objid. Found locks are added into @cancel list. Returns the amount of
550  * locks added to @cancels list. */
551 static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
552                                    struct list_head *cancels, ldlm_mode_t mode,
553                                    int lock_flags)
554 {
555         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
556         struct ldlm_res_id res_id = { .name = { objid } };
557         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
558         int count;
559         ENTRY;
560
561         if (res == NULL)
562                 RETURN(0);
563
564         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
565                                            lock_flags, 0, NULL);
566         ldlm_resource_putref(res);
567         RETURN(count);
568 }
569
570 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
571                                  int rc)
572 {
573         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
574
575         atomic_dec(&cli->cl_destroy_in_flight);
576         cfs_waitq_signal(&cli->cl_destroy_waitq);
577         return 0;
578 }
579
/* Throttle OST_DESTROY RPCs to cl_max_rpcs_in_flight.
 *
 * Optimistically bump cl_destroy_in_flight; if that stayed within the
 * limit, the caller may send (return 1).  Otherwise undo the increment;
 * if the counter moved between the two atomic ops and room now exists,
 * wake a waiter so the slot is not lost.  Return 0 = caller must wait. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
597
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions.
 *
 * Locally-held PW locks on the object are cancelled first (with
 * LDLM_FL_DISCARD_DATA) and packed into the request when the server
 * supports early lock cancel.  Concurrent destroys are capped via
 * osc_can_send_destroy(). */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        CFS_LIST_HEAD(cancels);
        struct ptlrpc_request *req;
        struct ost_body *body;
        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                        sizeof(struct ldlm_request) };
        int count, bufcount = 2;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        LASSERT(oa->o_id != 0);

        count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);
        if (exp_connect_cancelset(exp))
                /* Third buffer carries the early-cancel lock list. */
                bufcount = 3;
        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
                                size, REQ_REC_OFF + 1, 0, &cancels, count);
        if (!req)
                RETURN(-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
                /* Ship the llog cookie so the OST can cancel the unlink
                 * record once the destroy commits. */
                oa->o_lcookie = *oti->oti_logcookies;
        }

        memcpy(&body->oa, oa, sizeof(*oa));
        ptlrpc_req_set_repsize(req, 2, size);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
665
/* Fill the dirty/undirty/grant fields of @oa with this client's cache
 * accounting so the OST can manage its grant.  o_undirty is how much
 * more this client expects to dirty; it is zeroed in the (abnormal)
 * cases where the counters look inconsistent. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Announce headroom for a full pipeline of RPCs. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
700
/* caller must hold loi_list_lock */
/* Account one page entering the dirty cache: charge one page of grant
 * and flag the brw_page so osc_release_write_grant() knows this page
 * was covered by grant. */
static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
                 cli->cl_avail_grant);
}
713
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent == 0 means the page was never written, so its whole page of
 * grant is "lost" (will be reported to the OST via o_dropped).  For
 * short writes, only the part of the page not covered by whole server
 * blocks is lost. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4096 if the server reported no block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                /* Page was not charged against grant; nothing to undo. */
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
752
753 static unsigned long rpcs_in_flight(struct client_obd *cli)
754 {
755         return cli->cl_r_in_flight + cli->cl_w_in_flight;
756 }
757
/* caller must hold loi_list_lock */
/* Walk cl_cache_waiters and wake waiters that can now proceed: either
 * grant a page of write credit, or (when no grant can arrive because no
 * writes are in flight) wake them with -EDQUOT so they fall back to
 * synchronous IO.  Stops early if dirty limits are still exceeded. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
799
/* Seed cl_avail_grant from the grant the server offered in its connect
 * reply (@ocd). */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
810
/* Add any grant the server returned in a BRW reply (@body) to
 * cl_avail_grant, under the loi list lock. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret_oap */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
820
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 *
 * Walk the page array: leave the first @nob_read bytes intact, zero the
 * tail of the page containing the EOF, then zero every following page. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) + 
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
859
860 static int check_write_rcs(struct ptlrpc_request *req,
861                            int requested_nob, int niocount,
862                            obd_count page_count, struct brw_page **pga)
863 {
864         int    *remote_rcs, i;
865
866         /* return error if any niobuf was in error */
867         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
868                                         sizeof(*remote_rcs) * niocount, NULL);
869         if (remote_rcs == NULL) {
870                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
871                 return(-EPROTO);
872         }
873         if (lustre_rep_need_swab(req))
874                 for (i = 0; i < niocount; i++)
875                         __swab32s(&remote_rcs[i]);
876
877         for (i = 0; i < niocount; i++) {
878                 if (remote_rcs[i] < 0)
879                         return(remote_rcs[i]);
880
881                 if (remote_rcs[i] != 0) {
882                         CERROR("rc[%d] invalid (%d) req %p\n",
883                                 i, remote_rcs[i], req);
884                         return(-EPROTO);
885                 }
886         }
887
888         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
889                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
890                        requested_nob, req->rq_bulk->bd_nob_transferred);
891                 return(-EPROTO);
892         }
893
894         return (0);
895 }
896
897 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
898 {
899         if (p1->flag != p2->flag) {
900                 unsigned mask = ~OBD_BRW_FROM_GRANT;
901
902                 /* warn if we try to combine flags that we don't know to be
903                  * safe to combine */
904                 if ((p1->flag & mask) != (p2->flag & mask))
905                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
906                                "same brw?\n", p1->flag, p2->flag);
907                 return 0;
908         }
909
910         return (p1->off + p1->count == p2->off);
911 }
912
/* Checksum the first 'nob' bytes spread over 'pga' using the algorithm
 * selected by 'cksum_type'.  'opc' only selects which OBD_FAIL fault
 * injection fires: reads corrupt the data itself, writes just corrupt
 * the computed checksum (see comment below). */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* last page may be partial: only 'nob' bytes remain */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* nob may go negative on the final partial page; the loop
                 * condition then terminates, so this is harmless */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
948
/* Build (but do not send) one OST_READ/OST_WRITE bulk request covering
 * 'page_count' pages.  Runs of byte-contiguous pages with matching flags
 * collapse into a single remote niobuf (see can_merge_pages()).  On
 * success the request, which owns its bulk descriptor, is returned via
 * *reqp; per-RPC bookkeeping is stashed in rq_async_args for the
 * interpret callback.  'lsm' is accepted for interface symmetry but not
 * used here. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        int niocount, i, requested_nob, opc, rc;
        struct ptlrpc_request_pool *pool;
        struct osc_brw_async_args *aa;
        struct brw_page *pg_prev;

        ENTRY;
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */

        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
        /* writes draw from the import's pre-allocated request pool so
         * dirty data can still be flushed under memory pressure */
        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;

        /* one remote niobuf per run of mergeable pages */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        size[REQ_REC_OFF + 1] = sizeof(*ioobj);
        size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);

        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                   NULL, pool);
        if (req == NULL)
                RETURN (-ENOMEM);

        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* bulk direction is from the server's point of view: the server
         * GETs the source pages on a write, PUTs into our sink on a read */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp (req, page_count,
                                             BULK_PUT_SINK, OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
        niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf));

        memcpy(&body->oa, oa, sizeof(*oa));

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;

        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* each fragment must lie within a single page */
                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive sorted by strictly increasing offset */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either every page uses server-side locking or none does */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend previous niobuf instead of opening a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: the merge loop must have filled exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, 
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), 
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
                ptlrpc_req_set_repsize(req, 3, size);
        } else {
                if (cli->cl_checksum) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                /* 1 RC for the whole I/O */
                ptlrpc_req_set_repsize(req, 2, size);
        }

        /* record per-RPC state for osc_brw_fini_request()/brw_interpret() */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN (0);

 out:
        ptlrpc_req_finished (req);
        RETURN (rc);
}
1114
/* Diagnose a write-checksum mismatch.  Returns 0 when the client and
 * server checksums actually agree, 1 otherwise (caller treats that as
 * "resend").  On mismatch the data still held in 'pga' is re-checksummed
 * to decide whether it changed locally, in transit, or both, and a
 * console message is logged accordingly. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the type the server actually used (it may differ
         * from the one we requested) */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);

        return 1;
}
1166
1167 /* Note rc enters this function as number of bytes transferred */
1168 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1169 {
1170         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1171         const lnet_process_id_t *peer =
1172                         &req->rq_import->imp_connection->c_peer;
1173         struct client_obd *cli = aa->aa_cli;
1174         struct ost_body *body;
1175         __u32 client_cksum = 0;
1176         ENTRY;
1177
1178         if (rc < 0 && rc != -EDQUOT)
1179                 RETURN(rc);
1180
1181         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1182         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1183                                   lustre_swab_ost_body);
1184         if (body == NULL) {
1185                 CERROR ("Can't unpack body\n");
1186                 RETURN(-EPROTO);
1187         }
1188
1189         /* set/clear over quota flag for a uid/gid */
1190         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1191             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1192                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1193                              body->oa.o_gid, body->oa.o_valid,
1194                              body->oa.o_flags);
1195
1196         if (rc < 0)
1197                 RETURN(rc);
1198
1199         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1200                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1201
1202         osc_update_grant(cli, body);
1203
1204         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1205                 if (rc > 0) {
1206                         CERROR ("Unexpected +ve rc %d\n", rc);
1207                         RETURN(-EPROTO);
1208                 }
1209                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1210
1211                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1212                     check_write_checksum(&body->oa, peer, client_cksum,
1213                                          body->oa.o_cksum, aa->aa_requested_nob,
1214                                          aa->aa_page_count, aa->aa_ppga,
1215                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1216                         RETURN(-EAGAIN);
1217
1218                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1219                                      aa->aa_page_count, aa->aa_ppga);
1220                 GOTO(out, rc);
1221         }
1222
1223         /* The rest of this function executes only for OST_READs */
1224         if (rc > aa->aa_requested_nob) {
1225                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1226                        aa->aa_requested_nob);
1227                 RETURN(-EPROTO);
1228         }
1229
1230         if (rc != req->rq_bulk->bd_nob_transferred) {
1231                 CERROR ("Unexpected rc %d (%d transferred)\n",
1232                         rc, req->rq_bulk->bd_nob_transferred);
1233                 return (-EPROTO);
1234         }
1235
1236         if (rc < aa->aa_requested_nob)
1237                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1238
1239         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1240                 static int cksum_counter;
1241                 __u32      server_cksum = body->oa.o_cksum;
1242                 char      *via;
1243                 char      *router;
1244                 cksum_type_t cksum_type;
1245
1246                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1247                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1248                 else
1249                         cksum_type = OBD_CKSUM_CRC32;
1250                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1251                                                  aa->aa_ppga, OST_READ,
1252                                                  cksum_type);
1253
1254                 if (peer->nid == req->rq_bulk->bd_sender) {
1255                         via = router = "";
1256                 } else {
1257                         via = " via ";
1258                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1259                 }
1260
1261                 if (server_cksum == ~0 && rc > 0) {
1262                         CERROR("Protocol error: server %s set the 'checksum' "
1263                                "bit, but didn't send a checksum.  Not fatal, "
1264                                "but please tell CFS.\n",
1265                                libcfs_nid2str(peer->nid));
1266                 } else if (server_cksum != client_cksum) {
1267                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1268                                            "%s%s%s inum "LPU64"/"LPU64" object "
1269                                            LPU64"/"LPU64" extent "
1270                                            "["LPU64"-"LPU64"]\n",
1271                                            req->rq_import->imp_obd->obd_name,
1272                                            libcfs_nid2str(peer->nid),
1273                                            via, router,
1274                                            body->oa.o_valid & OBD_MD_FLFID ?
1275                                                 body->oa.o_fid : (__u64)0,
1276                                            body->oa.o_valid & OBD_MD_FLFID ?
1277                                                 body->oa.o_generation :(__u64)0,
1278                                            body->oa.o_id,
1279                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1280                                                 body->oa.o_gr : (__u64)0,
1281                                            aa->aa_ppga[0]->off,
1282                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1283                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1284                                                                         1);
1285                         CERROR("client %x, server %x, cksum_type %x\n",
1286                                client_cksum, server_cksum, cksum_type);
1287                         cksum_counter = 0;
1288                         aa->aa_oa->o_cksum = client_cksum;
1289                         rc = -EAGAIN;
1290                 } else {
1291                         cksum_counter++;
1292                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1293                         rc = 0;
1294                 }
1295         } else if (unlikely(client_cksum)) {
1296                 static int cksum_missed;
1297
1298                 cksum_missed++;
1299                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1300                         CERROR("Checksum %u requested from %s but not sent\n",
1301                                cksum_missed, libcfs_nid2str(peer->nid));
1302         } else {
1303                 rc = 0;
1304         }
1305 out:
1306         if (rc >= 0)
1307                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1308
1309         RETURN(rc);
1310 }
1311
1312 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1313                             struct lov_stripe_md *lsm,
1314                             obd_count page_count, struct brw_page **pga)
1315 {
1316         struct ptlrpc_request *request;
1317         int                    rc;
1318         cfs_waitq_t            waitq;
1319         int                    resends = 0;
1320         struct l_wait_info     lwi;
1321
1322         ENTRY;
1323         init_waitqueue_head(&waitq);
1324
1325 restart_bulk:
1326         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1327                                   page_count, pga, &request);
1328         if (rc != 0)
1329                 return (rc);
1330
1331         rc = ptlrpc_queue_wait(request);
1332
1333         if (rc == -ETIMEDOUT && request->rq_resend) {
1334                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1335                 ptlrpc_req_finished(request);
1336                 goto restart_bulk;
1337         }
1338
1339         rc = osc_brw_fini_request(request, rc);
1340
1341         ptlrpc_req_finished(request);
1342         if (osc_recoverable_error(rc)) {
1343                 resends++;
1344                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1345                         CERROR("too many resend retries, returning error\n");
1346                         RETURN(-EIO);
1347                 }
1348
1349                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1350                 l_wait_event(waitq, 0, &lwi);
1351
1352                 goto restart_bulk;
1353         }
1354         RETURN(rc);
1355 }
1356
/* Rebuild a failed async bulk RPC and re-queue it on the original
 * request set.  The new request inherits the old one's async args,
 * interpret callback, pga and oap list; each oap's request reference is
 * moved from the old request to the new one under cl_loi_list_lock. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, &new_req);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* give up the resend if any page's I/O was interrupted; the new
         * request must be released since nothing references it yet */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend: one extra second per retry so far */
        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work 
         * in check_set context. only one way exist with access to request 
         * from different thread got -EINTR - this way protected with 
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1427
/* Interpret callback for async bulk RPCs queued by async_internal():
 * finish reply processing, re-queue the RPC on recoverable errors,
 * otherwise drop the in-flight count, return the pages' write grant and
 * release the page array. */
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        int                        i;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
        /* recoverable error: hand the pages to a fresh request instead of
         * failing them; on redo success the new RPC owns everything */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }
        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                aa->aa_cli->cl_w_in_flight--;
        else
                aa->aa_cli->cl_r_in_flight--;

        /* return the grant consumed for each page (third arg = 1) */
        for (i = 0; i < aa->aa_page_count; i++)
                osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);

        RETURN(rc);
}
1454
1455 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1456                           struct lov_stripe_md *lsm, obd_count page_count,
1457                           struct brw_page **pga, struct ptlrpc_request_set *set)
1458 {
1459         struct ptlrpc_request     *request;
1460         struct client_obd         *cli = &exp->exp_obd->u.cli;
1461         int                        rc, i;
1462         struct osc_brw_async_args *aa;
1463         ENTRY;
1464
1465         /* Consume write credits even if doing a sync write -
1466          * otherwise we may run out of space on OST due to grant. */
1467         if (cmd == OBD_BRW_WRITE) {
1468                 client_obd_list_lock(&cli->cl_loi_list_lock);
1469                 for (i = 0; i < page_count; i++) {
1470                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1471                                 osc_consume_write_grant(cli, pga[i]);
1472                 }
1473                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1474         }
1475
1476         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1477                                   page_count, pga, &request);
1478
1479         aa = (struct osc_brw_async_args *)&request->rq_async_args;
1480         if (cmd == OBD_BRW_READ) {
1481                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1482                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1483                 ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
1484         } else {
1485                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1486                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1487                                  cli->cl_w_in_flight);
1488                 ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
1489         }
1490
1491         if (rc == 0) {
1492                 request->rq_interpret_reply = brw_interpret;
1493                 ptlrpc_set_add_req(set, request);
1494                 client_obd_list_lock(&cli->cl_loi_list_lock);
1495                 if (cmd == OBD_BRW_READ)
1496                         cli->cl_r_in_flight++;
1497                 else
1498                         cli->cl_w_in_flight++;
1499                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1500         } else if (cmd == OBD_BRW_WRITE) {
1501                 client_obd_list_lock(&cli->cl_loi_list_lock);
1502                 for (i = 0; i < page_count; i++)
1503                         osc_release_write_grant(cli, pga[i], 0);
1504                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1505         }
1506
1507         RETURN (rc);
1508 }
1509
1510 /*
1511  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1512  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1513  * fine for our small page arrays and doesn't require allocation.  its an
1514  * insertion sort that swaps elements that are strides apart, shrinking the
1515  * stride down until its '1' and the array is sorted.
1516  */
1517 static void sort_brw_pages(struct brw_page **array, int num)
1518 {
1519         int stride, i, j;
1520         struct brw_page *tmp;
1521
1522         if (num == 1)
1523                 return;
1524         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1525                 ;
1526
1527         do {
1528                 stride /= 3;
1529                 for (i = stride ; i < num ; i++) {
1530                         tmp = array[i];
1531                         j = i;
1532                         while (j >= stride && array[j-stride]->off > tmp->off) {
1533                                 array[j] = array[j - stride];
1534                                 j -= stride;
1535                         }
1536                         array[j] = tmp;
1537                 }
1538         } while (stride > 1);
1539 }
1540
1541 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1542 {
1543         int count = 1;
1544         int offset;
1545         int i = 0;
1546
1547         LASSERT (pages > 0);
1548         offset = pg[i]->off & (~CFS_PAGE_MASK);
1549
1550         for (;;) {
1551                 pages--;
1552                 if (pages == 0)         /* that's all */
1553                         return count;
1554
1555                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1556                         return count;   /* doesn't end on page boundary */
1557
1558                 i++;
1559                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1560                 if (offset != 0)        /* doesn't start on page boundary */
1561                         return count;
1562
1563                 count++;
1564         }
1565 }
1566
1567 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1568 {
1569         struct brw_page **ppga;
1570         int i;
1571
1572         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1573         if (ppga == NULL)
1574                 return NULL;
1575
1576         for (i = 0; i < count; i++)
1577                 ppga[i] = pga + i;
1578         return ppga;
1579 }
1580
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * same value passed to osc_build_ppga() so the freed size matches the
 * original allocation. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1586
/* Synchronous bulk I/O entry point (cmd is OBD_BRW_READ/OBD_BRW_WRITE).
 * The flat page array is wrapped in a pointer array, sorted by file
 * offset, and issued as a sequence of osc_brw_internal() RPCs, each of
 * at most cl_max_pages_per_rpc unfragmented pages.  Returns 0, or the
 * error of the first RPC that failed. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk so one RPC never spans a gap in the
                 * page run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga);

                if (rc != 0)
                        break;

                /* advance past the pages this RPC carried */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* free the whole pointer array, not just the unsent remainder */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1657
/* Asynchronous counterpart of osc_brw(): splits the sorted page array
 * into per-RPC chunks and queues them on @set via async_internal().
 * async_internal() takes ownership of each pointer-array chunk it is
 * given, so chunks are copied -- except when the whole request fits in
 * one RPC, in which case the original array is handed over directly. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */
                struct obd_import *imp = class_exp2cliimp(exp);

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set);

                if (rc != 0) {
                        /* async_internal() did not take the chunk; only
                         * free it when it was our private copy */
                        if (copy != ppga)
                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
                        break;
                }

                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1725
1726 static void osc_check_rpcs(struct client_obd *cli);
1727
1728 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1729  * the dirty accounting.  Writeback completes or truncate happens before
1730  * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* give the page's write grant back to the client-wide
         * accounting; @sent is forwarded so grant handling can tell
         * pages that reached the wire from ones torn down early */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1736
1737 /* This maintains the lists of pending pages to read/write for a given object
1738  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1739  * to quickly find objects that are ready to send an RPC. */
1740 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1741                          int cmd)
1742 {
1743         int optimal;
1744         ENTRY;
1745
1746         if (lop->lop_num_pending == 0)
1747                 RETURN(0);
1748
1749         /* if we have an invalid import we want to drain the queued pages
1750          * by forcing them through rpcs that immediately fail and complete
1751          * the pages.  recovery relies on this to empty the queued pages
1752          * before canceling the locks and evicting down the llite pages */
1753         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1754                 RETURN(1);
1755
1756         /* stream rpcs in queue order as long as as there is an urgent page
1757          * queued.  this is our cheap solution for good batching in the case
1758          * where writepage marks some random page in the middle of the file
1759          * as urgent because of, say, memory pressure */
1760         if (!list_empty(&lop->lop_urgent)) {
1761                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1762                 RETURN(1);
1763         }
1764
1765         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1766         optimal = cli->cl_max_pages_per_rpc;
1767         if (cmd & OBD_BRW_WRITE) {
1768                 /* trigger a write rpc stream as long as there are dirtiers
1769                  * waiting for space.  as they're waiting, they're not going to
1770                  * create more pages to coallesce with what's waiting.. */
1771                 if (!list_empty(&cli->cl_cache_waiters)) {
1772                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1773                         RETURN(1);
1774                 }
1775
1776                 /* +16 to avoid triggering rpcs that would want to include pages
1777                  * that are being queued but which can't be made ready until
1778                  * the queuer finishes with the page. this is a wart for
1779                  * llite::commit_write() */
1780                 optimal += 16;
1781         }
1782         if (lop->lop_num_pending >= optimal)
1783                 RETURN(1);
1784
1785         RETURN(0);
1786 }
1787
/* Make @item's membership of @list match the boolean @should_be_on:
 * append it when it should be on but isn't, unlink it when it is on but
 * shouldn't be, and do nothing when the state already agrees. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on_now = !list_empty(item);

        if (on_now == !!should_be_on)
                return;

        if (should_be_on)
                list_add_tail(item, list);
        else
                list_del_init(item);
}
1796
1797 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1798  * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: the object has enough (or urgent enough) pages
         * queued in either direction to be worth building an RPC for */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* write/read lists: the object simply has pending pages of that
         * kind, whether or not an RPC is warranted yet */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1811
/* Adjust @lop's pending-page count by @delta (may be negative) and
 * mirror the change into the client-wide read or write pending counter
 * selected by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1821
1822 /* this is called when a sync waiter receives an interruption.  Its job is to
1823  * get the caller woken as soon as possible.  If its page hasn't been put in an
1824  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1825  * desiring interruption which will forcefully complete the rpc once the rpc
1826  * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                /* mark the rpc interrupted and poke ptlrpcd so the rpc
                 * is forcefully completed once it times out */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue immediately, fix up the
                 * pending accounting and wake the group waiter with
                 * -EINTR so the caller unblocks as soon as possible */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1867
1868 /* this is trying to propogate async writeback errors back up to the
1869  * application.  As an async write fails we record the error code for later if
1870  * the app does an fsync.  As long as errors persist we force future rpcs to be
1871  * sync so that the app can get a sync error and break the cycle of queueing
1872  * pages for which writeback will fail. */
1873 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1874                            int rc)
1875 {
1876         if (rc) {
1877                 if (!ar->ar_rc)
1878                         ar->ar_rc = rc;
1879
1880                 ar->ar_force_sync = 1;
1881                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1882                 return;
1883
1884         }
1885
1886         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1887                 ar->ar_force_sync = 0;
1888 }
1889
1890 static void osc_oap_to_pending(struct osc_async_page *oap)
1891 {
1892         struct loi_oap_pages *lop;
1893
1894         if (oap->oap_cmd & OBD_BRW_WRITE)
1895                 lop = &oap->oap_loi->loi_write_lop;
1896         else
1897                 lop = &oap->oap_loi->loi_read_lop;
1898
1899         if (oap->oap_async_flags & ASYNC_URGENT)
1900                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1901         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1902         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1903 }
1904
1905 /* this must be called holding the loi list lock to give coverage to exit_cache,
1906  * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* remember the xid before dropping our request ref; it
                 * feeds the force-sync error bookkeeping below */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write errors client-wide and per-object so a
                 * later fsync can pick them up */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* mirror the attributes the server returned into the cached
         * lock value block */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group I/O page: report completion to the group rather
                 * than through the per-page completion callback */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1959
/* Reply interpreter for BRW RPCs built from cached async pages: runs
 * each page's completion, fixes the in-flight RPC accounting, and kicks
 * osc_check_rpcs() to keep the pipeline full. */
static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct osc_async_page *oap, *tmp;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(request, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);

        /* recoverable failure: resend instead of completing the pages */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(request, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* the caller may re-use the oap after the completion call so
         * we need to clean it up a little */
        list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                list_del_init(&oap->oap_rpc_item);
                osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
        }

        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* the obdo and pointer array were allocated for this request;
         * release them now that every page has been completed */
        OBDO_FREE(aa->aa_oa);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2002
/* Assemble the pages queued on @rpc_list (linked via oap_rpc_item) into
 * one BRW request.  On success the oaps are spliced onto the request's
 * async args and @rpc_list is left empty; the obdo and page-pointer
 * array allocated here are released later by the request's interpret
 * callback (see brw_interpret_oap()).  On failure @rpc_list is
 * untouched and an ERR_PTR is returned. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                /* ops/caller_data/lock are taken from the first page
                 * only; all oaps in one RPC share them */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                /* NOTE(review): debug mask 0 looks unintended here --
                 * D_PAGE seems meant; confirm before changing */
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2080
2081 /* the loi lock is held across this function but it's allowed to release
2082  * and reacquire it during its work */
static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                            int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        ENTRY;

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                ops = oap->oap_caller_ops;

                LASSERT(oap->oap_magic == OAP_MAGIC);

                /* an RPC must be uniformly SRVLOCK or not: stop at the
                 * first page whose flag differs from the first page's */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 *
                 * XXX nikita: this assertion should be adjusted when lustre
                 * starts using PG_writeback for pages being written out.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                LASSERT(PageLocked(oap->oap_page));
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
                        oap->oap_count =
                                ops->ap_refresh_count(oap->oap_caller_data,cmd);
                if (oap->oap_count <= 0) {
                        /* nothing left to send for this page (e.g. the
                         * -EINTR case above): complete it immediately */
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        if (page_count == 0)
                RETURN(0);

        loi_list_maint(cli, loi);

        /* drop the list lock while building the request; the pages on
         * rpc_list were already taken off the pending lists above */
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        req = osc_build_req(cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
                }
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* per-direction lprocfs stats for page counts, concurrency and
         * RPC start offsets */
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret_oap;
        ptlrpcd_add_req(req);
        RETURN(1);
}
2289
/* Dump a one-line D_INODE summary of an loi's queued-io state: whether it
 * is on the client's ready list, and the pending count / urgent-list state
 * for both its write and read queues.  STR/args are appended printf-style
 * by the caller. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2299 /* This is called by osc_check_rpcs() to find which objects have pages that
2300  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2301 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2302 {
2303         ENTRY;
2304         /* first return all objects which we already know to have
2305          * pages ready to be stuffed into rpcs */
2306         if (!list_empty(&cli->cl_loi_ready_list))
2307                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2308                                   struct lov_oinfo, loi_cli_item));
2309
2310         /* then if we have cache waiters, return all objects with queued
2311          * writes.  This is especially important when many small files
2312          * have filled up the cache and not been fired into rpcs because
2313          * they don't pass the nr_pending/object threshhold */
2314         if (!list_empty(&cli->cl_cache_waiters) &&
2315             !list_empty(&cli->cl_loi_write_list))
2316                 RETURN(list_entry(cli->cl_loi_write_list.next,
2317                                   struct lov_oinfo, loi_write_item));
2318
2319         /* then return all queued objects when we have an invalid import
2320          * so that they get flushed */
2321         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2322                 if (!list_empty(&cli->cl_loi_write_list))
2323                         RETURN(list_entry(cli->cl_loi_write_list.next,
2324                                           struct lov_oinfo, loi_write_item));
2325                 if (!list_empty(&cli->cl_loi_read_list))
2326                         RETURN(list_entry(cli->cl_loi_read_list.next,
2327                                           struct lov_oinfo, loi_read_item));
2328         }
2329         RETURN(NULL);
2330 }
2331
/* called with the loi list lock held */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop dispatching once the client's rpcs-in-flight cap
                 * is reached */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        /* rc < 0: hard failure, give up for now */
                        if (rc < 0)
                                break;
                        /* rc > 0 means an rpc really went out */
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* requeue the object on whichever lists still apply */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2393
2394 /* we're trying to queue a page in the osc so we're subject to the
2395  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2396  * If the osc's queued pages are already at that limit, then we want to sleep
2397  * until there is space in the osc's queue for us.  We also may be waiting for
2398  * write credits from the OST if there are RPCs in flight that may return some
2399  * before we fall back to sync writes.
2400  *
2401  * We need this know our allocation was granted in the presence of signals */
2402 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2403 {
2404         int rc;
2405         ENTRY;
2406         client_obd_list_lock(&cli->cl_loi_list_lock);
2407         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2408         client_obd_list_unlock(&cli->cl_loi_list_lock);
2409         RETURN(rc);
2410 };
2411
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };
        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* register ourselves as a cache waiter to be woken when a
                 * completing write rpc returns grant/cache space */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick rpcs out so their completion can grant us space,
                 * then drop the list lock for the duration of the sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list: we woke without ever being
                 * granted space, so unlink ourselves and bail out */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* whoever granted us set ocw_rc to the outcome */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2467
2468 static int osc_reget_short_lock(struct obd_export *exp,
2469                                 struct lov_stripe_md *lsm,
2470                                 void **res, int rw,
2471                                 loff_t start, loff_t end,
2472                                 void **cookie)
2473 {
2474         struct osc_async_page *oap = *res;
2475         int rc;
2476
2477         ENTRY;
2478
2479         spin_lock(&oap->oap_lock);
2480         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2481                                   start, end, cookie);
2482         spin_unlock(&oap->oap_lock);
2483
2484         RETURN(rc);
2485 }
2486
/* Drop the short-term lock reference identified by @cookie (taken by the
 * fast match in osc_reget_short_lock()). */
static int osc_release_short_lock(struct obd_export *exp,
                                  struct lov_stripe_md *lsm, loff_t end,
                                  void *cookie, int rw)
{
        ENTRY;
        ldlm_lock_fast_release(cookie, rw);
        /* no error could have happened at this layer */
        RETURN(0);
}
2496
2497 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2498                         struct lov_oinfo *loi, cfs_page_t *page,
2499                         obd_off offset, struct obd_async_page_ops *ops,
2500                         void *data, void **res, int nocache,
2501                         struct lustre_handle *lockh)
2502 {
2503         struct osc_async_page *oap;
2504         struct ldlm_res_id oid = {{0}};
2505         int rc = 0;
2506
2507         ENTRY;
2508
2509         if (!page)
2510                 return size_round(sizeof(*oap));
2511
2512         oap = *res;
2513         oap->oap_magic = OAP_MAGIC;
2514         oap->oap_cli = &exp->exp_obd->u.cli;
2515         oap->oap_loi = loi;
2516
2517         oap->oap_caller_ops = ops;
2518         oap->oap_caller_data = data;
2519
2520         oap->oap_page = page;
2521         oap->oap_obj_off = offset;
2522
2523         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2524         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2525         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2526         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2527
2528         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2529
2530         spin_lock_init(&oap->oap_lock);
2531
2532         /* If the page was marked as notcacheable - don't add to any locks */ 
2533         if (!nocache) {
2534                 oid.name[0] = loi->loi_id;
2535                 /* This is the only place where we can call cache_add_extent
2536                    without oap_lock, because this page is locked now, and
2537                    the lock we are adding it to is referenced, so cannot lose
2538                    any pages either. */
2539                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2540                 if (rc)
2541                         RETURN(rc);
2542         }
2543
2544         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2545         RETURN(0);
2546 }
2547
2548 struct osc_async_page *oap_from_cookie(void *cookie)
2549 {
2550         struct osc_async_page *oap = cookie;
2551         if (oap->oap_magic != OAP_MAGIC)
2552                 return ERR_PTR(-EINVAL);
2553         return oap;
2554 };
2555
/* Queue a single prepared page (via its oap cookie) for async read or
 * write.  Writes must first obtain cache space and grant through
 * osc_enter_cache(), which may sleep.  On success the page is placed on
 * its object's pending list and rpc dispatch is attempted. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* refuse new io when the import is gone or invalidated */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* fill an obdo only to learn the owner's uid/gid for the
                 * quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while sleeping for
                 * cache space or grant */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2630
/* true iff `flag` is being newly set: clear in `was`, set in `now`.
 * aka (~was & now & flag), but this is more clear :)
 * Arguments are fully parenthesized so expressions with lower-precedence
 * operators (e.g. `a | b`) expand correctly. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2633
/* Raise async flags on an already-pending page.  Flags are only added,
 * never cleared here; requesting ASYNC_URGENT additionally puts the page
 * on its queue's urgent list (unless it is already part of an rpc). */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* select the read or write queue this page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* only a page on the pending list can have flags changed */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if every requested flag is already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* a page already claimed by an rpc stays where it is */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2696
/* Queue a page on its queue's group-pending list rather than the regular
 * pending list; io for the whole group is started later by
 * osc_trigger_group_io().  ASYNC_GROUP_SYNC pages also register with the
 * obd_io_group so completion of the group can be tracked. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* refuse new io when the import is gone or invalidated */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        /* select the read or write queue for this page */
        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2751
2752 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2753                                  struct loi_oap_pages *lop, int cmd)
2754 {
2755         struct list_head *pos, *tmp;
2756         struct osc_async_page *oap;
2757
2758         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2759                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2760                 list_del(&oap->oap_pending_item);
2761                 osc_oap_to_pending(oap);
2762         }
2763         loi_list_maint(cli, loi);
2764 }
2765
/* Start io for everything previously queued on this object's group-pending
 * lists: move the pages onto the regular pending lists and attempt to send
 * rpcs.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* both directions are promoted before rpcs are considered */
        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2787
/* Detach a prepared page from all osc queues before the caller destroys
 * it.  Fails with -EBUSY if the page is part of an in-flight rpc;
 * otherwise its cache/grant accounting is released and it is unlinked from
 * the urgent and pending lists and from the extent cache. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* select the read or write queue this page was on */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page an rpc is still using */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return this page's cache accounting and give waiters a chance
         * to claim the freed space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2835
/* ldlm blocking/cancel callback for osc extent locks.  On LDLM_CB_BLOCKING
 * the lock is cancelled on the client; on LDLM_CB_CANCELING the lock is
 * removed from the extent cache and the client's registered extent-lock
 * cancel callback (if any) is invoked.  Always returns 0. */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* catch callers that handed us a small integer instead of a
         * real pointer as ast data */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                /* forward the cancel to the registered upper-layer hook */
                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
2878
/* Attach ast data (an inode pointer on Linux) to the lock behind @lockh,
 * asserting that any different inode already attached is being freed, and
 * optionally mark the lock LDLM_FL_NO_LRU via @flags. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* only an inode being freed may be silently replaced */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        /* only the NO_LRU bit from @flags is honoured here */
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2909
2910 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2911                              ldlm_iterator_t replace, void *data)
2912 {
2913         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
2914         struct obd_device *obd = class_exp2obd(exp);
2915
2916         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2917         return 0;
2918 }
2919
/* Common completion path for osc enqueues: for intent enqueues, pull the
 * server's disposition out of the ldlm reply; log the lvb attributes we
 * received; cache the lock on success; and finally hand the result to the
 * caller's update callback, whose return value becomes ours. */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;

                        /* swabbed by ldlm_cli_enqueue() */
                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                             sizeof(*rep));
                        LASSERT(rep != NULL);
                        /* the server's intent disposition overrides the
                         * aborted status when set */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
2954
/* Interpret callback for async enqueues issued by osc_enqueue(): finish
 * the ldlm enqueue, run the common osc completion, then immediately drop
 * the lock reference (async locks are released as soon as they are
 * obtained - see the comment above osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* drop the reference taken by ldlm_handle2lock() above */
        LDLM_LOCK_PUT(lock);
        return rc;
}
2987
/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                       struct ldlm_enqueue_info *einfo,
                       struct ptlrpc_request_set *rqset)
{
        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
        struct obd_device *obd = exp->exp_obd;
        struct ldlm_reply *rep;
        struct ptlrpc_request *req = NULL;
        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
        ldlm_mode_t mode;
        int rc;
        ENTRY;

        /* Filesystem lock extents are extended to page boundaries so that
         * dealing with the page cache is a little smoother.  */
        oinfo->oi_policy.l_extent.start -=
                oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
        oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;

        /* No known-minimum-size (kms): nothing cached locally can match,
         * go straight to a fresh enqueue. */
        if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
                goto no_match;

        /* Next, search for already existing extent locks that will cover us */
        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */
        mode = einfo->ei_mode;
        if (einfo->ei_mode == LCK_PR)
                mode |= LCK_PW;
        /* On success, ldlm_lock_match() returns the matched mode and takes a
         * reference on the lock in oi_lockh. */
        mode = ldlm_lock_match(obd->obd_namespace,
                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                               einfo->ei_type, &oinfo->oi_policy, mode,
                               oinfo->oi_lockh);
        if (mode) {
                /* addref the lock only if not async requests and PW lock is
                 * matched whereas we asked for PR. */
                if (!rqset && einfo->ei_mode != mode)
                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
                                        oinfo->oi_flags);
                if (intent) {
                        /* I would like to be able to ASSERT here that rss <=
                         * kms, but I can't, for reasons which are explained in
                         * lov_enqueue() */
                }

                /* We already have a lock, and it's referenced */
                oinfo->oi_cb_up(oinfo, ELDLM_OK);

                /* For async requests, decref the lock. */
                if (einfo->ei_mode != mode)
                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                else if (rqset)
                        ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);

                RETURN(ELDLM_OK);
        }

 no_match:
        if (intent) {
                /* Intent enqueues carry a reply LVB; reserve buffers for the
                 * lock reply and the ost_lvb record. */
                int size[3] = {
                        [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_LOCKREQ_OFF + 1] = 0 };

                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL)
                        RETURN(-ENOMEM);

                size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
                size[DLM_REPLY_REC_OFF] = 
                        sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                ptlrpc_req_set_repsize(req, 3, size);
        }

        /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;

        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
                              &oinfo->oi_policy, &oinfo->oi_flags,
                              &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                              sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                              lustre_swab_ost_lvb, oinfo->oi_lockh,
                              rqset ? 1 : 0);
        if (rqset) {
                if (!rc) {
                        /* Async path: stash the context in rq_async_args and
                         * let osc_enqueue_interpret() finish the job. */
                        struct osc_enqueue_args *aa;
                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
                        aa->oa_oi = oinfo;
                        aa->oa_ei = einfo;
                        aa->oa_exp = exp;

                        req->rq_interpret_reply = osc_enqueue_interpret;
                        ptlrpc_set_add_req(rqset, req);
                } else if (intent) {
                        ptlrpc_req_finished(req);
                }
                RETURN(rc);
        }

        /* Sync path: complete the enqueue (calls the up-call) here. */
        rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
        if (intent)
                ptlrpc_req_finished(req);

        RETURN(rc);
}
3110
3111 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3112                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3113                      int *flags, void *data, struct lustre_handle *lockh)
3114 {
3115         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3116         struct obd_device *obd = exp->exp_obd;
3117         int lflags = *flags;
3118         ldlm_mode_t rc;
3119         ENTRY;
3120
3121         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3122
3123         /* Filesystem lock extents are extended to page boundaries so that
3124          * dealing with the page cache is a little smoother */
3125         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3126         policy->l_extent.end |= ~CFS_PAGE_MASK;
3127
3128         /* Next, search for already existing extent locks that will cover us */
3129         /* If we're trying to read, we also search for an existing PW lock.  The
3130          * VFS and page cache already protect us locally, so lots of readers/
3131          * writers can share a single PW lock. */
3132         rc = mode;
3133         if (mode == LCK_PR)
3134                 rc |= LCK_PW;
3135         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3136                              &res_id, type, policy, rc, lockh);
3137         if (rc) {
3138                 osc_set_data_with_check(lockh, data, lflags);
3139                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3140                         ldlm_lock_addref(lockh, LCK_PR);
3141                         ldlm_lock_decref(lockh, LCK_PW);
3142                 }
3143                 RETURN(rc);
3144         }
3145
3146         RETURN(rc);
3147 }
3148
3149 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3150                       __u32 mode, struct lustre_handle *lockh)
3151 {
3152         ENTRY;
3153
3154         if (unlikely(mode == LCK_GROUP))
3155                 ldlm_lock_decref_and_cancel(lockh, mode);
3156         else
3157                 ldlm_lock_decref(lockh, mode);
3158
3159         RETURN(0);
3160 }
3161
3162 static int osc_cancel_unused(struct obd_export *exp,
3163                              struct lov_stripe_md *lsm, int flags, void *opaque)
3164 {
3165         struct obd_device *obd = class_exp2obd(exp);
3166         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3167
3168         return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
3169                                       opaque);
3170 }
3171
3172 static int osc_join_lru(struct obd_export *exp,
3173                         struct lov_stripe_md *lsm, int join)
3174 {
3175         struct obd_device *obd = class_exp2obd(exp);
3176         struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
3177
3178         return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
3179 }
3180
/* Completion callback for an async OST_STATFS request: unpack the
 * obd_statfs reply into the caller's oi_osfs buffer and invoke the
 * caller's up-call.  The up-call always runs, receiving any RPC or
 * unpack error via rc. */
static int osc_statfs_interpret(struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* Swab the reply in place if the peer's endianness differs and
         * validate the buffer size. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
out:
        /* Deliver result (or error) to the original requester. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3202
/* Fire an OST_STATFS request asynchronously on @rqset; the reply is
 * handled by osc_statfs_interpret(), which fills oinfo->oi_osfs and
 * calls oinfo's up-call.  Note max_age is not actually transmitted
 * (see comment below). */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block on a stuck OST: fail fast
                 * (no resend, no delay) rather than deadlock in statfs. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the context for the interpret callback. */
        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_async_args *)&req->rq_async_args;
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3239
/* Synchronous statfs: send OST_STATFS, wait for the reply, and copy the
 * unpacked obd_statfs into @osfs.  max_age is not transmitted (see the
 * comment below); @flags may carry OBD_STATFS_NODELAY for procfs
 * callers that must not block. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs *msfs;
        struct ptlrpc_request *req;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
                              OST_STATFS, 1, NULL, NULL);
        if (!req)
                RETURN(-ENOMEM);

        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs callers must not block on a stuck OST: fail fast
                 * (no resend, no delay) rather than deadlock in statfs. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* Swab the reply in place if needed and validate its size. */
        msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
                                  lustre_swab_obd_statfs);
        if (msfs == NULL) {
                CERROR("Can't unpack obd_statfs\n");
                GOTO(out, rc = -EPROTO);
        }

        memcpy(osfs, msfs, sizeof(*osfs));

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3287
/* Retrieve object striping information for a single OSC object.
 *
 * @lump points at a userspace struct lov_user_md whose lmm_stripe_count
 * says whether there is room for OST object entries after the header
 * (only one slot is ever used here, since an OSC object has exactly one
 * stripe).  lmm_magic must be LOV_USER_MAGIC.
 *
 * Returns 0 on success, -ENODATA without striping info, -EFAULT on a
 * bad user pointer, -EINVAL on a bad magic, -ENOMEM on allocation
 * failure. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        if (lum.lmm_stripe_count > 0) {
                /* Caller left room for object entries: build a copy with
                 * one lov_user_ost_data slot (zero-filled by OBD_ALLOC)
                 * and record the object id in it. */
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
        } else {
                /* No room for objects: return just the header in place. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3332
3333
/* ioctl dispatcher for the OSC device.  Takes a module reference for
 * the duration of the call so the module cannot be unloaded while an
 * ioctl is in flight; every case exits through "out" to release it.
 * @karg is the kernel-side obd_ioctl_data, @uarg the original user
 * pointer (used by cases that copy data to/from userspace). */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_INC_USE_COUNT;
#else
        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
#endif
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Report a one-target LOV descriptor for this OSC plus
                 * its UUID, into the caller-supplied ioctl buffer. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Validate that the inline buffers are large enough. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success;
                 * normalize positive values to 0 for the caller. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_DESTROY: {
                /* Destroy an object by id; admin-only. */
                struct obdo            *oa;

                if (!capable (CAP_SYS_ADMIN))
                        GOTO (out, err = -EPERM);
                oa = &data->ioc_obdo1;

                if (oa->o_id == 0)
                        GOTO(out, err = -EINVAL);

                oa->o_valid |= OBD_MD_FLGROUP;

                err = osc_destroy(exp, oa, NULL, NULL, NULL);
                GOTO(out, err);
        }
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        MOD_DEC_USE_COUNT;
#else
        module_put(THIS_MODULE);
#endif
        return err;
}
3440
/* obd_get_info handler.  Supports two keys:
 *   "lock_to_stripe" - answered locally; an OSC always maps to stripe 0.
 *   "last_id"        - forwarded to the OST as an OST_GET_INFO RPC; the
 *                      reply (an obd_id) is copied into *val.
 * Any other key returns -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS("lock_to_stripe")) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS("last_id")) {
                struct ptlrpc_request *req;
                obd_id *reply;
                char *bufs[2] = { NULL, key };
                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };

                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                      OST_GET_INFO, 2, size, bufs);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Reply buffer is sized by the caller's *vallen. */
                size[REPLY_REC_OFF] = *vallen;
                ptlrpc_req_set_repsize(req, 2, size);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
                                           lustre_swab_ost_last_id);
                if (reply == NULL) {
                        CERROR("Can't unpack OST last ID\n");
                        GOTO(out, rc = -EPROTO);
                }
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3483
3484 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3485                                           void *aa, int rc)
3486 {
3487         struct llog_ctxt *ctxt;
3488         struct obd_import *imp = req->rq_import;
3489         ENTRY;
3490
3491         if (rc != 0)
3492                 RETURN(rc);
3493
3494         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3495         if (ctxt) {
3496                 if (rc == 0)
3497                         rc = llog_initiator_connect(ctxt);
3498                 else
3499                         CERROR("cannot establish connection for "
3500                                "ctxt %p: %d\n", ctxt, rc);
3501         }
3502
3503         llog_ctxt_put(ctxt);
3504         spin_lock(&imp->imp_lock);
3505         imp->imp_server_timeout = 1;
3506         imp->imp_pingable = 1;
3507         spin_unlock(&imp->imp_lock);
3508         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3509
3510         RETURN(rc);
3511 }
3512
/* obd_set_info_async handler.  A handful of keys are handled locally
 * (next id, "unlinked", initial-recovery flag, checksum toggle); any
 * other key is forwarded to the OST as an OST_SET_INFO RPC queued on
 * @set.  The forwarded path requires @set to be non-NULL. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device  *obd = exp->exp_obd;
        struct obd_import *imp = class_exp2cliimp(exp);
        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
        char *bufs[3] = { NULL, key, val };
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                /* MDS tells us the last allocated object id; the creator
                 * starts from the next one. */
                if (vallen != sizeof(obd_id))
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS("unlinked")) {
                /* Objects were unlinked: space may be available again. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS("checksum")) {
                /* Enable/disable bulk data checksumming for this client. */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
                              bufs);
        if (req == NULL)
                RETURN(-ENOMEM);

        if (KEY_IS(KEY_MDS_CONN))
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;

        ptlrpc_req_set_repsize(req, 1, NULL);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3588
3589
/* llog operations for the size-replication context: the client side only
 * ever cancels records in this log. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Filled in lazily (from llog_lvfs_ops) in osc_llog_init(). */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses (MDS->OST originator and
 * size-replication).  The originator ops table is initialized lazily,
 * once, under obd_dev_lock; lop_setup doubles as the "already done"
 * marker. */
static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                         int count, struct llog_catid *catid, 
                         struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                /* First caller: clone the lvfs ops and override the
                 * origin-specific entry points. */
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                        &osc_size_repl_logops);
        if (rc) 
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3632
3633 static int osc_llog_finish(struct obd_device *obd, int count)
3634 {
3635         struct llog_ctxt *ctxt;
3636         int rc = 0, rc2 = 0;
3637         ENTRY;
3638
3639         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3640         if (ctxt)
3641                 rc = llog_cleanup(ctxt);
3642
3643         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3644         if (ctxt)
3645                 rc2 = llog_cleanup(ctxt);
3646         if (!rc)
3647                 rc = rc2;
3648
3649         RETURN(rc);
3650 }
3651
3652 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3653                          struct obd_uuid *cluuid,
3654                          struct obd_connect_data *data)
3655 {
3656         struct client_obd *cli = &obd->u.cli;
3657
3658         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3659                 long lost_grant;
3660
3661                 client_obd_list_lock(&cli->cl_loi_list_lock);
3662                 data->ocd_grant = cli->cl_avail_grant ?:
3663                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3664                 lost_grant = cli->cl_lost_grant;
3665                 cli->cl_lost_grant = 0;
3666                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3667
3668                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3669                        "cl_lost_grant: %ld\n", data->ocd_grant,
3670                        cli->cl_avail_grant, lost_grant);
3671                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3672                        " ocd_grant: %d\n", data->ocd_connect_flags,
3673                        data->ocd_version, data->ocd_grant);
3674         }
3675
3676         RETURN(0);
3677 }
3678
/* Disconnect this OSC's export.  On the final connection, flush any
 * pending llog cancel records to the OST before dropping the link. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        /* NOTE(review): llog_get_context() may return NULL; llog_sync()
         * and llog_ctxt_put() are assumed NULL-safe here -- confirm. */
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        llog_ctxt_put(ctxt);

        rc = client_disconnect_export(exp);
        return rc;
}
3694
/* React to import state transitions for this OSC: reset grant state on
 * disconnect/invalidate, toggle the object-creator flags on MDS OSCs,
 * drop local locks when the import is invalidated, and notify the
 * observer (e.g. LOV) of activity changes. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant is meaningless once the server is unreachable. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                ptlrpc_import_setasync(imp, -1);

                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* Drop all locks locally; the server-side state is gone. */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                ptlrpc_import_setasync(imp, 1);
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3774
3775 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3776 {
3777         int rc;
3778         ENTRY;
3779
3780         ENTRY;
3781         rc = ptlrpcd_addref();
3782         if (rc)
3783                 RETURN(rc);
3784
3785         rc = client_obd_setup(obd, len, buf);
3786         if (rc) {
3787                 ptlrpcd_decref();
3788         } else {
3789                 struct lprocfs_static_vars lvars = { 0 };
3790                 struct client_obd *cli = &obd->u.cli;
3791
3792                 lprocfs_osc_init_vars(&lvars);
3793                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3794                         lproc_osc_attach_seqstat(obd);
3795                         ptlrpc_lprocfs_register_obd(obd);
3796                 }
3797
3798                 oscc_init(obd);
3799                 /* We need to allocate a few requests more, because
3800                    brw_interpret_oap tries to create new requests before freeing
3801                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3802                    reserved, but I afraid that might be too much wasted RAM
3803                    in fact, so 2 is just my guess and still should work. */
3804                 cli->cl_import->imp_rq_pool =
3805                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3806                                             OST_MAXREQSIZE,
3807                                             ptlrpc_add_rqs_to_pool);
3808                 cli->cl_cache = cache_create(obd);
3809                 if (!cli->cl_cache) {
3810                         osc_cleanup(obd);
3811                         rc = -ENOMEM;
3812                 }
3813         }
3814
3815         RETURN(rc);
3816 }
3817
3818 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3819 {
3820         int rc = 0;
3821         ENTRY;
3822
3823         switch (stage) {
3824         case OBD_CLEANUP_EARLY: {
3825                 struct obd_import *imp;
3826                 imp = obd->u.cli.cl_import;
3827                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3828                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3829                 ptlrpc_deactivate_import(imp);
3830                 break;
3831         }
3832         case OBD_CLEANUP_EXPORTS: {
3833                 /* If we set up but never connected, the
3834                    client import will not have been cleaned. */
3835                 if (obd->u.cli.cl_import) {
3836                         struct obd_import *imp;
3837                         imp = obd->u.cli.cl_import;
3838                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3839                                obd->obd_name);
3840                         ptlrpc_invalidate_import(imp);
3841                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3842                         class_destroy_import(imp);
3843                         obd->u.cli.cl_import = NULL;
3844                 }
3845                 break;
3846         }
3847         case OBD_CLEANUP_SELF_EXP:
3848                 rc = obd_llog_finish(obd, 0);
3849                 if (rc != 0)
3850                         CERROR("failed to cleanup llogging subsystems\n");
3851                 break;
3852         case OBD_CLEANUP_OBD:
3853                 break;
3854         }
3855         RETURN(rc);
3856 }
3857
3858 int osc_cleanup(struct obd_device *obd)
3859 {
3860         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3861         int rc;
3862
3863         ENTRY;
3864         ptlrpc_lprocfs_unregister_obd(obd);
3865         lprocfs_obd_cleanup(obd);
3866
3867         spin_lock(&oscc->oscc_lock);
3868         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3869         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3870         spin_unlock(&oscc->oscc_lock);
3871
3872         /* free memory of osc quota cache */
3873         lquota_cleanup(quota_interface, obd);
3874
3875         cache_destroy(obd->u.cli.cl_cache);
3876         rc = client_obd_cleanup(obd);
3877
3878         ptlrpcd_decref();
3879         RETURN(rc);
3880 }
3881
3882 static int osc_register_page_removal_cb(struct obd_export *exp,
3883                                         obd_page_removal_cb_t func,
3884                                         obd_pin_extent_cb pin_cb)
3885 {
3886         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
3887                                            pin_cb);
3888 }
3889
3890 static int osc_unregister_page_removal_cb(struct obd_export *exp,
3891                                           obd_page_removal_cb_t func)
3892 {
3893         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
3894 }
3895
3896 static int osc_register_lock_cancel_cb(struct obd_export *exp,
3897                                        obd_lock_cancel_cb cb)
3898 {
3899         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
3900
3901         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
3902         return 0;
3903 }
3904
3905 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
3906                                          obd_lock_cancel_cb cb)
3907 {
3908         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
3909                 CERROR("Unregistering cancel cb %p, while only %p was "
3910                        "registered\n", cb,
3911                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
3912                 RETURN(-EINVAL);
3913         }
3914
3915         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
3916         return 0;
3917 }
3918
3919 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3920 {
3921         struct lustre_cfg *lcfg = buf;
3922         struct lprocfs_static_vars lvars = { 0 };
3923         int rc = 0;
3924
3925         lprocfs_osc_init_vars(&lvars);
3926
3927         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
3928         return(rc);
3929 }
3930
/* OBD method table for the OSC.  Lifecycle and connection handling come
 * first, then metadata/object operations, async page I/O, DLM locking,
 * and finally misc/config/callback hooks. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connections (generic client helpers for conn/import management) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* object metadata and create/destroy */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk read/write and the async page machinery */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM extent locking */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* misc, config and llog */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        /* page-cache and lock-cancel callback registration */
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
3982 int __init osc_init(void)
3983 {
3984         struct lprocfs_static_vars lvars = { 0 };
3985         int rc;
3986         ENTRY;
3987
3988         lprocfs_osc_init_vars(&lvars);
3989
3990         request_module("lquota");
3991         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
3992         lquota_init(quota_interface);
3993         init_obd_quota_ops(quota_interface, &osc_obd_ops);
3994
3995         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
3996                                  LUSTRE_OSC_NAME);
3997         if (rc) {
3998                 if (quota_interface)
3999                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4000                 RETURN(rc);
4001         }
4002
4003         RETURN(rc);
4004 }
4005
#ifdef __KERNEL__
/* Module exit: undo osc_init() — shut down the quota interface, drop its
 * symbol reference, then unregister the OSC obd type.
 * NOTE(review): __exit is commented out in the original; presumably so the
 * symbol stays callable from non-exit paths — confirm before restoring. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif