Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  *  For testing and management it is treated as an obd_device,
26  *  although it does not export a full OBD method table (the
27  *  requests are coming in over the wire, so object target modules
28  *  do not have a full method table.)
29  *
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_OSC
36
37 #ifdef __KERNEL__
38 # include <libcfs/libcfs.h>
39 #else /* __KERNEL__ */
40 # include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <libcfs/kp30.h>
45 #include <lustre_net.h>
46 #include <lustre/lustre_user.h>
47 #include <obd_cksum.h>
48 #include <obd_ost.h>
49 #include <obd_lov.h>
50
51 #ifdef  __CYGWIN__
52 # include <ctype.h>
53 #endif
54
55 #include <lustre_ha.h>
56 #include <lprocfs_status.h>
57 #include <lustre_log.h>
58 #include <lustre_debug.h>
59 #include <lustre_param.h>
60 #include <lustre_cache.h>
61 #include "osc_internal.h"
62
63 static quota_interface_t *quota_interface = NULL;
64 extern quota_interface_t osc_quota_interface;
65
66 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (!lmmp)
79                 RETURN(lmm_size);
80
81         if (*lmmp && !lsm) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         }
86
87         if (!*lmmp) {
88                 OBD_ALLOC(*lmmp, lmm_size);
89                 if (!*lmmp)
90                         RETURN(-ENOMEM);
91         }
92
93         if (lsm) {
94                 LASSERT(lsm->lsm_object_id);
95                 LASSERT(lsm->lsm_object_gr);
96                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd() for the single-stripe OSC case:
 *  - lsmp == NULL: size query, returns the in-memory lsm size;
 *  - *lsmp set and lmm == NULL: free the lsm and its oinfo, return 0;
 *  - otherwise allocate *lsmp if needed and fill it from lmm.
 *
 * Returns the lsm size on success, 0 after a free, negative errno on error.
 */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Validate on-disk metadata before trusting any field. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free request: release the oinfo first, then the lsm. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Undo the lsm allocation on partial failure so the
                         * caller sees a clean -ENOMEM with no leak. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
159
160 static inline void osc_pack_capa(struct ptlrpc_request *req,
161                                  struct ost_body *body, void *capa)
162 {
163         struct obd_capa *oc = (struct obd_capa *)capa;
164         struct lustre_capa *c;
165
166         if (!capa)
167                 return;
168
169         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
170         LASSERT(c);
171         capa_cpy(c, oc);
172         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
173         DEBUG_CAPA(D_SEC, c, "pack");
174 }
175
176 static inline void osc_pack_req_body(struct ptlrpc_request *req,
177                                      struct obd_info *oinfo)
178 {
179         struct ost_body *body;
180
181         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
182         LASSERT(body);
183
184         body->oa = *oinfo->oi_oa;
185         osc_pack_capa(req, body, oinfo->oi_capa);
186 }
187
188 static inline void osc_set_capa_size(struct ptlrpc_request *req,
189                                      const struct req_msg_field *field,
190                                      struct obd_capa *oc)
191 {
192         if (oc == NULL)
193                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
194         else
195                 /* it is already calculated as sizeof struct obd_capa */
196                 ;
197 }
198
/* Async-getattr completion: copy the returned attributes back into the
 * caller's obd_info and invoke its up-call.
 *
 * NOTE(review): this interpreter unpacks the reply with the legacy
 * lustre_swab_repbuf() interface, while the sibling interpreters in this
 * file use req_capsule_server_get() — presumably equivalent here, but
 * worth confirming before unifying. */
static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        /* On RPC failure, skip unpacking and report rc to the up-call. */
        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                /* Reply could not be unpacked: invalidate the obdo so the
                 * caller does not consume stale attribute bits. */
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        /* Always run the completion callback, success or failure. */
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
226
227 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
228                              struct ptlrpc_request_set *set)
229 {
230         struct ptlrpc_request *req;
231         struct osc_async_args *aa;
232         int                    rc;
233         ENTRY;
234
235         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
236         if (req == NULL)
237                 RETURN(-ENOMEM);
238
239         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
240         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
241         if (rc) {
242                 ptlrpc_request_free(req);
243                 RETURN(rc);
244         }
245
246         osc_pack_req_body(req, oinfo);
247
248         ptlrpc_request_set_replen(req);
249         req->rq_interpret_reply = osc_getattr_interpret;
250
251         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
252         aa = (struct osc_async_args *)&req->rq_async_args;
253         aa->aa_oi = oinfo;
254
255         ptlrpc_set_add_req(set, req);
256         RETURN(0);
257 }
258
259 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
260 {
261         struct ptlrpc_request *req;
262         struct ost_body       *body;
263         int                    rc;
264         ENTRY;
265
266         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
267         if (req == NULL)
268                 RETURN(-ENOMEM);
269
270         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
271         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
272         if (rc) {
273                 ptlrpc_request_free(req);
274                 RETURN(rc);
275         }
276
277         osc_pack_req_body(req, oinfo);
278
279         ptlrpc_request_set_replen(req);
280  
281         rc = ptlrpc_queue_wait(req);
282         if (rc)
283                 GOTO(out, rc);
284
285         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
286         if (body == NULL)
287                 GOTO(out, rc = -EPROTO);
288
289         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
290         *oinfo->oi_oa = body->oa;
291
292         /* This should really be sent by the OST */
293         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
294         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
295
296         EXIT;
297  out:
298         ptlrpc_req_finished(req);
299         return rc;
300 }
301
302 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
303                        struct obd_trans_info *oti)
304 {
305         struct ptlrpc_request *req;
306         struct ost_body       *body;
307         int                    rc;
308         ENTRY;
309
310         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
311                                         oinfo->oi_oa->o_gr > 0);
312
313         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
314         if (req == NULL)
315                 RETURN(-ENOMEM);
316
317         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
318         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
319         if (rc) {
320                 ptlrpc_request_free(req);
321                 RETURN(rc);
322         }
323
324         osc_pack_req_body(req, oinfo);
325
326         ptlrpc_request_set_replen(req);
327  
328
329         rc = ptlrpc_queue_wait(req);
330         if (rc)
331                 GOTO(out, rc);
332
333         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
334         if (body == NULL)
335                 GOTO(out, rc = -EPROTO);
336
337         *oinfo->oi_oa = body->oa;
338
339         EXIT;
340 out:
341         ptlrpc_req_finished(req);
342         RETURN(rc);
343 }
344
345 static int osc_setattr_interpret(struct ptlrpc_request *req,
346                                  struct osc_async_args *aa, int rc)
347 {
348         struct ost_body *body;
349         ENTRY;
350
351         if (rc != 0)
352                 GOTO(out, rc);
353
354         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
355         if (body == NULL)
356                 GOTO(out, rc = -EPROTO);
357
358         *aa->aa_oi->oi_oa = body->oa;
359 out:
360         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
361         RETURN(rc);
362 }
363
364 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
365                              struct obd_trans_info *oti,
366                              struct ptlrpc_request_set *rqset)
367 {
368         struct ptlrpc_request *req;
369         struct osc_async_args *aa;
370         int                    rc;
371         ENTRY;
372
373         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
374         if (req == NULL)
375                 RETURN(-ENOMEM);
376
377         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
378         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
379         if (rc) {
380                 ptlrpc_request_free(req);
381                 RETURN(rc);
382         }
383
384         osc_pack_req_body(req, oinfo);
385
386         ptlrpc_request_set_replen(req);
387  
388         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
389                 LASSERT(oti);
390                 *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
391         }
392
393         /* do mds to ost setattr asynchronouly */
394         if (!rqset) {
395                 /* Do not wait for response. */
396                 ptlrpcd_add_req(req);
397         } else {
398                 req->rq_interpret_reply = osc_setattr_interpret;
399
400                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
401                 aa = (struct osc_async_args *)&req->rq_async_args;
402                 aa->aa_oi = oinfo;
403
404                 ptlrpc_set_add_req(rqset, req);
405         }
406
407         RETURN(0);
408 }
409
/* Create an object on the OST.
 *
 * @oa  in/out: attributes for the new object; on success the server-assigned
 *      id/group and attributes are copied back.
 * @ea  in/out: stripe metadata.  If *ea is NULL an lsm is allocated here and
 *      returned through *ea on success (freed again on failure).
 * @oti optional transaction info; receives the reply transno and, when the
 *      server returns a llog cookie, a copy of that cookie.
 *
 * Returns 0 on success, negative errno on failure.
 */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No caller-supplied lsm: allocate one locally; it is freed
                 * at "out:" if the create fails. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                /* OBD_MD_FLINLINE is only used for the delete-orphan pass
                 * during MDS/OST recovery here. */
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        /* Lazily allocate cookie storage on first use. */
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the lsm only if we allocated it here (caller passed none). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
495
496 static int osc_punch_interpret(struct ptlrpc_request *req,
497                                struct osc_async_args *aa, int rc)
498 {
499         struct ost_body *body;
500         ENTRY;
501
502         if (rc != 0)
503                 GOTO(out, rc);
504
505         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
506         if (body == NULL)
507                 GOTO(out, rc = -EPROTO);
508
509         *aa->aa_oi->oi_oa = body->oa;
510 out:
511         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
512         RETURN(rc);
513 }
514
515 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
516                      struct obd_trans_info *oti,
517                      struct ptlrpc_request_set *rqset)
518 {
519         struct ptlrpc_request *req;
520         struct osc_async_args *aa;
521         struct ost_body       *body;
522         int                    rc;
523         ENTRY;
524
525         if (!oinfo->oi_oa) {
526                 CDEBUG(D_INFO, "oa NULL\n");
527                 RETURN(-EINVAL);
528         }
529
530         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
531         if (req == NULL)
532                 RETURN(-ENOMEM);
533
534         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
535         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536         if (rc) {
537                 ptlrpc_request_free(req);
538                 RETURN(rc);
539         }
540         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
541         osc_pack_req_body(req, oinfo);
542
543         /* overload the size and blocks fields in the oa with start/end */
544         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
545         LASSERT(body);
546         body->oa.o_size = oinfo->oi_policy.l_extent.start;
547         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
548         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
549         ptlrpc_request_set_replen(req);
550
551
552         req->rq_interpret_reply = osc_punch_interpret;
553         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
554         aa = (struct osc_async_args *)&req->rq_async_args;
555         aa->aa_oi = oinfo;
556         ptlrpc_set_add_req(rqset, req);
557
558         RETURN(0);
559 }
560
561 static int osc_sync(struct obd_export *exp, struct obdo *oa,
562                     struct lov_stripe_md *md, obd_size start, obd_size end,
563                     void *capa)
564 {
565         struct ptlrpc_request *req;
566         struct ost_body       *body;
567         int                    rc;
568         ENTRY;
569
570         if (!oa) {
571                 CDEBUG(D_INFO, "oa NULL\n");
572                 RETURN(-EINVAL);
573         }
574
575         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
576         if (req == NULL)
577                 RETURN(-ENOMEM);
578
579         osc_set_capa_size(req, &RMF_CAPA1, capa);
580         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
581         if (rc) {
582                 ptlrpc_request_free(req);
583                 RETURN(rc);
584         }
585
586         /* overload the size and blocks fields in the oa with start/end */
587         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
588         LASSERT(body);
589         body->oa = *oa;
590         body->oa.o_size = start;
591         body->oa.o_blocks = end;
592         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
593         osc_pack_capa(req, body, capa);
594
595         ptlrpc_request_set_replen(req);
596
597         rc = ptlrpc_queue_wait(req);
598         if (rc)
599                 GOTO(out, rc);
600
601         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
602         if (body == NULL)
603                 GOTO(out, rc = -EPROTO);
604
605         *oa = body->oa;
606
607         EXIT;
608  out:
609         ptlrpc_req_finished(req);
610         return rc;
611 }
612
613 /* Find and cancel locally locks matched by @mode in the resource found by
614  * @objid. Found locks are added into @cancel list. Returns the amount of
615  * locks added to @cancels list. */
616 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
617                                    struct list_head *cancels, ldlm_mode_t mode,
618                                    int lock_flags)
619 {
620         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
621         struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
622         struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
623         int count;
624         ENTRY;
625
626         if (res == NULL)
627                 RETURN(0);
628
629         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
630                                            lock_flags, 0, NULL);
631         ldlm_resource_putref(res);
632         RETURN(count);
633 }
634
635 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
636                                  int rc)
637 {
638         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
639
640         atomic_dec(&cli->cl_destroy_in_flight);
641         cfs_waitq_signal(&cli->cl_destroy_waitq);
642         return 0;
643 }
644
/* Try to reserve a slot for one more destroy RPC.
 *
 * Returns 1 if the caller may send (the slot stays reserved and is released
 * by osc_destroy_interpret()), 0 if it must wait.
 *
 * The inc-then-maybe-dec dance handles racing callers without a lock: if
 * the optimistic increment overshoots the limit we back it out, and if the
 * decrement observes that the count dropped below the limit in the meantime
 * (another RPC completed between our two atomics), we re-signal the waitq
 * so no waiter is left sleeping on a free slot. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
662
663 /* Destroy requests can be async always on the client, and we don't even really
664  * care about the return code since the client cannot do anything at all about
665  * a destroy failure.
666  * When the MDS is unlinking a filename, it saves the file objects into a
667  * recovery llog, and these object records are cancelled when the OST reports
668  * they were destroyed and sync'd to disk (i.e. transaction committed).
669  * If the client dies, or the OST is down when the object should be destroyed,
670  * the records are not cancelled, and when the OST reconnects to the MDS next,
671  * it will retrieve the llog unlink logs and then sends the log cancellation
672  * cookies to the MDS after committing destroy transactions. */
/* Destroy an object on the OST (always asynchronous on the client).
 *
 * Before sending, matching local PW locks are cancelled (with
 * LDLM_FL_DISCARD_DATA) and piggy-backed on the request via early lock
 * cancellation.  The number of concurrent destroy RPCs is throttled to
 * cl_max_rpcs_in_flight via osc_can_send_destroy().
 *
 * Returns 0 once the request is queued to ptlrpcd (no reply is awaited),
 * or a negative errno on setup failure. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel conflicting local locks; the data is going away anyway. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Return the collected locks; they were never packed. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;

        /* Copy the llog cookie into the obdo before it is packed below. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
732
/* Report this client's dirty-cache and grant state to the server inside
 * an outgoing obdo (piggy-backed on a BRW request).
 *
 * Sets o_dirty/o_undirty/o_grant/o_dropped under the loi list lock.
 * o_undirty — how much more the client would like to dirty — is forced to
 * zero whenever the accounting looks inconsistent (over the per-OSC or
 * system-wide dirty limit, or an implausibly large headroom), so a broken
 * counter never requests more grant. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* The caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Ask for at most what a full pipeline of RPCs could carry. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        /* Lost grant is reported once and then reset. */
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
767
768 /* caller must hold loi_list_lock */
769 static void osc_consume_write_grant(struct client_obd *cli,
770                                     struct brw_page *pga)
771 {
772         atomic_inc(&obd_dirty_pages);
773         cli->cl_dirty += CFS_PAGE_SIZE;
774         cli->cl_avail_grant -= CFS_PAGE_SIZE;
775         pga->flag |= OBD_BRW_FROM_GRANT;
776         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
777                CFS_PAGE_SIZE, pga, pga->pg);
778         LASSERT(cli->cl_avail_grant >= 0);
779 }
780
781 /* the companion to osc_consume_write_grant, called when a brw has completed.
782  * must be called with the loi lock held. */
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent: non-zero if the page actually went to the OST.  Grant for pages
 * that were never sent is recorded in cl_lost_grant so the server can be
 * told about it (see osc_announce_cached). */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4k when the server never reported a block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Pages not charged against grant have nothing to release. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                /* The whole page's grant went unused. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                /* Only the block-rounded portion was really consumed. */
                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
819
820 static unsigned long rpcs_in_flight(struct client_obd *cli)
821 {
822         return cli->cl_r_in_flight + cli->cl_w_in_flight;
823 }
824
825 /* caller must hold loi_list_lock */
/* caller must hold loi_list_lock
 *
 * Wake writers waiting for cache/grant space.  Each waiter is either given
 * a page of write grant (osc_consume_write_grant) or, when no grant can be
 * expected, told to fall back to sync IO via ocw_rc = -EDQUOT.  The scan
 * stops early while the dirty limits are exhausted or while in-flight
 * writes may still return grant. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                /* Remove from the wait list before signalling. */
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
866
/* Publish the initial grant the server handed us at connect time
 * (ocd_grant from the connect reply) as this client's available grant.
 * Takes cl_loi_list_lock itself, so callers must not already hold it. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* a negative grant from the server would corrupt later accounting */
        LASSERT(cli->cl_avail_grant >= 0);
}
877
878 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
879 {
880         client_obd_list_lock(&cli->cl_loi_list_lock);
881         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
882         if (body->oa.o_valid & OBD_MD_FLGRANT)
883                 cli->cl_avail_grant += body->oa.o_grant;
884         /* waiters are woken in brw_interpret */
885         client_obd_list_unlock(&cli->cl_loi_list_lock);
886 }
887
888 /* We assume that the reason this OSC got a short read is because it read
889  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
890  * via the LOV, and it _knows_ it's reading inside the file, it's just that
891  * this stripe never got written at or beyond this stripe offset yet. */
892 static void handle_short_read(int nob_read, obd_count page_count,
893                               struct brw_page **pga)
894 {
895         char *ptr;
896         int i = 0;
897
898         /* skip bytes read OK */
899         while (nob_read > 0) {
900                 LASSERT (page_count > 0);
901
902                 if (pga[i]->count > nob_read) {
903                         /* EOF inside this page */
904                         ptr = cfs_kmap(pga[i]->pg) +
905                                 (pga[i]->off & ~CFS_PAGE_MASK);
906                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
907                         cfs_kunmap(pga[i]->pg);
908                         page_count--;
909                         i++;
910                         break;
911                 }
912
913                 nob_read -= pga[i]->count;
914                 page_count--;
915                 i++;
916         }
917
918         /* zero remaining pages */
919         while (page_count-- > 0) {
920                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
921                 memset(ptr, 0, pga[i]->count);
922                 cfs_kunmap(pga[i]->pg);
923                 i++;
924         }
925 }
926
927 static int check_write_rcs(struct ptlrpc_request *req,
928                            int requested_nob, int niocount,
929                            obd_count page_count, struct brw_page **pga)
930 {
931         int    *remote_rcs, i;
932
933         /* return error if any niobuf was in error */
934         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
935                                         sizeof(*remote_rcs) * niocount, NULL);
936         if (remote_rcs == NULL) {
937                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
938                 return(-EPROTO);
939         }
940         if (lustre_msg_swabbed(req->rq_repmsg))
941                 for (i = 0; i < niocount; i++)
942                         __swab32s(&remote_rcs[i]);
943
944         for (i = 0; i < niocount; i++) {
945                 if (remote_rcs[i] < 0)
946                         return(remote_rcs[i]);
947
948                 if (remote_rcs[i] != 0) {
949                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
950                                 i, remote_rcs[i], req);
951                         return(-EPROTO);
952                 }
953         }
954
955         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
956                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
957                        requested_nob, req->rq_bulk->bd_nob_transferred);
958                 return(-EPROTO);
959         }
960
961         return (0);
962 }
963
964 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
965 {
966         if (p1->flag != p2->flag) {
967                 unsigned mask = ~OBD_BRW_FROM_GRANT;
968
969                 /* warn if we try to combine flags that we don't know to be
970                  * safe to combine */
971                 if ((p1->flag & mask) != (p2->flag & mask))
972                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
973                                "same brw?\n", p1->flag, p2->flag);
974                 return 0;
975         }
976
977         return (p1->off + p1->count == p2->off);
978 }
979
/* Compute the bulk checksum over the first 'nob' bytes spread across the
 * pages in 'pga', using the algorithm selected by 'cksum_type'.
 *
 * Two fault-injection hooks are honoured:
 *  - OBD_FAIL_OSC_CHECKSUM_RECEIVE (reads): corrupt the data itself before
 *    checksumming, simulating an OST->client wire error;
 *  - OBD_FAIL_OSC_CHECKSUM_SEND (writes): perturb only the returned
 *    checksum, so the page data stays correct for any later resend. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp so the final page contributes only the bytes that
                 * are actually part of the transfer */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* the full page count is subtracted (not the clamped one);
                 * that is fine because the loop exits once nob <= 0 */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1015
/* Build (but do not send) a BRW read or write request for the pages in
 * 'pga': allocate the ptlrpc request (from the import's emergency pool for
 * writes), pack body/ioobj/niobuf buffers, attach every page to a bulk
 * descriptor, merge contiguous pages into shared niobufs, and optionally
 * checksum the outgoing data.  On success *reqp owns the new request and
 * its async-args area is primed for brw_interpret.  'pga' must be sorted
 * by ascending offset (asserted below). */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga, 
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes draw from the pre-allocated pool so dirty pages can
                 * still be flushed under memory pressure */
                req = ptlrpc_request_alloc_pool(cli->cl_import, 
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        /* count how many niobufs we need: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                struct brw_page *pg_prev = pga[i - 1];

                LASSERT(pg->count > 0);
                /* each brw_page must stay within a single page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either all pages carry OBD_BRW_SRVLOCK or none do */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
        }

        /* sanity: we filled exactly the niocount entries we sized above */
        LASSERT((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)));
        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* only checksum here when sptlrpc isn't already hashing bulk */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* reads: ask server to checksum so we can verify on arrival */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* prime async-args for brw_interpret / osc_brw_fini_request */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1192
/* Compare the checksum the server computed for a bulk write against the one
 * the client sent.  Returns 0 when they match.  On mismatch the local pages
 * are re-checksummed to guess where the corruption happened (client-side
 * after send, in transit, or both), a console error is logged, and 1 is
 * returned so the caller can resend. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* use the checksum type the server reported; fall back to CRC32
         * when the reply carried no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        /* re-checksum the pages as they are now to localise the corruption */
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;        
}
1243
1244 /* Note rc enters this function as number of bytes transferred */
1245 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1246 {
1247         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1248         const lnet_process_id_t *peer =
1249                         &req->rq_import->imp_connection->c_peer;
1250         struct client_obd *cli = aa->aa_cli;
1251         struct ost_body *body;
1252         __u32 client_cksum = 0;
1253         ENTRY;
1254
1255         if (rc < 0 && rc != -EDQUOT)
1256                 RETURN(rc);
1257
1258         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1259         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1260                                   lustre_swab_ost_body);
1261         if (body == NULL) {
1262                 CDEBUG(D_INFO, "Can't unpack body\n");
1263                 RETURN(-EPROTO);
1264         }
1265
1266         /* set/clear over quota flag for a uid/gid */
1267         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1268             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1269                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1270                              body->oa.o_gid, body->oa.o_valid,
1271                              body->oa.o_flags);
1272
1273         if (rc < 0)
1274                 RETURN(rc);
1275
1276         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1277                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1278
1279         osc_update_grant(cli, body);
1280
1281         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1282                 if (rc > 0) {
1283                         CERROR("Unexpected +ve rc %d\n", rc);
1284                         RETURN(-EPROTO);
1285                 }
1286                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1287
1288                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1289                     check_write_checksum(&body->oa, peer, client_cksum,
1290                                          body->oa.o_cksum, aa->aa_requested_nob,
1291                                          aa->aa_page_count, aa->aa_ppga,
1292                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1293                         RETURN(-EAGAIN);
1294
1295                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1296                         RETURN(-EAGAIN);
1297
1298                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1299                                      aa->aa_page_count, aa->aa_ppga);
1300                 GOTO(out, rc);
1301         }
1302
1303         /* The rest of this function executes only for OST_READs */
1304         if (rc > aa->aa_requested_nob) {
1305                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1306                        aa->aa_requested_nob);
1307                 RETURN(-EPROTO);
1308         }
1309
1310         if (rc != req->rq_bulk->bd_nob_transferred) {
1311                 CERROR ("Unexpected rc %d (%d transferred)\n",
1312                         rc, req->rq_bulk->bd_nob_transferred);
1313                 return (-EPROTO);
1314         }
1315
1316         if (rc < aa->aa_requested_nob)
1317                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1318
1319         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1320                                          aa->aa_ppga))
1321                 GOTO(out, rc = -EAGAIN);
1322
1323         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1324                 static int cksum_counter;
1325                 __u32      server_cksum = body->oa.o_cksum;
1326                 char      *via;
1327                 char      *router;
1328                 cksum_type_t cksum_type;
1329
1330                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1331                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1332                 else
1333                         cksum_type = OBD_CKSUM_CRC32;
1334                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1335                                                  aa->aa_ppga, OST_READ,
1336                                                  cksum_type);
1337
1338                 if (peer->nid == req->rq_bulk->bd_sender) {
1339                         via = router = "";
1340                 } else {
1341                         via = " via ";
1342                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1343                 }
1344
1345                 if (server_cksum == ~0 && rc > 0) {
1346                         CERROR("Protocol error: server %s set the 'checksum' "
1347                                "bit, but didn't send a checksum.  Not fatal, "
1348                                "but please tell CFS.\n",
1349                                libcfs_nid2str(peer->nid));
1350                 } else if (server_cksum != client_cksum) {
1351                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1352                                            "%s%s%s inum "LPU64"/"LPU64" object "
1353                                            LPU64"/"LPU64" extent "
1354                                            "["LPU64"-"LPU64"]\n",
1355                                            req->rq_import->imp_obd->obd_name,
1356                                            libcfs_nid2str(peer->nid),
1357                                            via, router,
1358                                            body->oa.o_valid & OBD_MD_FLFID ?
1359                                                 body->oa.o_fid : (__u64)0,
1360                                            body->oa.o_valid & OBD_MD_FLFID ?
1361                                                 body->oa.o_generation :(__u64)0,
1362                                            body->oa.o_id,
1363                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1364                                                 body->oa.o_gr : (__u64)0,
1365                                            aa->aa_ppga[0]->off,
1366                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1367                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1368                                                                         1);
1369                         CERROR("client %x, server %x, cksum_type %x\n",
1370                                client_cksum, server_cksum, cksum_type);
1371                         cksum_counter = 0;
1372                         aa->aa_oa->o_cksum = client_cksum;
1373                         rc = -EAGAIN;
1374                 } else {
1375                         cksum_counter++;
1376                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1377                         rc = 0;
1378                 }
1379         } else if (unlikely(client_cksum)) {
1380                 static int cksum_missed;
1381
1382                 cksum_missed++;
1383                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1384                         CERROR("Checksum %u requested from %s but not sent\n",
1385                                cksum_missed, libcfs_nid2str(peer->nid));
1386         } else {
1387                 rc = 0;
1388         }
1389 out:
1390         if (rc >= 0)
1391                 *aa->aa_oa = body->oa;
1392
1393         RETURN(rc);
1394 }
1395
/* Issue a single synchronous BRW (read or write) and wait for it to finish.
 * Bulk timeouts with rq_resend set are retried immediately; other
 * recoverable errors (per osc_recoverable_error) are retried after sleeping
 * 'resends' seconds, until osc_should_resend says to give up (-EIO). */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        /* private waitqueue nobody signals: used purely as a sleep below */
        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* linear backoff: sleep 'resends' seconds before retrying */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }
        
        RETURN (rc);
}
1443
/* Rebuild and requeue an async BRW after a recoverable error.  A fresh
 * request is prepared from the old one's async-args; the oap list and page
 * array are moved (not copied) onto the new request, each oap's request
 * reference is switched over, and the new request joins the old one's set.
 * Returns 0 on success, -EIO when resend limit is hit, -EINTR if any oap
 * was interrupted while we held cl_loi_list_lock. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga, 
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out before the handover if any oap was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* drop each oap's ref on the old request, take one on the new */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work 
         * in check_set context. only one way exist with access to request 
         * from different thread got -EINTR - this way protected with 
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1520
1521 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1522                           struct lov_stripe_md *lsm, obd_count page_count,
1523                           struct brw_page **pga, struct ptlrpc_request_set *set,
1524                           struct obd_capa *ocapa)
1525 {
1526         struct ptlrpc_request     *req;
1527         struct client_obd         *cli = &exp->exp_obd->u.cli;
1528         int                        rc, i;
1529         struct osc_brw_async_args *aa;
1530         ENTRY;
1531
1532         /* Consume write credits even if doing a sync write -
1533          * otherwise we may run out of space on OST due to grant. */
1534         if (cmd == OBD_BRW_WRITE) {
1535                 spin_lock(&cli->cl_loi_list_lock);
1536                 for (i = 0; i < page_count; i++) {
1537                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1538                                 osc_consume_write_grant(cli, pga[i]);
1539                 }
1540                 spin_unlock(&cli->cl_loi_list_lock);
1541         }
1542
1543         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1544                                   &req, ocapa);
1545
1546         aa = (struct osc_brw_async_args *)&req->rq_async_args;
1547         if (cmd == OBD_BRW_READ) {
1548                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1549                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1550                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
1551         } else {
1552                  lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1553                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1554                                  cli->cl_w_in_flight);
1555                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
1556         }
1557
1558         LASSERT(list_empty(&aa->aa_oaps));
1559         if (rc == 0) {
1560                 req->rq_interpret_reply = brw_interpret;
1561                 ptlrpc_set_add_req(set, req);
1562                 client_obd_list_lock(&cli->cl_loi_list_lock);
1563                 if (cmd == OBD_BRW_READ)
1564                         cli->cl_r_in_flight++;
1565                 else
1566                         cli->cl_w_in_flight++;
1567                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1568                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1569         } else if (cmd == OBD_BRW_WRITE) {
1570                 client_obd_list_lock(&cli->cl_loi_list_lock);
1571                 for (i = 0; i < page_count; i++)
1572                         osc_release_write_grant(cli, pga[i], 0);
1573                 osc_wake_cache_waiters(cli);
1574                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1575         }
1576         RETURN (rc);
1577 }
1578
1579 /*
1580  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1581  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1582  * fine for our small page arrays and doesn't require allocation.  its an
1583  * insertion sort that swaps elements that are strides apart, shrinking the
1584  * stride down until its '1' and the array is sorted.
1585  */
1586 static void sort_brw_pages(struct brw_page **array, int num)
1587 {
1588         int stride, i, j;
1589         struct brw_page *tmp;
1590
1591         if (num == 1)
1592                 return;
1593         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1594                 ;
1595
1596         do {
1597                 stride /= 3;
1598                 for (i = stride ; i < num ; i++) {
1599                         tmp = array[i];
1600                         j = i;
1601                         while (j >= stride && array[j - stride]->off > tmp->off) {
1602                                 array[j] = array[j - stride];
1603                                 j -= stride;
1604                         }
1605                         array[j] = tmp;
1606                 }
1607         } while (stride > 1);
1608 }
1609
1610 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1611 {
1612         int count = 1;
1613         int offset;
1614         int i = 0;
1615
1616         LASSERT (pages > 0);
1617         offset = pg[i]->off & ~CFS_PAGE_MASK;
1618
1619         for (;;) {
1620                 pages--;
1621                 if (pages == 0)         /* that's all */
1622                         return count;
1623
1624                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1625                         return count;   /* doesn't end on page boundary */
1626
1627                 i++;
1628                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1629                 if (offset != 0)        /* doesn't start on page boundary */
1630                         return count;
1631
1632                 count++;
1633         }
1634 }
1635
1636 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1637 {
1638         struct brw_page **ppga;
1639         int i;
1640
1641         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1642         if (ppga == NULL)
1643                 return NULL;
1644
1645         for (i = 0; i < count; i++)
1646                 ppga[i] = pga + i;
1647         return ppga;
1648 }
1649
/* Free a page-pointer array built by osc_build_ppga().  @count must be
 * the same count the array was originally allocated with. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1655
/* Synchronous bulk read/write entry point.  Builds a sorted page-pointer
 * array over @pga and issues it as one or more osc_brw_internal() RPCs,
 * each capped at cl_max_pages_per_rpc pages and further split wherever
 * the page run would be fragmented (max_unfragmented_pages()).
 *
 * With OBD_BRW_CHECK in @cmd the caller only asks whether I/O has a
 * chance of succeeding; nothing is transferred.
 *
 * Returns 0 on success or a negative errno. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk so the transfer is unfragmented */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        /* release the full original array (ppga was advanced above) */
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1726
/* Asynchronous counterpart of osc_brw(): queue the I/O described by @pga
 * on @set via async_internal() instead of waiting for completion.
 *
 * Ownership of the page-pointer array is subtle: when a single RPC
 * covers everything the sorted array itself is handed to
 * async_internal() (which then frees it via brw_interpret()); otherwise
 * a fresh copy is made per RPC.  On error the un-handed-off array is
 * released here.
 *
 * Returns 0 on success or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* a per-RPC copy was never handed off; free it */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1794
1795 static void osc_check_rpcs(struct client_obd *cli);
1796
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is forwarded to osc_release_write_grant() to indicate whether the
 * page actually went over the wire. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1805
1806
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns 1 when the lop should fire an RPC now, 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1855
/* Reconcile @item's list membership with @should_be_on: link it onto
 * @list when it should be a member but isn't, unlink it when it is a
 * member but shouldn't be, and do nothing otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1864
1865 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1866  * can find pages to build into rpcs quickly */
1867 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1868 {
1869         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1870                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1871                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1872
1873         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1874                 loi->loi_write_lop.lop_num_pending);
1875
1876         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1877                 loi->loi_read_lop.lop_num_pending);
1878 }
1879
/* Adjust the pending-page counters by @delta (positive when queueing,
 * negative when dequeueing): both the per-lop count and the matching
 * client-wide read or write total, selected by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1889
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* the loi list lock covers oap state, list membership and
         * oap_request */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* still queued: pull it out of book-keeping and complete
                 * the group I/O slot with -EINTR right away */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1935
1936 /* this is trying to propogate async writeback errors back up to the
1937  * application.  As an async write fails we record the error code for later if
1938  * the app does an fsync.  As long as errors persist we force future rpcs to be
1939  * sync so that the app can get a sync error and break the cycle of queueing
1940  * pages for which writeback will fail. */
1941 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1942                            int rc)
1943 {
1944         if (rc) {
1945                 if (!ar->ar_rc)
1946                         ar->ar_rc = rc;
1947
1948                 ar->ar_force_sync = 1;
1949                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1950                 return;
1951
1952         }
1953
1954         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1955                 ar->ar_force_sync = 0;
1956 }
1957
1958 static void osc_oap_to_pending(struct osc_async_page *oap)
1959 {
1960         struct loi_oap_pages *lop;
1961
1962         if (oap->oap_cmd & OBD_BRW_WRITE)
1963                 lop = &oap->oap_loi->loi_write_lop;
1964         else
1965                 lop = &oap->oap_loi->loi_read_lop;
1966
1967         if (oap->oap_async_flags & ASYNC_URGENT)
1968                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1969         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1970         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1971 }
1972
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 *
 * Completes one async page: drops its request reference, records async
 * write errors (osc_process_ar), copies fresh size/time attributes from
 * @oa into the loi's lvb, and either completes the group-I/O slot or
 * calls the caller's ap_completion hook.  @sent says whether the page
 * actually hit the wire; @rc is the I/O result. */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* remember the xid for the error bookkeeping below */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* track write errors both client-wide and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* sync group I/O path: leave cache accounting and signal
                 * the group; no ap_completion callback in this case */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2027
/* Interpret callback for async brw requests queued by async_internal()
 * or osc_send_oap_rpc().  Finishes the request, retries recoverable
 * errors via osc_brw_redo_request(), then under the loi list lock drops
 * the in-flight count, completes the attached oaps (or releases grant
 * for the oap-less async_internal() case), wakes cache waiters and
 * kicks off any newly-ready RPCs. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                /* a redone request takes over; nothing more to do here */
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* the ppga array handed to us by the submitter is freed here */
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2075
/* Build a brw ptlrpc request from the oaps on @rpc_list.  On success the
 * oaps are spliced into the request's async args (aa_oaps) and
 * @rpc_list is left empty; the allocated obdo and pga array then belong
 * to the request and are freed by brw_interpret().  On failure an
 * ERR_PTR is returned and the oaps stay on @rpc_list for the caller to
 * unwind. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* point the pga entries at each oap's brw_page, filling in the
         * absolute file offset as we go.  all oaps share the same caller
         * ops/data, so remember the first */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct osc_brw_async_args *)&req->rq_async_args;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        /* on the error paths oa/pga were not handed to the request */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2151
/* the loi lock is held across this function but it's allowed to release
 * and reacquire it during its work */
/**
 * Prepare pages for ASYNC I/O and put them in the send queue.
 *
 * \param cli - client obd the RPC is issued for
 * \param loi - object the pending pages belong to
 * \param cmd - OBD_BRW_* macros
 * \param lop - pending pages
 *
 * \return zero if pages were successfully added to the send queue.
 * \return non-zero if an error occurred.
 */
2165 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2166                             int cmd, struct loi_oap_pages *lop)
2167 {
2168         struct ptlrpc_request *req;
2169         obd_count page_count = 0;
2170         struct osc_async_page *oap = NULL, *tmp;
2171         struct osc_brw_async_args *aa;
2172         struct obd_async_page_ops *ops;
2173         CFS_LIST_HEAD(rpc_list);
2174         unsigned int ending_offset;
2175         unsigned  starting_offset = 0;
2176         int srvlock = 0;
2177         ENTRY;
2178
2179         /* first we find the pages we're allowed to work with */
2180         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2181                                  oap_pending_item) {
2182                 ops = oap->oap_caller_ops;
2183
2184                 LASSERT(oap->oap_magic == OAP_MAGIC);
2185
2186                 if (page_count != 0 &&
2187                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2188                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2189                                " oap %p, page %p, srvlock %u\n",
2190                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2191                         break;
2192                 }
2193                 /* in llite being 'ready' equates to the page being locked
2194                  * until completion unlocks it.  commit_write submits a page
2195                  * as not ready because its unlock will happen unconditionally
2196                  * as the call returns.  if we race with commit_write giving
2197                  * us that page we dont' want to create a hole in the page
2198                  * stream, so we stop and leave the rpc to be fired by
2199                  * another dirtier or kupdated interval (the not ready page
2200                  * will still be on the dirty list).  we could call in
2201                  * at the end of ll_file_write to process the queue again. */
2202                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2203                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2204                         if (rc < 0)
2205                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2206                                                 "instead of ready\n", oap,
2207                                                 oap->oap_page, rc);
2208                         switch (rc) {
2209                         case -EAGAIN:
2210                                 /* llite is telling us that the page is still
2211                                  * in commit_write and that we should try
2212                                  * and put it in an rpc again later.  we
2213                                  * break out of the loop so we don't create
2214                                  * a hole in the sequence of pages in the rpc
2215                                  * stream.*/
2216                                 oap = NULL;
2217                                 break;
2218                         case -EINTR:
2219                                 /* the io isn't needed.. tell the checks
2220                                  * below to complete the rpc with EINTR */
2221                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2222                                 oap->oap_count = -EINTR;
2223                                 break;
2224                         case 0:
2225                                 oap->oap_async_flags |= ASYNC_READY;
2226                                 break;
2227                         default:
2228                                 LASSERTF(0, "oap %p page %p returned %d "
2229                                             "from make_ready\n", oap,
2230                                             oap->oap_page, rc);
2231                                 break;
2232                         }
2233                 }
2234                 if (oap == NULL)
2235                         break;
2236                 /*
2237                  * Page submitted for IO has to be locked. Either by
2238                  * ->ap_make_ready() or by higher layers.
2239                  */
2240 #if defined(__KERNEL__) && defined(__linux__)
2241                  if(!(PageLocked(oap->oap_page) &&
2242                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2243                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2244                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2245                         LBUG();
2246                 }
2247 #endif
2248                 /* If there is a gap at the start of this page, it can't merge
2249                  * with any previous page, so we'll hand the network a
2250                  * "fragmented" page array that it can't transfer in 1 RDMA */
2251                 if (page_count != 0 && oap->oap_page_off != 0)
2252                         break;
2253
2254                 /* take the page out of our book-keeping */
2255                 list_del_init(&oap->oap_pending_item);
2256                 lop_update_pending(cli, lop, cmd, -1);
2257                 list_del_init(&oap->oap_urgent_item);
2258
2259                 if (page_count == 0)
2260                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2261                                           (PTLRPC_MAX_BRW_SIZE - 1);
2262
2263                 /* ask the caller for the size of the io as the rpc leaves. */
2264                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2265                         oap->oap_count =
2266                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2267                 if (oap->oap_count <= 0) {
2268                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2269                                oap->oap_count);
2270                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2271                         continue;
2272                 }
2273
2274                 /* now put the page back in our accounting */
2275                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2276                 if (page_count == 0)
2277                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2278                 if (++page_count >= cli->cl_max_pages_per_rpc)
2279                         break;
2280
2281                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2282                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2283                  * have the same alignment as the initial writes that allocated
2284                  * extents on the server. */
2285                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2286                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2287                 if (ending_offset == 0)
2288                         break;
2289
2290                 /* If there is a gap at the end of this page, it can't merge
2291                  * with any subsequent pages, so we'll hand the network a
2292                  * "fragmented" page array that it can't transfer in 1 RDMA */
2293                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2294                         break;
2295         }
2296
2297         osc_wake_cache_waiters(cli);
2298
2299         if (page_count == 0)
2300                 RETURN(0);
2301
2302         loi_list_maint(cli, loi);
2303
2304         client_obd_list_unlock(&cli->cl_loi_list_lock);
2305
2306         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2307         if (IS_ERR(req)) {
2308                 /* this should happen rarely and is pretty bad, it makes the
2309                  * pending list not follow the dirty order */
2310                 client_obd_list_lock(&cli->cl_loi_list_lock);
2311                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2312                         list_del_init(&oap->oap_rpc_item);
2313
2314                         /* queued sync pages can be torn down while the pages
2315                          * were between the pending list and the rpc */
2316                         if (oap->oap_interrupted) {
2317                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2318                                 osc_ap_completion(cli, NULL, oap, 0,
2319                                                   oap->oap_count);
2320                                 continue;
2321                         }
2322                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2323                 }
2324                 loi_list_maint(cli, loi);
2325                 RETURN(PTR_ERR(req));
2326         }
2327
2328         aa = (struct osc_brw_async_args *)&req->rq_async_args;
2329
2330         if (cmd == OBD_BRW_READ) {
2331                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2332                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2333                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2334                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2335                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2336         } else {
2337                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2338                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2339                                  cli->cl_w_in_flight);
2340                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2341                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2342                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2343         }
2344
2345         client_obd_list_lock(&cli->cl_loi_list_lock);
2346
2347         if (cmd == OBD_BRW_READ)
2348                 cli->cl_r_in_flight++;
2349         else
2350                 cli->cl_w_in_flight++;
2351
2352         /* queued sync pages can be torn down while the pages
2353          * were between the pending list and the rpc */
2354         tmp = NULL;
2355         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2356                 /* only one oap gets a request reference */
2357                 if (tmp == NULL)
2358                         tmp = oap;
2359                 if (oap->oap_interrupted && !req->rq_intr) {
2360                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2361                                oap, req);
2362                         ptlrpc_mark_interrupted(req);
2363                 }
2364         }
2365         if (tmp != NULL)
2366                 tmp->oap_request = ptlrpc_request_addref(req);
2367
2368         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2369                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2370
2371         req->rq_interpret_reply = brw_interpret;
2372         ptlrpcd_add_req(req);
2373         RETURN(1);
2374 }
2375
/* Log the rpc-readiness state of a lov_oinfo: whether it sits on the
 * client's ready list, plus the pending count and urgent-list state of
 * its write and read queues.  STR/args are appended printf-style.
 * Note: the trailing backslash that used to follow the last line made
 * the macro swallow the next source line; it has been removed. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2385 /* This is called by osc_check_rpcs() to find which objects have pages that
2386  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2387 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2388 {
2389         ENTRY;
2390         /* first return all objects which we already know to have
2391          * pages ready to be stuffed into rpcs */
2392         if (!list_empty(&cli->cl_loi_ready_list))
2393                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2394                                   struct lov_oinfo, loi_cli_item));
2395
2396         /* then if we have cache waiters, return all objects with queued
2397          * writes.  This is especially important when many small files
2398          * have filled up the cache and not been fired into rpcs because
2399          * they don't pass the nr_pending/object threshhold */
2400         if (!list_empty(&cli->cl_cache_waiters) &&
2401             !list_empty(&cli->cl_loi_write_list))
2402                 RETURN(list_entry(cli->cl_loi_write_list.next,
2403                                   struct lov_oinfo, loi_write_item));
2404
2405         /* then return all queued objects when we have an invalid import
2406          * so that they get flushed */
2407         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2408                 if (!list_empty(&cli->cl_loi_write_list))
2409                         RETURN(list_entry(cli->cl_loi_write_list.next,
2410                                           struct lov_oinfo, loi_write_item));
2411                 if (!list_empty(&cli->cl_loi_read_list))
2412                         RETURN(list_entry(cli->cl_loi_read_list.next,
2413                                           struct lov_oinfo, loi_read_item));
2414         }
2415         RETURN(NULL);
2416 }
2417
/* Called with the loi list lock held.  Walks the objects returned by
 * osc_next_loi(), firing write then read rpcs for each, until the
 * rpcs-in-flight cap is reached, an error occurs, or no object has
 * ready work left. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        /* race_counter tracks consecutive "backed off" (rc == 0) sends */
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn.  Dropping the loi from the work
                 * lists here sends it to the back of the queue; loi_list_maint
                 * below requeues it if it still has pending work. */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2479
2480 /* we're trying to queue a page in the osc so we're subject to the
2481  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2482  * If the osc's queued pages are already at that limit, then we want to sleep
2483  * until there is space in the osc's queue for us.  We also may be waiting for
2484  * write credits from the OST if there are RPCs in flight that may return some
2485  * before we fall back to sync writes.
2486  *
2487  * We need this know our allocation was granted in the presence of signals */
2488 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2489 {
2490         int rc;
2491         ENTRY;
2492         client_obd_list_lock(&cli->cl_loi_list_lock);
2493         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2494         client_obd_list_unlock(&cli->cl_loi_list_lock);
2495         RETURN(rc);
2496 };
2497
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 with a write grant consumed on behalf of @oap, -EDQUOT to make
 * the caller fall back to sync i/o, -EINTR if the wait for cache space was
 * abandoned while still queued, or the rc posted by whoever granted us. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick existing work loose, then drop the list lock so
                 * completions can wake us while we sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means nobody granted us space:
                 * the wait ended some other way, so bail out */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2554
2555 static int osc_reget_short_lock(struct obd_export *exp,
2556                                 struct lov_stripe_md *lsm,
2557                                 void **res, int rw,
2558                                 obd_off start, obd_off end,
2559                                 void **cookie)
2560 {
2561         struct osc_async_page *oap = *res;
2562         int rc;
2563
2564         ENTRY;
2565
2566         spin_lock(&oap->oap_lock);
2567         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2568                                   start, end, cookie);
2569         spin_unlock(&oap->oap_lock);
2570
2571         RETURN(rc);
2572 }
2573
/* Release the fast-match reference identified by @cookie (presumably
 * taken by osc_reget_short_lock() -- see ldlm_lock_fast_match()). */
static int osc_release_short_lock(struct obd_export *exp,
                                  struct lov_stripe_md *lsm, obd_off end,
                                  void *cookie, int rw)
{
        ENTRY;
        ldlm_lock_fast_release(cookie, rw);
        /* no error could have happened at this layer */
        RETURN(0);
}
2583
2584 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2585                         struct lov_oinfo *loi, cfs_page_t *page,
2586                         obd_off offset, struct obd_async_page_ops *ops,
2587                         void *data, void **res, int nocache,
2588                         struct lustre_handle *lockh)
2589 {
2590         struct osc_async_page *oap;
2591         struct ldlm_res_id oid = {{0}};
2592         int rc = 0;
2593         ENTRY;
2594
2595         if (!page)
2596                 return size_round(sizeof(*oap));
2597
2598         oap = *res;
2599         oap->oap_magic = OAP_MAGIC;
2600         oap->oap_cli = &exp->exp_obd->u.cli;
2601         oap->oap_loi = loi;
2602
2603         oap->oap_caller_ops = ops;
2604         oap->oap_caller_data = data;
2605
2606         oap->oap_page = page;
2607         oap->oap_obj_off = offset;
2608
2609         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2610         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2611         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2612         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2613
2614         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2615
2616         spin_lock_init(&oap->oap_lock);
2617
2618         /* If the page was marked as notcacheable - don't add to any locks */ 
2619         if (!nocache) {
2620                 oid.name[0] = loi->loi_id;
2621                 oid.name[2] = loi->loi_gr;
2622                 /* This is the only place where we can call cache_add_extent
2623                    without oap_lock, because this page is locked now, and
2624                    the lock we are adding it to is referenced, so cannot lose
2625                    any pages either. */
2626                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2627                 if (rc)
2628                         RETURN(rc);
2629         }
2630
2631         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2632         RETURN(0);
2633 }
2634
2635 struct osc_async_page *oap_from_cookie(void *cookie)
2636 {
2637         struct osc_async_page *oap = cookie;
2638         if (oap->oap_magic != OAP_MAGIC)
2639                 return ERR_PTR(-EINVAL);
2640         return oap;
2641 };
2642
/* Queue one async page for regular (non-group) i/o.  @cookie is the oap
 * handed out by osc_prep_async_page().  For writes this may sleep in
 * osc_enter_cache() waiting for dirty/grant space, and propagates its
 * -EDQUOT to make the caller fall back to sync i/o.  Returns 0 on
 * success or a negative errno. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page still linked on a pending/urgent/rpc list is in flight */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* ask the caller to fill in uid/gid for the quota check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while sleeping */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2717
/* True iff `flag' is being newly set: clear in `was', set in `now'.
 * (aka (~was & now & flag), but this is more clear :)
 * Arguments are parenthesized so compound expressions with operators of
 * lower precedence than `&' (e.g. `a | b') expand correctly.  Note each
 * argument may be evaluated more than once -- pass side-effect-free
 * expressions only. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2720
/* Raise async flags (ASYNC_READY / ASYNC_URGENT) on a queued page.  The
 * page must already be on its object's pending list; newly-urgent pages
 * not yet in an rpc are moved onto the urgent list.  osc_check_rpcs()
 * runs on exit so the new urgency can take effect immediately. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* only pages already queued for i/o can have flags changed */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set -- done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already in an rpc are past urgency promotion */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* run even on error: state may have changed before the GOTO */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2783
/* Queue one async page onto its object's group-pending list for grouped
 * i/o.  The page stays parked there until osc_trigger_group_io() moves
 * it to the regular pending list; ASYNC_GROUP_SYNC pages are also
 * registered with the io group for completion tracking.  Returns 0 or a
 * negative errno. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page still linked on a pending/urgent/rpc list is in flight */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2838
2839 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2840                                  struct loi_oap_pages *lop, int cmd)
2841 {
2842         struct list_head *pos, *tmp;
2843         struct osc_async_page *oap;
2844
2845         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2846                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2847                 list_del(&oap->oap_pending_item);
2848                 osc_oap_to_pending(oap);
2849         }
2850         loi_list_maint(cli, loi);
2851 }
2852
/* Release all pages queued by osc_queue_group_io() for this object into
 * the normal pending lists (writes and reads) and kick rpc generation.
 * Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2874
/* Undo osc_prep/queue_async_page: release the page's cache/grant
 * accounting, unlink it from the urgent and pending lists, and detach it
 * from its extent lock.  Fails with -EBUSY if the page is already part
 * of an rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page the network may still be using */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return this page's grant and wake anyone waiting on space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2922
/* ldlm blocking/cancel callback for osc extent locks.  On
 * LDLM_CB_BLOCKING the conflicting lock is cancelled; on
 * LDLM_CB_CANCELING the lock's extents are dropped from the client
 * cache and any registered cl_ext_lock_cancel_cb is invoked.  Always
 * returns 0 (LBUGs on bad input). */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* a small non-NULL value cannot be a valid pointer: the opaque
         * data is corrupt or stale, so crash loudly */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
2965
/* Store @data in the lock's l_ast_data, asserting (on linux kernels)
 * that any different value already there belongs to an inode being
 * freed.  Also transfers the LDLM_FL_NO_LRU bit from @flags onto the
 * lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                /* l_ast_data is treated as an inode here; a conflicting
                 * value is only tolerated if that inode is being freed */
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
2996
2997 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2998                              ldlm_iterator_t replace, void *data)
2999 {
3000         struct ldlm_res_id res_id = { .name = {0} };
3001         struct obd_device *obd = class_exp2obd(exp);
3002
3003         res_id.name[0] = lsm->lsm_object_id;
3004         res_id.name[2] = lsm->lsm_object_gr;
3005
3006         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3007         return 0;
3008 }
3009
/* Common tail of an osc enqueue: translate an intent-aborted reply into
 * its server-provided result, register a successfully granted lock with
 * the client cache, and hand the final rc to the caller's update
 * callback (whose return value becomes ours). */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* server aborted the intent lock; its real verdict
                         * travels in lock_policy_res1 */
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* only a granted lock (rc == 0) belongs in the lock cache */
        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3042
/* Reply-interpret callback for asynchronous osc_enqueue(): finishes the
 * ldlm handshake, runs osc-level completion, then drops the reference that
 * was held for the async request. */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request -- see the policy comment
         * above osc_enqueue(): async locks are released right away. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        /* NOTE(review): lock was obtained before the decref above;
         * presumably a NULL here means the client was evicted while the
         * RPC was in flight -- confirm against ldlm_handle2lock(). */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3075
3076 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3077  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3078  * other synchronous requests, however keeping some locks and trying to obtain
3079  * others may take a considerable amount of time in a case of ost failure; and
3080  * when other sync requests do not get released lock from a client, the client
3081  * is excluded from the cluster -- such scenarious make the life difficult, so
3082  * release locks just after they are obtained. */
3083 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3084                        struct ldlm_enqueue_info *einfo,
3085                        struct ptlrpc_request_set *rqset)
3086 {
3087         struct ldlm_res_id res_id = { .name = {0} };
3088         struct obd_device *obd = exp->exp_obd;
3089         struct ptlrpc_request *req = NULL;
3090         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3091         ldlm_mode_t mode;
3092         int rc;
3093         ENTRY;
3094
3095         res_id.name[0] = oinfo->oi_md->lsm_object_id;
3096         res_id.name[2] = oinfo->oi_md->lsm_object_gr;
3097
3098         /* Filesystem lock extents are extended to page boundaries so that
3099          * dealing with the page cache is a little smoother.  */
3100         oinfo->oi_policy.l_extent.start -=
3101                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3102         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3103
3104         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3105                 goto no_match;
3106
3107         /* Next, search for already existing extent locks that will cover us */
3108         /* If we're trying to read, we also search for an existing PW lock.  The
3109          * VFS and page cache already protect us locally, so lots of readers/
3110          * writers can share a single PW lock.
3111          *
3112          * There are problems with conversion deadlocks, so instead of
3113          * converting a read lock to a write lock, we'll just enqueue a new
3114          * one.
3115          *
3116          * At some point we should cancel the read lock instead of making them
3117          * send us a blocking callback, but there are problems with canceling
3118          * locks out from other users right now, too. */
3119         mode = einfo->ei_mode;
3120         if (einfo->ei_mode == LCK_PR)
3121                 mode |= LCK_PW;
3122         mode = ldlm_lock_match(obd->obd_namespace,
3123                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3124                                einfo->ei_type, &oinfo->oi_policy, mode,
3125                                oinfo->oi_lockh);
3126         if (mode) {
3127                 /* addref the lock only if not async requests and PW lock is
3128                  * matched whereas we asked for PR. */
3129                 if (!rqset && einfo->ei_mode != mode)
3130                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3131                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3132                                         oinfo->oi_flags);
3133                 if (intent) {
3134                         /* I would like to be able to ASSERT here that rss <=
3135                          * kms, but I can't, for reasons which are explained in
3136                          * lov_enqueue() */
3137                 }
3138
3139                 /* We already have a lock, and it's referenced */
3140                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3141
3142                 /* For async requests, decref the lock. */
3143                 if (einfo->ei_mode != mode)
3144                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3145                 else if (rqset)
3146                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3147
3148                 RETURN(ELDLM_OK);
3149         }
3150
3151  no_match:
3152         if (intent) {
3153                 CFS_LIST_HEAD(cancels);
3154                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3155                                            &RQF_LDLM_ENQUEUE_LVB);
3156                 if (req == NULL)
3157                         RETURN(-ENOMEM);
3158
3159                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3160                 if (rc)
3161                         RETURN(rc);
3162
3163                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3164                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3165                 ptlrpc_request_set_replen(req);
3166         }
3167
3168         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3169         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3170
3171         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3172                               &oinfo->oi_policy, &oinfo->oi_flags,
3173                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3174                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3175                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3176                               rqset ? 1 : 0);
3177         if (rqset) {
3178                 if (!rc) {
3179                         struct osc_enqueue_args *aa;
3180                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3181                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
3182                         aa->oa_oi = oinfo;
3183                         aa->oa_ei = einfo;
3184                         aa->oa_exp = exp;
3185
3186                         req->rq_interpret_reply = osc_enqueue_interpret;
3187                         ptlrpc_set_add_req(rqset, req);
3188                 } else if (intent) {
3189                         ptlrpc_req_finished(req);
3190                 }
3191                 RETURN(rc);
3192         }
3193
3194         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3195         if (intent)
3196                 ptlrpc_req_finished(req);
3197
3198         RETURN(rc);
3199 }
3200
3201 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3202                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3203                      int *flags, void *data, struct lustre_handle *lockh)
3204 {
3205         struct ldlm_res_id res_id = { .name = {0} };
3206         struct obd_device *obd = exp->exp_obd;
3207         int lflags = *flags;
3208         ldlm_mode_t rc;
3209         ENTRY;
3210
3211         res_id.name[0] = lsm->lsm_object_id;
3212         res_id.name[2] = lsm->lsm_object_gr;
3213
3214         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3215                 RETURN(-EIO);
3216
3217         /* Filesystem lock extents are extended to page boundaries so that
3218          * dealing with the page cache is a little smoother */
3219         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3220         policy->l_extent.end |= ~CFS_PAGE_MASK;
3221
3222         /* Next, search for already existing extent locks that will cover us */
3223         /* If we're trying to read, we also search for an existing PW lock.  The
3224          * VFS and page cache already protect us locally, so lots of readers/
3225          * writers can share a single PW lock. */
3226         rc = mode;
3227         if (mode == LCK_PR)
3228                 rc |= LCK_PW;
3229         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3230                              &res_id, type, policy, rc, lockh);
3231         if (rc) {
3232                 osc_set_data_with_check(lockh, data, lflags);
3233                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3234                         ldlm_lock_addref(lockh, LCK_PR);
3235                         ldlm_lock_decref(lockh, LCK_PW);
3236                 }
3237                 RETURN(rc);
3238         }
3239         RETURN(rc);
3240 }
3241
3242 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3243                       __u32 mode, struct lustre_handle *lockh)
3244 {
3245         ENTRY;
3246
3247         if (unlikely(mode == LCK_GROUP))
3248                 ldlm_lock_decref_and_cancel(lockh, mode);
3249         else
3250                 ldlm_lock_decref(lockh, mode);
3251
3252         RETURN(0);
3253 }
3254
3255 static int osc_cancel_unused(struct obd_export *exp,
3256                              struct lov_stripe_md *lsm, int flags,
3257                              void *opaque)
3258 {
3259         struct obd_device *obd = class_exp2obd(exp);
3260         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3261
3262         if (lsm != NULL) {
3263                 res_id.name[0] = lsm->lsm_object_id;
3264                 res_id.name[2] = lsm->lsm_object_gr;
3265                 resp = &res_id;
3266         }
3267
3268         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3269 }
3270
3271 static int osc_join_lru(struct obd_export *exp,
3272                         struct lov_stripe_md *lsm, int join)
3273 {
3274         struct obd_device *obd = class_exp2obd(exp);
3275         struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
3276
3277         if (lsm != NULL) {
3278                 res_id.name[0] = lsm->lsm_object_id;
3279                 res_id.name[2] = lsm->lsm_object_gr;
3280                 resp = &res_id;
3281         }
3282
3283         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3284 }
3285
3286 static int osc_statfs_interpret(struct ptlrpc_request *req,
3287                                 struct osc_async_args *aa, int rc)
3288 {
3289         struct obd_statfs *msfs;
3290         ENTRY;
3291
3292         if (rc != 0)
3293                 GOTO(out, rc);
3294
3295         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3296         if (msfs == NULL) {
3297                 GOTO(out, rc = -EPROTO);
3298         }
3299
3300         *aa->aa_oi->oi_osfs = *msfs;
3301 out:
3302         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3303         RETURN(rc);
3304 }
3305
3306 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3307                             __u64 max_age, struct ptlrpc_request_set *rqset)
3308 {
3309         struct ptlrpc_request *req;
3310         struct osc_async_args *aa;
3311         int                    rc;
3312         ENTRY;
3313
3314         /* We could possibly pass max_age in the request (as an absolute
3315          * timestamp or a "seconds.usec ago") so the target can avoid doing
3316          * extra calls into the filesystem if that isn't necessary (e.g.
3317          * during mount that would help a bit).  Having relative timestamps
3318          * is not so great if request processing is slow, while absolute
3319          * timestamps are not ideal because they need time synchronization. */
3320         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3321         if (req == NULL)
3322                 RETURN(-ENOMEM);
3323
3324         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3325         if (rc) {
3326                 ptlrpc_request_free(req);
3327                 RETURN(rc);
3328         }
3329         ptlrpc_request_set_replen(req);
3330         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3331         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3332                 /* procfs requests not want stat in wait for avoid deadlock */
3333                 req->rq_no_resend = 1;
3334                 req->rq_no_delay = 1;
3335         }
3336
3337         req->rq_interpret_reply = osc_statfs_interpret;
3338         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3339         aa = (struct osc_async_args *)&req->rq_async_args;
3340         aa->aa_oi = oinfo;
3341
3342         ptlrpc_set_add_req(rqset, req);
3343         RETURN(0);
3344 }
3345
3346 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3347                       __u64 max_age, __u32 flags)
3348 {
3349         struct obd_statfs     *msfs;
3350         struct ptlrpc_request *req;
3351         int rc;
3352         ENTRY;
3353
3354         /* We could possibly pass max_age in the request (as an absolute
3355          * timestamp or a "seconds.usec ago") so the target can avoid doing
3356          * extra calls into the filesystem if that isn't necessary (e.g.
3357          * during mount that would help a bit).  Having relative timestamps
3358          * is not so great if request processing is slow, while absolute
3359          * timestamps are not ideal because they need time synchronization. */
3360         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3361         if (req == NULL)
3362                 RETURN(-ENOMEM);
3363
3364         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3365         if (rc) {
3366                 ptlrpc_request_free(req);
3367                 RETURN(rc);
3368         }
3369         ptlrpc_request_set_replen(req);
3370         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
3371
3372         if (flags & OBD_STATFS_NODELAY) {
3373                 /* procfs requests not want stat in wait for avoid deadlock */
3374                 req->rq_no_resend = 1;
3375                 req->rq_no_delay = 1;
3376         }
3377
3378         rc = ptlrpc_queue_wait(req);
3379         if (rc)
3380                 GOTO(out, rc);
3381
3382         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3383         if (msfs == NULL) {
3384                 GOTO(out, rc = -EPROTO);
3385         }
3386
3387         *osfs = *msfs;
3388
3389         EXIT;
3390  out:
3391         ptlrpc_req_finished(req);
3392         return rc;
3393 }
3394
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        struct lov_user_md lum, *lumk;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* Fetch the user's header to learn how much room was provided. */
        if (copy_from_user(&lum, lump, sizeof(lum)))
                RETURN(-EFAULT);

        if (lum.lmm_magic != LOV_USER_MAGIC)
                RETURN(-EINVAL);

        /* An OSC is effectively a one-stripe LOV: if the caller left room
         * for object entries, fill in the single backing OST object. */
        if (lum.lmm_stripe_count > 0) {
                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
        } else {
                /* Header only; reuse the on-stack copy. */
                lum_size = sizeof(lum);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Free only the heap copy, never the stack fallback. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3441
3442
/* OSC ioctl dispatcher.  Takes a module reference for the duration of the
 * call so the module cannot be unloaded while an ioctl is in flight. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the user's ioctl payload into a kernel buffer. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller must provide room for one lov_desc... */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* ...and one uuid. */
                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* Present the OSC as a single-target LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3526
/* Synchronous obd_get_info handler.  KEY_LOCK_TO_STRIPE is answered
 * locally; KEY_LAST_ID queries the OST over the wire. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* Single-stripe object: any lock maps to stripe 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                /* Ask the OST for the last allocated object id. */
                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                /* NOTE(review): assumes the caller's val buffer holds at
                 * least sizeof(obd_id) -- *vallen is not checked here. */
                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        RETURN(-EINVAL);
}
3577
3578 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3579                                           void *aa, int rc)
3580 {
3581         struct llog_ctxt *ctxt;
3582         struct obd_import *imp = req->rq_import;
3583         ENTRY;
3584
3585         if (rc != 0)
3586                 RETURN(rc);
3587
3588         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3589         if (ctxt) {
3590                 if (rc == 0)
3591                         rc = llog_initiator_connect(ctxt);
3592                 else
3593                         CERROR("cannot establish connection for "
3594                                "ctxt %p: %d\n", ctxt, rc);
3595         }
3596
3597         llog_ctxt_put(ctxt);
3598         spin_lock(&imp->imp_lock);
3599         imp->imp_server_timeout = 1;
3600         imp->imp_pingable = 1;
3601         spin_unlock(&imp->imp_lock);
3602         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3603
3604         RETURN(rc);
3605 }
3606
/* Asynchronous obd_set_info handler.  Several keys are satisfied locally
 * without an RPC; anything else is forwarded to the OST as an OST_SET_INFO
 * request queued on @set. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Local key: record the next object id for precreation. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Local key: an unlink freed space, clear the no-space flag. */
        if (KEY_IS(KEY_UNLINKED)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* Local key: toggle initial-recovery behaviour on the import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Local key: enable/disable bulk checksums. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* Local key: flush this client's security contexts. */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        /* An MDS connection carries the object group and needs the llog
         * initiator hooked up once the reply arrives. */
        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3710
3711
/* Size-replication log needs only cancel support on this side; all other
 * llog operations are intentionally left unset. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3715
3716 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts used by the MDS-side OSC: the origin
 * context (MDS->OST records) and the size-replication context.  The shared
 * osc_mds_ost_orig_logops table is initialized once, under the device
 * lock, by cloning llog_lvfs_ops and overriding the origin hooks. */
static int osc_llog_init(struct obd_device *obd, int group,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;
        LASSERT(group == OBD_LLOG_GROUP);
        spin_lock(&obd->obd_dev_lock);
        /* lop_setup doubles as the "already initialized" marker. */
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        /* On any failure, dump enough state to identify the bad setup. */
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3754
3755 static int osc_llog_finish(struct obd_device *obd, int count)
3756 {
3757         struct llog_ctxt *ctxt;
3758         int rc = 0, rc2 = 0;
3759         ENTRY;
3760
3761         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3762         if (ctxt)
3763                 rc = llog_cleanup(ctxt);
3764
3765         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3766         if (ctxt)
3767                 rc2 = llog_cleanup(ctxt);
3768         if (!rc)
3769                 rc = rc2;
3770
3771         RETURN(rc);
3772 }
3773
3774 static int osc_reconnect(const struct lu_env *env,
3775                          struct obd_export *exp, struct obd_device *obd,
3776                          struct obd_uuid *cluuid,
3777                          struct obd_connect_data *data)
3778 {
3779         struct client_obd *cli = &obd->u.cli;
3780
3781         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3782                 long lost_grant;
3783
3784                 client_obd_list_lock(&cli->cl_loi_list_lock);
3785                 data->ocd_grant = cli->cl_avail_grant ?:
3786                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3787                 lost_grant = cli->cl_lost_grant;
3788                 cli->cl_lost_grant = 0;
3789                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3790
3791                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3792                        "cl_lost_grant: %ld\n", data->ocd_grant,
3793                        cli->cl_avail_grant, lost_grant);
3794                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3795                        " ocd_grant: %d\n", data->ocd_connect_flags,
3796                        data->ocd_version, data->ocd_grant);
3797         }
3798
3799         RETURN(0);
3800 }
3801
/* Client disconnect: on the last connection, flush pending llog cancel
 * records to the target before tearing down the client export. */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        /* NOTE(review): ctxt may be NULL; presumably llog_sync and
         * llog_ctxt_put tolerate NULL -- confirm in their definitions. */
        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        int rc;

        if (obd->u.cli.cl_conn_count == 1)
                /* flush any remaining cancel messages out to the target */
                llog_sync(ctxt, exp);

        llog_ctxt_put(ctxt);

        rc = client_disconnect_export(exp);
        return rc;
}
3817
/* React to state-change events on this OSC's import (its connection to
 * the OST).  Called from the ptlrpc import state machine; @imp must be
 * the import belonging to @obd.  Unknown events are a fatal bug (LBUG). */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* stop object precreation until recovery finishes */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Our grant is meaningless while disconnected; it is
                 * renegotiated with the server in osc_reconnect(). */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                /* pass the deactivation up to the observer (e.g. the LOV) */
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* drop all DLM locks held against this now-invalid target */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* the OST may have free space again; allow creates */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* connect data (re)negotiated with the server */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3894
3895 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3896 {
3897         int rc;
3898         ENTRY;
3899
3900         ENTRY;
3901         rc = ptlrpcd_addref();
3902         if (rc)
3903                 RETURN(rc);
3904
3905         rc = client_obd_setup(obd, lcfg);
3906         if (rc) {
3907                 ptlrpcd_decref();
3908         } else {
3909                 struct lprocfs_static_vars lvars = { 0 };
3910                 struct client_obd *cli = &obd->u.cli;
3911
3912                 lprocfs_osc_init_vars(&lvars);
3913                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3914                         lproc_osc_attach_seqstat(obd);
3915                         sptlrpc_lprocfs_cliobd_attach(obd);
3916                         ptlrpc_lprocfs_register_obd(obd);
3917                 }
3918
3919                 oscc_init(obd);
3920                 /* We need to allocate a few requests more, because
3921                    brw_interpret tries to create new requests before freeing
3922                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3923                    reserved, but I afraid that might be too much wasted RAM
3924                    in fact, so 2 is just my guess and still should work. */
3925                 cli->cl_import->imp_rq_pool =
3926                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3927                                             OST_MAXREQSIZE,
3928                                             ptlrpc_add_rqs_to_pool);
3929                 cli->cl_cache = cache_create(obd);
3930                 if (!cli->cl_cache) {
3931                         osc_cleanup(obd);
3932                         rc = -ENOMEM;
3933                 }
3934         }
3935
3936         RETURN(rc);
3937 }
3938
3939 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3940 {
3941         int rc = 0;
3942         ENTRY;
3943
3944         switch (stage) {
3945         case OBD_CLEANUP_EARLY: {
3946                 struct obd_import *imp;
3947                 imp = obd->u.cli.cl_import;
3948                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3949                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3950                 ptlrpc_deactivate_import(imp);
3951                 spin_lock(&imp->imp_lock);
3952                 imp->imp_pingable = 0;
3953                 spin_unlock(&imp->imp_lock);
3954                 break;
3955         }
3956         case OBD_CLEANUP_EXPORTS: {
3957                 /* If we set up but never connected, the
3958                    client import will not have been cleaned. */
3959                 if (obd->u.cli.cl_import) {
3960                         struct obd_import *imp;
3961                         imp = obd->u.cli.cl_import;
3962                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3963                                obd->obd_name);
3964                         ptlrpc_invalidate_import(imp);
3965                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
3966                         class_destroy_import(imp);
3967                         obd->u.cli.cl_import = NULL;
3968                 }
3969                 break;
3970         }
3971         case OBD_CLEANUP_SELF_EXP:
3972                 rc = obd_llog_finish(obd, 0);
3973                 if (rc != 0)
3974                         CERROR("failed to cleanup llogging subsystems\n");
3975                 break;
3976         case OBD_CLEANUP_OBD:
3977                 break;
3978         }
3979         RETURN(rc);
3980 }
3981
3982 int osc_cleanup(struct obd_device *obd)
3983 {
3984         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3985         int rc;
3986
3987         ENTRY;
3988         ptlrpc_lprocfs_unregister_obd(obd);
3989         lprocfs_obd_cleanup(obd);
3990
3991         spin_lock(&oscc->oscc_lock);
3992         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3993         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3994         spin_unlock(&oscc->oscc_lock);
3995
3996         /* free memory of osc quota cache */
3997         lquota_cleanup(quota_interface, obd);
3998
3999         cache_destroy(obd->u.cli.cl_cache);
4000         rc = client_obd_cleanup(obd);
4001
4002         ptlrpcd_decref();
4003         RETURN(rc);
4004 }
4005
4006 static int osc_register_page_removal_cb(struct obd_export *exp,
4007                                         obd_page_removal_cb_t func,
4008                                         obd_pin_extent_cb pin_cb)
4009 {
4010         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4011                                            pin_cb);
4012 }
4013
4014 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4015                                           obd_page_removal_cb_t func)
4016 {
4017         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4018 }
4019
4020 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4021                                        obd_lock_cancel_cb cb)
4022 {
4023         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4024
4025         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4026         return 0;
4027 }
4028
4029 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4030                                          obd_lock_cancel_cb cb)
4031 {
4032         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4033                 CERROR("Unregistering cancel cb %p, while only %p was "
4034                        "registered\n", cb,
4035                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4036                 RETURN(-EINVAL);
4037         }
4038
4039         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4040         return 0;
4041 }
4042
4043 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4044 {
4045         struct lustre_cfg *lcfg = buf;
4046         struct lprocfs_static_vars lvars = { 0 };
4047         int rc = 0;
4048
4049         lprocfs_osc_init_vars(&lvars);
4050
4051         switch (lcfg->lcfg_command) {
4052         case LCFG_SPTLRPC_CONF:
4053                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4054                 break;
4055         default:
4056                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4057                                               lcfg, obd);
4058                 break;
4059         }
4060
4061         return(rc);
4062 }
4063
/* Method table exported by the OSC obd type.  Generic client_* helpers
 * are used where no OSC-specific behavior is needed. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O and async page handling */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        /* control, configuration and llog */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        /* page-removal / lock-cancel callback plumbing (used by liblustre) */
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4115 int __init osc_init(void)
4116 {
4117         struct lprocfs_static_vars lvars = { 0 };
4118         int rc;
4119         ENTRY;
4120
4121         lprocfs_osc_init_vars(&lvars);
4122
4123         request_module("lquota");
4124         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4125         lquota_init(quota_interface);
4126         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4127
4128         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4129                                  LUSTRE_OSC_NAME, NULL);
4130         if (rc) {
4131                 if (quota_interface)
4132                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4133                 RETURN(rc);
4134         }
4135
4136         RETURN(rc);
4137 }
4138
4139 #ifdef __KERNEL__
4140 static void /*__exit*/ osc_exit(void)
4141 {
4142         lquota_exit(quota_interface);
4143         if (quota_interface)
4144                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4145
4146         class_unregister_type(LUSTRE_OSC_NAME);
4147 }
4148
4149 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4150 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4151 MODULE_LICENSE("GPL");
4152
4153 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4154 #endif