fs/lustre-release.git: lustre/osc/osc_request.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Peter Braam <braam@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table).
 *
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_cache.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(struct ptlrpc_request *req,
                               struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locks held locally that match @mode in the resource built
 * from @oa.  Found locks are added to the @cancels list.  Returns the number
 * of locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

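/* Note: the increment-then-test below is the usual optimistic admission
 * pattern: a sender reserves a slot first, and backs out (waking any racing
 * waiter) if the limit was exceeded.  This bounds in-flight destroy RPCs to
 * cl_max_rpcs_in_flight without taking a lock. */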
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
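/* Illustrative o_undirty arithmetic (values assumed, not from this code):
 * with 4K pages, cl_max_pages_per_rpc = 256 (1MB RPCs) and
 * cl_max_rpcs_in_flight = 8, max_in_flight = 1MB * 9 = 9MB, so the client
 * announces it could dirty up to max(cl_dirty_max, 9MB) more bytes. */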

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
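/* Worked example of the short-write branch above (assumed values: 64K pages,
 * 4K OST blocks): a 1024-byte write at offset 512 within its page gives
 * offset = 512, count = 1024 + 512 = 1536, end = 1536, so count is rounded
 * up to the 4096-byte block and cl_lost_grant grows by 65536 - 4096 = 61440:
 * grant was consumed for the whole page but only one block was written. */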

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if cache is still dirty but there is no grant, wait for
                 * pending RPCs that may yet return us some grant before
                 * doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
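/* Example of the zero-fill above (assumed values): a 3-page read of
 * 3 x 4096 bytes that transfers only 5000 bytes leaves page 0 intact
 * (4096 bytes read), zeroes bytes 904..4095 of page 1 (EOF falls inside it),
 * and zeroes all of page 2, so the reader sees the sparse tail as zeroes. */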

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~OBD_BRW_FROM_GRANT;

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
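/* E.g. two pages at (off 0, count 4096) and (off 4096, count 4096) with
 * identical flags merge into a single contiguous niobuf.  Any flag mismatch
 * prevents merging; it is only warned about when the bits differ in more
 * than OBD_BRW_FROM_GRANT, which is purely client-side state. */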

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
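/* The diagnosis above re-checksums the pages still in memory and compares:
 * new == server  => the pages changed locally after the original checksum
 *                   was taken (typically mmap IO); the server copy is fine;
 * new == client  => the pages are unchanged, so the data was corrupted in
 *                   transit before reaching the OST;
 * neither        => the pages changed locally AND the wire copy differs. */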
1253
1254 /* Note rc enters this function as number of bytes transferred */
1255 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1256 {
1257         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1258         const lnet_process_id_t *peer =
1259                         &req->rq_import->imp_connection->c_peer;
1260         struct client_obd *cli = aa->aa_cli;
1261         struct ost_body *body;
1262         __u32 client_cksum = 0;
1263         ENTRY;
1264
1265         if (rc < 0 && rc != -EDQUOT)
1266                 RETURN(rc);
1267
1268         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1269         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1270                                   lustre_swab_ost_body);
1271         if (body == NULL) {
1272                 CDEBUG(D_INFO, "Can't unpack body\n");
1273                 RETURN(-EPROTO);
1274         }
1275
1276         /* set/clear over quota flag for a uid/gid */
1277         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1278             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1279                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1280                              body->oa.o_gid, body->oa.o_valid,
1281                              body->oa.o_flags);
1282
1283         if (rc < 0)
1284                 RETURN(rc);
1285
1286         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1287                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1288
1289         osc_update_grant(cli, body);
1290
1291         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1292                 if (rc > 0) {
1293                         CERROR("Unexpected +ve rc %d\n", rc);
1294                         RETURN(-EPROTO);
1295                 }
1296                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1297
1298                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1299                     check_write_checksum(&body->oa, peer, client_cksum,
1300                                          body->oa.o_cksum, aa->aa_requested_nob,
1301                                          aa->aa_page_count, aa->aa_ppga,
1302                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1303                         RETURN(-EAGAIN);
1304
1305                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1306                         RETURN(-EAGAIN);
1307
1308                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1309                                      aa->aa_page_count, aa->aa_ppga);
1310                 GOTO(out, rc);
1311         }
1312
1313         /* The rest of this function executes only for OST_READs */
1314         if (rc > aa->aa_requested_nob) {
1315                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1316                        aa->aa_requested_nob);
1317                 RETURN(-EPROTO);
1318         }
1319
1320         if (rc != req->rq_bulk->bd_nob_transferred) {
1321                 CERROR ("Unexpected rc %d (%d transferred)\n",
1322                         rc, req->rq_bulk->bd_nob_transferred);
1323                 return (-EPROTO);
1324         }
1325
1326         if (rc < aa->aa_requested_nob)
1327                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1328
1329         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1330                                          aa->aa_ppga))
1331                 GOTO(out, rc = -EAGAIN);
1332
1333         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1334                 static int cksum_counter;
1335                 __u32      server_cksum = body->oa.o_cksum;
1336                 char      *via;
1337                 char      *router;
1338                 cksum_type_t cksum_type;
1339
1340                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1341                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1342                 else
1343                         cksum_type = OBD_CKSUM_CRC32;
1344                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1345                                                  aa->aa_ppga, OST_READ,
1346                                                  cksum_type);
1347
1348                 if (peer->nid == req->rq_bulk->bd_sender) {
1349                         via = router = "";
1350                 } else {
1351                         via = " via ";
1352                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1353                 }
1354
1355                 if (server_cksum == ~0 && rc > 0) {
1356                         CERROR("Protocol error: server %s set the 'checksum' "
1357                                "bit, but didn't send a checksum.  Not fatal, "
1358                                "but please tell CFS.\n",
1359                                libcfs_nid2str(peer->nid));
1360                 } else if (server_cksum != client_cksum) {
1361                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1362                                            "%s%s%s inum "LPU64"/"LPU64" object "
1363                                            LPU64"/"LPU64" extent "
1364                                            "["LPU64"-"LPU64"]\n",
1365                                            req->rq_import->imp_obd->obd_name,
1366                                            libcfs_nid2str(peer->nid),
1367                                            via, router,
1368                                            body->oa.o_valid & OBD_MD_FLFID ?
1369                                                 body->oa.o_fid : (__u64)0,
1370                                            body->oa.o_valid & OBD_MD_FLFID ?
1371                                                 body->oa.o_generation :(__u64)0,
1372                                            body->oa.o_id,
1373                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1374                                                 body->oa.o_gr : (__u64)0,
1375                                            aa->aa_ppga[0]->off,
1376                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1377                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1378                                                                         1);
1379                         CERROR("client %x, server %x, cksum_type %x\n",
1380                                client_cksum, server_cksum, cksum_type);
1381                         cksum_counter = 0;
1382                         aa->aa_oa->o_cksum = client_cksum;
1383                         rc = -EAGAIN;
1384                 } else {
1385                         cksum_counter++;
1386                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1387                         rc = 0;
1388                 }
1389         } else if (unlikely(client_cksum)) {
1390                 static int cksum_missed;
1391
1392                 cksum_missed++;
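                /* x & -x == x only when x is a power of two, so the
                 * message is logged at exponentially growing intervals */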
1393                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1394                         CERROR("Checksum %u requested from %s but not sent\n",
1395                                cksum_missed, libcfs_nid2str(peer->nid));
1396         } else {
1397                 rc = 0;
1398         }
1399 out:
1400         if (rc >= 0)
1401                 *aa->aa_oa = body->oa;
1402
1403         RETURN(rc);
1404 }
1405
1406 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1407                             struct lov_stripe_md *lsm,
1408                             obd_count page_count, struct brw_page **pga,
1409                             struct obd_capa *ocapa)
1410 {
1411         struct ptlrpc_request *req;
1412         int                    rc;
1413         cfs_waitq_t            waitq;
1414         int                    resends = 0;
1415         struct l_wait_info     lwi;
1416
1417         ENTRY;
1418
1419         cfs_waitq_init(&waitq);
1420
1421 restart_bulk:
1422         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1423                                   page_count, pga, &req, ocapa);
1424         if (rc != 0)
                RETURN(rc);
1426
1427         rc = ptlrpc_queue_wait(req);
1428
1429         if (rc == -ETIMEDOUT && req->rq_resend) {
1430                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1431                 ptlrpc_req_finished(req);
1432                 goto restart_bulk;
1433         }
1434
1435         rc = osc_brw_fini_request(req, rc);
1436
1437         ptlrpc_req_finished(req);
1438         if (osc_recoverable_error(rc)) {
1439                 resends++;
1440                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1441                         CERROR("too many resend retries, returning error\n");
1442                         RETURN(-EIO);
1443                 }
1444
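                /* the condition below never becomes true; this just sleeps
                 * for 'resends' seconds before retrying (linear backoff) */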
1445                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1446                 l_wait_event(waitq, 0, &lwi);
1447
1448                 goto restart_bulk;
1449         }

        RETURN(rc);
1452 }
1453
1454 int osc_brw_redo_request(struct ptlrpc_request *request,
1455                          struct osc_brw_async_args *aa)
1456 {
1457         struct ptlrpc_request *new_req;
1458         struct ptlrpc_request_set *set = request->rq_set;
1459         struct osc_brw_async_args *new_aa;
1460         struct osc_async_page *oap;
1461         int rc = 0;
1462         ENTRY;
1463
1464         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1465                 CERROR("too many resend retries, returning error\n");
1466                 RETURN(-EIO);
1467         }
1468
1469         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1470 /*
1471         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1472         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1473                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1474                                            REQ_REC_OFF + 3);
1475 */
1476         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1477                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1478                                   aa->aa_cli, aa->aa_oa,
1479                                   NULL /* lsm unused by osc currently */,
1480                                   aa->aa_page_count, aa->aa_ppga, 
1481                                   &new_req, NULL /* ocapa */);
1482         if (rc)
1483                 RETURN(rc);
1484
1485         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1486
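        /* if any oap in the old request was interrupted while the lock was
         * dropped, give up; nothing references new_req yet, so it can
         * simply be finished */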
1487         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1488                 if (oap->oap_request != NULL) {
1489                         LASSERTF(request == oap->oap_request,
1490                                  "request %p != oap_request %p\n",
1491                                  request, oap->oap_request);
1492                         if (oap->oap_interrupted) {
1493                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1494                                 ptlrpc_req_finished(new_req);
1495                                 RETURN(-EINTR);
1496                         }
1497                 }
1498         }
1499         /* New request takes over pga and oaps from old request.
1500          * Note that copying a list_head doesn't work, need to move it... */
1501         aa->aa_resends++;
1502         new_req->rq_interpret_reply = request->rq_interpret_reply;
1503         new_req->rq_async_args = request->rq_async_args;
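        /* delay the resend by aa_resends seconds - a simple linear backoff */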
1504         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1505
1506         new_aa = ptlrpc_req_async_args(new_req);
1507
1508         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1509         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1510         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1511
1512         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1513                 if (oap->oap_request) {
1514                         ptlrpc_req_finished(oap->oap_request);
1515                         oap->oap_request = ptlrpc_request_addref(new_req);
1516                 }
1517         }
1518
        /* Using ptlrpc_set_add_req() here is safe because the interpret
         * callbacks run in check_set context.  The only path by which
         * another thread could reach this request and return -EINTR is
         * protected by cl_loi_list_lock. */
1523         ptlrpc_set_add_req(set, new_req);
1524
1525         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1526
1527         DEBUG_REQ(D_INFO, new_req, "new request");
1528         RETURN(0);
1529 }
1530
1531 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1532                           struct lov_stripe_md *lsm, obd_count page_count,
1533                           struct brw_page **pga, struct ptlrpc_request_set *set,
1534                           struct obd_capa *ocapa)
1535 {
1536         struct ptlrpc_request     *req;
1537         struct client_obd         *cli = &exp->exp_obd->u.cli;
1538         int                        rc, i;
1539         struct osc_brw_async_args *aa;
1540         ENTRY;
1541
1542         /* Consume write credits even if doing a sync write -
1543          * otherwise we may run out of space on OST due to grant. */
1544         if (cmd == OBD_BRW_WRITE) {
1545                 spin_lock(&cli->cl_loi_list_lock);
1546                 for (i = 0; i < page_count; i++) {
1547                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1548                                 osc_consume_write_grant(cli, pga[i]);
1549                 }
1550                 spin_unlock(&cli->cl_loi_list_lock);
1551         }
1552
1553         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1554                                   &req, ocapa);

        /* Don't touch the request or its async args until we know the
         * request was actually built; on failure *req is not valid */
        if (rc == 0) {
                aa = ptlrpc_req_async_args(req);
                if (cmd == OBD_BRW_READ) {
                        lprocfs_oh_tally_log2(&cli->cl_read_page_hist,
                                              page_count);
                        lprocfs_oh_tally(&cli->cl_read_rpc_hist,
                                         cli->cl_r_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
                } else {
                        lprocfs_oh_tally_log2(&cli->cl_write_page_hist,
                                              page_count);
                        lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                         cli->cl_w_in_flight);
                        ptlrpc_lprocfs_brw(req, OST_WRITE,
                                           aa->aa_requested_nob);
                }
                LASSERT(list_empty(&aa->aa_oaps));

1570                 req->rq_interpret_reply = brw_interpret;
1571                 ptlrpc_set_add_req(set, req);
1572                 client_obd_list_lock(&cli->cl_loi_list_lock);
1573                 if (cmd == OBD_BRW_READ)
1574                         cli->cl_r_in_flight++;
1575                 else
1576                         cli->cl_w_in_flight++;
1577                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1578                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1579         } else if (cmd == OBD_BRW_WRITE) {
1580                 client_obd_list_lock(&cli->cl_loi_list_lock);
1581                 for (i = 0; i < page_count; i++)
1582                         osc_release_write_grant(cli, pga[i], 0);
1583                 osc_wake_cache_waiters(cli);
1584                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1585         }
        RETURN(rc);
1587 }
1588
/*
 * We want disk allocation on the target to happen in offset order, so we
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it
 * does fine for our small page arrays and doesn't require allocation.  It's
 * an insertion sort that swaps elements that are strides apart, shrinking
 * the stride until it reaches 1 and the array is sorted.
 */
1596 static void sort_brw_pages(struct brw_page **array, int num)
1597 {
1598         int stride, i, j;
1599         struct brw_page *tmp;
1600
1601         if (num == 1)
1602                 return;
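        /* grow the stride through Knuth's gap sequence 1, 4, 13, 40, ...
         * until it reaches or exceeds the array length */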
1603         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1604                 ;
1605
1606         do {
1607                 stride /= 3;
1608                 for (i = stride ; i < num ; i++) {
1609                         tmp = array[i];
1610                         j = i;
1611                         while (j >= stride && array[j - stride]->off > tmp->off) {
1612                                 array[j] = array[j - stride];
1613                                 j -= stride;
1614                         }
1615                         array[j] = tmp;
1616                 }
1617         } while (stride > 1);
1618 }
1619
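/* Count how many of the leading (sorted) pages could be sent as one
 * unfragmented transfer: every page but the last must end on a page
 * boundary and every page but the first must start on one. */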
1620 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1621 {
1622         int count = 1;
1623         int offset;
1624         int i = 0;
1625
1626         LASSERT (pages > 0);
1627         offset = pg[i]->off & ~CFS_PAGE_MASK;
1628
1629         for (;;) {
1630                 pages--;
1631                 if (pages == 0)         /* that's all */
1632                         return count;
1633
1634                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1635                         return count;   /* doesn't end on page boundary */
1636
1637                 i++;
1638                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1639                 if (offset != 0)        /* doesn't start on page boundary */
1640                         return count;
1641
1642                 count++;
1643         }
1644 }
1645
1646 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1647 {
1648         struct brw_page **ppga;
1649         int i;
1650
1651         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1652         if (ppga == NULL)
1653                 return NULL;
1654
1655         for (i = 0; i < count; i++)
1656                 ppga[i] = pga + i;
1657         return ppga;
1658 }
1659
1660 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1661 {
1662         LASSERT(ppga != NULL);
1663         OBD_FREE(ppga, sizeof(*ppga) * count);
1664 }
1665
1666 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1667                    obd_count page_count, struct brw_page *pga,
1668                    struct obd_trans_info *oti)
1669 {
1670         struct obdo *saved_oa = NULL;
1671         struct brw_page **ppga, **orig;
1672         struct obd_import *imp = class_exp2cliimp(exp);
1673         struct client_obd *cli = &imp->imp_obd->u.cli;
1674         int rc, page_count_orig;
1675         ENTRY;
1676
1677         if (cmd & OBD_BRW_CHECK) {
1678                 /* The caller just wants to know if there's a chance that this
1679                  * I/O can succeed */
1680
1681                 if (imp == NULL || imp->imp_invalid)
1682                         RETURN(-EIO);
1683                 RETURN(0);
1684         }
1685
1686         /* test_brw with a failed create can trip this, maybe others. */
1687         LASSERT(cli->cl_max_pages_per_rpc);
1688
1689         rc = 0;
1690
1691         orig = ppga = osc_build_ppga(pga, page_count);
1692         if (ppga == NULL)
1693                 RETURN(-ENOMEM);
1694         page_count_orig = page_count;
1695
1696         sort_brw_pages(ppga, page_count);
1697         while (page_count) {
1698                 obd_count pages_per_brw;
1699
1700                 if (page_count > cli->cl_max_pages_per_rpc)
1701                         pages_per_brw = cli->cl_max_pages_per_rpc;
1702                 else
1703                         pages_per_brw = page_count;
1704
1705                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1706
1707                 if (saved_oa != NULL) {
1708                         /* restore previously saved oa */
1709                         *oinfo->oi_oa = *saved_oa;
1710                 } else if (page_count > pages_per_brw) {
1711                         /* save a copy of oa (brw will clobber it) */
1712                         OBDO_ALLOC(saved_oa);
1713                         if (saved_oa == NULL)
1714                                 GOTO(out, rc = -ENOMEM);
1715                         *saved_oa = *oinfo->oi_oa;
1716                 }
1717
1718                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1719                                       pages_per_brw, ppga, oinfo->oi_capa);
1720
1721                 if (rc != 0)
1722                         break;
1723
1724                 page_count -= pages_per_brw;
1725                 ppga += pages_per_brw;
1726         }
1727
1728 out:
1729         osc_release_ppga(orig, page_count_orig);
1730
1731         if (saved_oa != NULL)
1732                 OBDO_FREE(saved_oa);
1733
1734         RETURN(rc);
1735 }
1736
1737 static int osc_brw_async(int cmd, struct obd_export *exp,
1738                          struct obd_info *oinfo, obd_count page_count,
1739                          struct brw_page *pga, struct obd_trans_info *oti,
1740                          struct ptlrpc_request_set *set)
1741 {
1742         struct brw_page **ppga, **orig;
1743         struct client_obd *cli = &exp->exp_obd->u.cli;
1744         int page_count_orig;
1745         int rc = 0;
1746         ENTRY;
1747
1748         if (cmd & OBD_BRW_CHECK) {
1749                 struct obd_import *imp = class_exp2cliimp(exp);
1750                 /* The caller just wants to know if there's a chance that this
1751                  * I/O can succeed */
1752
1753                 if (imp == NULL || imp->imp_invalid)
1754                         RETURN(-EIO);
1755                 RETURN(0);
1756         }
1757
1758         orig = ppga = osc_build_ppga(pga, page_count);
1759         if (ppga == NULL)
1760                 RETURN(-ENOMEM);
1761         page_count_orig = page_count;
1762
1763         sort_brw_pages(ppga, page_count);
1764         while (page_count) {
1765                 struct brw_page **copy;
1766                 obd_count pages_per_brw;
1767
1768                 pages_per_brw = min_t(obd_count, page_count,
1769                                       cli->cl_max_pages_per_rpc);
1770
1771                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1772
1773                 /* use ppga only if single RPC is going to fly */
1774                 if (pages_per_brw != page_count_orig || ppga != orig) {
1775                         OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
1776                         if (copy == NULL)
1777                                 GOTO(out, rc = -ENOMEM);
1778                         memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
1779                 } else
1780                         copy = ppga;
1781
1782                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1783                                     pages_per_brw, copy, set, oinfo->oi_capa);
1784
1785                 if (rc != 0) {
1786                         if (copy != ppga)
1787                                 OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
1788                         break;
1789                 }
1790                 if (copy == orig) {
1791                         /* we passed it to async_internal() which is
1792                          * now responsible for releasing memory */
1793                         orig = NULL;
1794                 }
1795
1796                 page_count -= pages_per_brw;
1797                 ppga += pages_per_brw;
1798         }
1799 out:
1800         if (orig)
1801                 osc_release_ppga(orig, page_count_orig);
1802         RETURN(rc);
1803 }
1804
1805 static void osc_check_rpcs(struct client_obd *cli);
1806
1807 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1808  * the dirty accounting.  Writeback completes or truncate happens before
1809  * writing starts.  Must be called with the loi lock held. */
1810 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1811                            int sent)
1812 {
1813         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1814 }
1815
1816
1817 /* This maintains the lists of pending pages to read/write for a given object
1818  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1819  * to quickly find objects that are ready to send an RPC. */
1820 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1821                          int cmd)
1822 {
1823         int optimal;
1824         ENTRY;
1825
1826         if (lop->lop_num_pending == 0)
1827                 RETURN(0);
1828
1829         /* if we have an invalid import we want to drain the queued pages
1830          * by forcing them through rpcs that immediately fail and complete
1831          * the pages.  recovery relies on this to empty the queued pages
1832          * before canceling the locks and evicting down the llite pages */
1833         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1834                 RETURN(1);
1835
        /* stream rpcs in queue order as long as there is an urgent page
1837          * queued.  this is our cheap solution for good batching in the case
1838          * where writepage marks some random page in the middle of the file
1839          * as urgent because of, say, memory pressure */
1840         if (!list_empty(&lop->lop_urgent)) {
1841                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1842                 RETURN(1);
1843         }
1844         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1845         optimal = cli->cl_max_pages_per_rpc;
1846         if (cmd & OBD_BRW_WRITE) {
1847                 /* trigger a write rpc stream as long as there are dirtiers
1848                  * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coalesce with what's waiting. */
1850                 if (!list_empty(&cli->cl_cache_waiters)) {
1851                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1852                         RETURN(1);
1853                 }
1854                 /* +16 to avoid triggering rpcs that would want to include pages
1855                  * that are being queued but which can't be made ready until
1856                  * the queuer finishes with the page. this is a wart for
1857                  * llite::commit_write() */
1858                 optimal += 16;
1859         }
1860         if (lop->lop_num_pending >= optimal)
1861                 RETURN(1);
1862
1863         RETURN(0);
1864 }
1865
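/* Add @item to @list or remove it so that its membership matches
 * @should_be_on. */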
1866 static void on_list(struct list_head *item, struct list_head *list,
1867                     int should_be_on)
1868 {
1869         if (list_empty(item) && should_be_on)
1870                 list_add_tail(item, list);
1871         else if (!list_empty(item) && !should_be_on)
1872                 list_del_init(item);
1873 }
1874
1875 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1876  * can find pages to build into rpcs quickly */
1877 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1878 {
1879         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1880                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1881                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1882
1883         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1884                 loi->loi_write_lop.lop_num_pending);
1885
1886         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1887                 loi->loi_read_lop.lop_num_pending);
1888 }
1889
1890 static void lop_update_pending(struct client_obd *cli,
1891                                struct loi_oap_pages *lop, int cmd, int delta)
1892 {
1893         lop->lop_num_pending += delta;
1894         if (cmd & OBD_BRW_WRITE)
1895                 cli->cl_pending_w_pages += delta;
1896         else
1897                 cli->cl_pending_r_pages += delta;
1898 }
1899
1900 /* this is called when a sync waiter receives an interruption.  Its job is to
1901  * get the caller woken as soon as possible.  If its page hasn't been put in an
1902  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1903  * desiring interruption which will forcefully complete the rpc once the rpc
1904  * has timed out */
1905 static void osc_occ_interrupted(struct oig_callback_context *occ)
1906 {
1907         struct osc_async_page *oap;
1908         struct loi_oap_pages *lop;
1909         struct lov_oinfo *loi;
1910         ENTRY;
1911
1912         /* XXX member_of() */
1913         oap = list_entry(occ, struct osc_async_page, oap_occ);
1914
1915         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1916
1917         oap->oap_interrupted = 1;
1918
1919         /* ok, it's been put in an rpc. only one oap gets a request reference */
1920         if (oap->oap_request != NULL) {
1921                 ptlrpc_mark_interrupted(oap->oap_request);
1922                 ptlrpcd_wake(oap->oap_request);
1923                 GOTO(unlock, 0);
1924         }
1925
1926         /* we don't get interruption callbacks until osc_trigger_group_io()
1927          * has been called and put the sync oaps in the pending/urgent lists.*/
1928         if (!list_empty(&oap->oap_pending_item)) {
1929                 list_del_init(&oap->oap_pending_item);
1930                 list_del_init(&oap->oap_urgent_item);
1931
1932                 loi = oap->oap_loi;
1933                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1934                         &loi->loi_write_lop : &loi->loi_read_lop;
1935                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1936                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1937
1938                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1939                 oap->oap_oig = NULL;
1940         }
1941
1942 unlock:
1943         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1944 }
1945
/* this tries to propagate async writeback errors back up to the
 * application.  When an async write fails we record the error code in case
 * the app later does an fsync.  As long as errors persist we force future
 * rpcs to be sync so that the app can get a sync error and break the cycle
 * of queueing pages for which writeback will fail. */
1951 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1952                            int rc)
1953 {
1954         if (rc) {
1955                 if (!ar->ar_rc)
1956                         ar->ar_rc = rc;
1957
1958                 ar->ar_force_sync = 1;
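                /* only a write whose xid was sampled after this failure may
                 * clear ar_force_sync below */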
1959                 ar->ar_min_xid = ptlrpc_sample_next_xid();
                return;
        }
1963
1964         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1965                 ar->ar_force_sync = 0;
1966 }
1967
1968 static void osc_oap_to_pending(struct osc_async_page *oap)
1969 {
1970         struct loi_oap_pages *lop;
1971
1972         if (oap->oap_cmd & OBD_BRW_WRITE)
1973                 lop = &oap->oap_loi->loi_write_lop;
1974         else
1975                 lop = &oap->oap_loi->loi_read_lop;
1976
1977         if (oap->oap_async_flags & ASYNC_URGENT)
1978                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1979         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1980         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1981 }
1982
1983 /* this must be called holding the loi list lock to give coverage to exit_cache,
1984  * async_flag maintenance, and oap_request */
1985 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1986                               struct osc_async_page *oap, int sent, int rc)
1987 {
1988         __u64 xid = 0;
1989
1990         ENTRY;
1991         if (oap->oap_request != NULL) {
1992                 xid = ptlrpc_req_xid(oap->oap_request);
1993                 ptlrpc_req_finished(oap->oap_request);
1994                 oap->oap_request = NULL;
1995         }
1996
1997         oap->oap_async_flags = 0;
1998         oap->oap_interrupted = 0;
1999
2000         if (oap->oap_cmd & OBD_BRW_WRITE) {
2001                 osc_process_ar(&cli->cl_ar, xid, rc);
2002                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2003         }
2004
2005         if (rc == 0 && oa != NULL) {
2006                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2007                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2008                 if (oa->o_valid & OBD_MD_FLMTIME)
2009                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2010                 if (oa->o_valid & OBD_MD_FLATIME)
2011                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2012                 if (oa->o_valid & OBD_MD_FLCTIME)
2013                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2014         }
2015
2016         if (oap->oap_oig) {
2017                 osc_exit_cache(cli, oap, sent);
2018                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
2019                 oap->oap_oig = NULL;
2020                 EXIT;
2021                 return;
2022         }
2023
2024         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
2025                                                 oap->oap_cmd, oa, rc);
2026
2027         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2028          * I/O on the page could start, but OSC calls it under lock
2029          * and thus we can add oap back to pending safely */
2030         if (rc)
2031                 /* upper layer wants to leave the page on pending queue */
2032                 osc_oap_to_pending(oap);
2033         else
2034                 osc_exit_cache(cli, oap, sent);
2035         EXIT;
2036 }
2037
2038 static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
2039 {
2040         struct osc_brw_async_args *aa = data;
2041         struct client_obd *cli;
2042         ENTRY;
2043
2044         rc = osc_brw_fini_request(req, rc);
2045         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2046         if (osc_recoverable_error(rc)) {
2047                 rc = osc_brw_redo_request(req, aa);
2048                 if (rc == 0)
2049                         RETURN(0);
2050         }
2051
2052         cli = aa->aa_cli;
2053
2054         client_obd_list_lock(&cli->cl_loi_list_lock);
2055
2056         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2057          * is called so we know whether to go to sync BRWs or wait for more
2058          * RPCs to complete */
2059         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2060                 cli->cl_w_in_flight--;
2061         else
2062                 cli->cl_r_in_flight--;
2063
2064         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2065                 struct osc_async_page *oap, *tmp;
2066                 /* the caller may re-use the oap after the completion call so
2067                  * we need to clean it up a little */
2068                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2069                         list_del_init(&oap->oap_rpc_item);
2070                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2071                 }
2072                 OBDO_FREE(aa->aa_oa);
2073         } else { /* from async_internal() */
2074                 int i;
2075                 for (i = 0; i < aa->aa_page_count; i++)
2076                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2077         }
2078         osc_wake_cache_waiters(cli);
2079         osc_check_rpcs(cli);
2080         client_obd_list_unlock(&cli->cl_loi_list_lock);
2081
2082         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2083         RETURN(rc);
2084 }
2085
2086 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2087                                             struct list_head *rpc_list,
2088                                             int page_count, int cmd)
2089 {
2090         struct ptlrpc_request *req;
2091         struct brw_page **pga = NULL;
2092         struct osc_brw_async_args *aa;
2093         struct obdo *oa = NULL;
2094         struct obd_async_page_ops *ops = NULL;
2095         void *caller_data = NULL;
2096         struct obd_capa *ocapa;
2097         struct osc_async_page *oap;
2098         int i, rc;
2099
2100         ENTRY;
2101         LASSERT(!list_empty(rpc_list));
2102
2103         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2104         if (pga == NULL)
2105                 RETURN(ERR_PTR(-ENOMEM));
2106
2107         OBDO_ALLOC(oa);
2108         if (oa == NULL)
2109                 GOTO(out, req = ERR_PTR(-ENOMEM));
2110
2111         i = 0;
2112         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2113                 if (ops == NULL) {
2114                         ops = oap->oap_caller_ops;
2115                         caller_data = oap->oap_caller_data;
2116                 }
2117                 pga[i] = &oap->oap_brw_page;
2118                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2119                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2120                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2121                 i++;
2122         }
2123
2124         /* always get the data for the obdo for the rpc */
2125         LASSERT(ops != NULL);
2126         ops->ap_fill_obdo(caller_data, cmd, oa);
2127         ocapa = ops->ap_lookup_capa(caller_data, cmd);
2128
2129         sort_brw_pages(pga, page_count);
2130         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2131                                   pga, &req, ocapa);
2132         capa_put(ocapa);
2133         if (rc != 0) {
2134                 CERROR("prep_req failed: %d\n", rc);
2135                 GOTO(out, req = ERR_PTR(rc));
2136         }
2137
2138         /* Need to update the timestamps after the request is built in case
2139          * we race with setattr (locally or in queue at OST).  If OST gets
2140          * later setattr before earlier BRW (as determined by the request xid),
2141          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2142          * way to do this in a single call.  bug 10150 */
2143         ops->ap_update_obdo(caller_data, cmd, oa,
2144                             OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
2145
2146         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2147         aa = ptlrpc_req_async_args(req);
2148         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2149         list_splice(rpc_list, &aa->aa_oaps);
2150         CFS_INIT_LIST_HEAD(rpc_list);
2151
2152 out:
2153         if (IS_ERR(req)) {
2154                 if (oa)
2155                         OBDO_FREE(oa);
2156                 if (pga)
2157                         OBD_FREE(pga, sizeof(*pga) * page_count);
2158         }
2159         RETURN(req);
2160 }
2161
2162 /* the loi lock is held across this function but it's allowed to release
2163  * and reacquire it during its work */
/**
 * Prepare pages for async IO and put them in the send queue.
 *
 * \param cli client obd instance
 * \param loi object info
 * \param cmd OBD_BRW_* flags
 * \param lop pending pages
 *
 * \return zero if the pages were successfully added to the send queue.
 * \return nonzero if an error occurred.
 */
2175 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2176                             int cmd, struct loi_oap_pages *lop)
2177 {
2178         struct ptlrpc_request *req;
2179         obd_count page_count = 0;
2180         struct osc_async_page *oap = NULL, *tmp;
2181         struct osc_brw_async_args *aa;
2182         struct obd_async_page_ops *ops;
2183         CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned int starting_offset = 0;
2186         int srvlock = 0;
2187         ENTRY;
2188
2189         /* first we find the pages we're allowed to work with */
2190         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2191                                  oap_pending_item) {
2192                 ops = oap->oap_caller_ops;
2193
2194                 LASSERT(oap->oap_magic == OAP_MAGIC);
2195
2196                 if (page_count != 0 &&
2197                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2198                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2199                                " oap %p, page %p, srvlock %u\n",
2200                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2201                         break;
2202                 }
2203                 /* in llite being 'ready' equates to the page being locked
2204                  * until completion unlocks it.  commit_write submits a page
2205                  * as not ready because its unlock will happen unconditionally
2206                  * as the call returns.  if we race with commit_write giving
                 * us that page we don't want to create a hole in the page
2208                  * stream, so we stop and leave the rpc to be fired by
2209                  * another dirtier or kupdated interval (the not ready page
2210                  * will still be on the dirty list).  we could call in
2211                  * at the end of ll_file_write to process the queue again. */
2212                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2213                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2214                         if (rc < 0)
2215                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2216                                                 "instead of ready\n", oap,
2217                                                 oap->oap_page, rc);
2218                         switch (rc) {
2219                         case -EAGAIN:
2220                                 /* llite is telling us that the page is still
2221                                  * in commit_write and that we should try
2222                                  * and put it in an rpc again later.  we
2223                                  * break out of the loop so we don't create
2224                                  * a hole in the sequence of pages in the rpc
2225                                  * stream.*/
2226                                 oap = NULL;
2227                                 break;
2228                         case -EINTR:
2229                                 /* the io isn't needed.. tell the checks
2230                                  * below to complete the rpc with EINTR */
2231                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2232                                 oap->oap_count = -EINTR;
2233                                 break;
2234                         case 0:
2235                                 oap->oap_async_flags |= ASYNC_READY;
2236                                 break;
2237                         default:
2238                                 LASSERTF(0, "oap %p page %p returned %d "
2239                                             "from make_ready\n", oap,
2240                                             oap->oap_page, rc);
2241                                 break;
2242                         }
2243                 }
2244                 if (oap == NULL)
2245                         break;
2246                 /*
2247                  * Page submitted for IO has to be locked. Either by
2248                  * ->ap_make_ready() or by higher layers.
2249                  */
#if defined(__KERNEL__) && defined(__linux__)
                if (!(PageLocked(oap->oap_page) &&
                      (CheckWriteback(oap->oap_page, cmd) ||
                       oap->oap_oig != NULL))) {
                        CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                               oap->oap_page, (long)oap->oap_page->flags,
                               oap->oap_async_flags);
                        LBUG();
                }
#endif
2258                 /* If there is a gap at the start of this page, it can't merge
2259                  * with any previous page, so we'll hand the network a
2260                  * "fragmented" page array that it can't transfer in 1 RDMA */
2261                 if (page_count != 0 && oap->oap_page_off != 0)
2262                         break;
2263
2264                 /* take the page out of our book-keeping */
2265                 list_del_init(&oap->oap_pending_item);
2266                 lop_update_pending(cli, lop, cmd, -1);
2267                 list_del_init(&oap->oap_urgent_item);
2268
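                /* record where the RPC starts within a PTLRPC_MAX_BRW_SIZE
                 * window; this feeds the offset histograms below */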
2269                 if (page_count == 0)
2270                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2271                                           (PTLRPC_MAX_BRW_SIZE - 1);
2272
2273                 /* ask the caller for the size of the io as the rpc leaves. */
2274                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2275                         oap->oap_count =
2276                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2277                 if (oap->oap_count <= 0) {
2278                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2279                                oap->oap_count);
2280                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2281                         continue;
2282                 }
2283
2284                 /* now put the page back in our accounting */
2285                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2286                 if (page_count == 0)
2287                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2288                 if (++page_count >= cli->cl_max_pages_per_rpc)
2289                         break;
2290
2291                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2292                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2293                  * have the same alignment as the initial writes that allocated
2294                  * extents on the server. */
2295                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2296                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2297                 if (ending_offset == 0)
2298                         break;
2299
2300                 /* If there is a gap at the end of this page, it can't merge
2301                  * with any subsequent pages, so we'll hand the network a
2302                  * "fragmented" page array that it can't transfer in 1 RDMA */
2303                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2304                         break;
2305         }
2306
2307         osc_wake_cache_waiters(cli);
2308
2309         if (page_count == 0)
2310                 RETURN(0);
2311
2312         loi_list_maint(cli, loi);
2313
2314         client_obd_list_unlock(&cli->cl_loi_list_lock);
2315
2316         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2317         if (IS_ERR(req)) {
2318                 /* this should happen rarely and is pretty bad, it makes the
2319                  * pending list not follow the dirty order */
2320                 client_obd_list_lock(&cli->cl_loi_list_lock);
2321                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2322                         list_del_init(&oap->oap_rpc_item);
2323
2324                         /* queued sync pages can be torn down while the pages
2325                          * were between the pending list and the rpc */
2326                         if (oap->oap_interrupted) {
2327                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2328                                 osc_ap_completion(cli, NULL, oap, 0,
2329                                                   oap->oap_count);
2330                                 continue;
2331                         }
2332                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2333                 }
2334                 loi_list_maint(cli, loi);
2335                 RETURN(PTR_ERR(req));
2336         }
2337
2338         aa = ptlrpc_req_async_args(req);
2339
2340         if (cmd == OBD_BRW_READ) {
2341                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2342                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2343                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2344                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2345                 ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
2346         } else {
2347                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2348                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2349                                  cli->cl_w_in_flight);
2350                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2351                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2352                 ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
2353         }
2354
2355         client_obd_list_lock(&cli->cl_loi_list_lock);
2356
2357         if (cmd == OBD_BRW_READ)
2358                 cli->cl_r_in_flight++;
2359         else
2360                 cli->cl_w_in_flight++;
2361
2362         /* queued sync pages can be torn down while the pages
2363          * were between the pending list and the rpc */
2364         tmp = NULL;
2365         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2366                 /* only one oap gets a request reference */
2367                 if (tmp == NULL)
2368                         tmp = oap;
2369                 if (oap->oap_interrupted && !req->rq_intr) {
2370                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2371                                oap, req);
2372                         ptlrpc_mark_interrupted(req);
2373                 }
2374         }
2375         if (tmp != NULL)
2376                 tmp->oap_request = ptlrpc_request_addref(req);
2377
2378         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2379                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2380
2381         req->rq_interpret_reply = brw_interpret;
2382         ptlrpcd_add_req(req);
2383         RETURN(1);
2384 }
2385
2386 #define LOI_DEBUG(LOI, STR, args...)                                     \
2387         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2388                !list_empty(&(LOI)->loi_cli_item),                        \
2389                (LOI)->loi_write_lop.lop_num_pending,                     \
2390                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2391                (LOI)->loi_read_lop.lop_num_pending,                      \
2392                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2394
2395 /* This is called by osc_check_rpcs() to find which objects have pages that
2396  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2397 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2398 {
2399         ENTRY;
2400         /* first return all objects which we already know to have
2401          * pages ready to be stuffed into rpcs */
2402         if (!list_empty(&cli->cl_loi_ready_list))
2403                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2404                                   struct lov_oinfo, loi_cli_item));
2405
2406         /* then if we have cache waiters, return all objects with queued
2407          * writes.  This is especially important when many small files
2408          * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshold */
2410         if (!list_empty(&cli->cl_cache_waiters) &&
2411             !list_empty(&cli->cl_loi_write_list))
2412                 RETURN(list_entry(cli->cl_loi_write_list.next,
2413                                   struct lov_oinfo, loi_write_item));
2414
2415         /* then return all queued objects when we have an invalid import
2416          * so that they get flushed */
2417         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2418                 if (!list_empty(&cli->cl_loi_write_list))
2419                         RETURN(list_entry(cli->cl_loi_write_list.next,
2420                                           struct lov_oinfo, loi_write_item));
2421                 if (!list_empty(&cli->cl_loi_read_list))
2422                         RETURN(list_entry(cli->cl_loi_read_list.next,
2423                                           struct lov_oinfo, loi_read_item));
2424         }
2425         RETURN(NULL);
2426 }
2427
2428 /* called with the loi list lock held */
2429 static void osc_check_rpcs(struct client_obd *cli)
2430 {
2431         struct lov_oinfo *loi;
2432         int rc = 0, race_counter = 0;
2433         ENTRY;
2434
2435         while ((loi = osc_next_loi(cli)) != NULL) {
2436                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2437
2438                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2439                         break;
2440
2441                 /* attempt some read/write balancing by alternating between
2442                  * reads and writes in an object.  The makes_rpc checks here
2443                  * would be redundant if we were getting read/write work items
2444                  * instead of objects.  we don't want send_oap_rpc to drain a
2445                  * partial read pending queue when we're given this object to
2446                  * do io on writes while there are cache waiters */
2447                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2448                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2449                                               &loi->loi_write_lop);
2450                         if (rc < 0)
2451                                 break;
2452                         if (rc > 0)
2453                                 race_counter = 0;
2454                         else
2455                                 race_counter++;
2456                 }
2457                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2458                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2459                                               &loi->loi_read_lop);
2460                         if (rc < 0)
2461                                 break;
2462                         if (rc > 0)
2463                                 race_counter = 0;
2464                         else
2465                                 race_counter++;
2466                 }
2467
                /* attempt some inter-object balancing by issuing rpcs
2469                  * for each object in turn */
2470                 if (!list_empty(&loi->loi_cli_item))
2471                         list_del_init(&loi->loi_cli_item);
2472                 if (!list_empty(&loi->loi_write_item))
2473                         list_del_init(&loi->loi_write_item);
2474                 if (!list_empty(&loi->loi_read_item))
2475                         list_del_init(&loi->loi_read_item);
2476
2477                 loi_list_maint(cli, loi);
2478
2479                 /* send_oap_rpc fails with 0 when make_ready tells it to
2480                  * back off.  llite's make_ready does this when it tries
2481                  * to lock a page queued for write that is already locked.
2482                  * we want to try sending rpcs from many objects, but we
2483                  * don't want to spin failing with 0.  */
2484                 if (race_counter == 10)
2485                         break;
2486         }
2487         EXIT;
2488 }
2489
2490 /* we're trying to queue a page in the osc so we're subject to the
2491  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2492  * If the osc's queued pages are already at that limit, then we want to sleep
2493  * until there is space in the osc's queue for us.  We also may be waiting for
2494  * write credits from the OST if there are RPCs in flight that may return some
2495  * before we fall back to sync writes.
2496  *
 * We need this to know whether our allocation was granted in the presence
 * of signals */
2498 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2499 {
2500         int rc;
2501         ENTRY;
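        /* granted once osc_wake_cache_waiters() has removed us from the
         * list, or when there are no more RPCs in flight to wait for */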
2502         client_obd_list_lock(&cli->cl_loi_list_lock);
2503         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2504         client_obd_list_unlock(&cli->cl_loi_list_lock);
2505         RETURN(rc);
2506 };
2507
2508 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2509  * grant or cache space. */
2510 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2511                            struct osc_async_page *oap)
2512 {
2513         struct osc_cache_waiter ocw;
2514         struct l_wait_info lwi = { 0 };
2515
2516         ENTRY;
2517
2518         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2519                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2520                cli->cl_dirty_max, obd_max_dirty_pages,
2521                cli->cl_lost_grant, cli->cl_avail_grant);
2522
2523         /* force the caller to try sync io.  this can jump the list
2524          * of queued writes and create a discontiguous rpc stream */
2525         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2526             loi->loi_ar.ar_force_sync)
2527                 RETURN(-EDQUOT);
2528
2529         /* Hopefully normal case - cache space and write credits available */
2530         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2531             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2532             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2533                 /* account for ourselves */
2534                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2535                 RETURN(0);
2536         }
2537
2538         /* Make sure that there are write rpcs in flight to wait for.  This
2539          * is a little silly as this object may not have any pending but
2540          * other objects sure might. */
2541         if (cli->cl_w_in_flight) {
2542                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2543                 cfs_waitq_init(&ocw.ocw_waitq);
2544                 ocw.ocw_oap = oap;
2545                 ocw.ocw_rc = 0;
2546
2547                 loi_list_maint(cli, loi);
2548                 osc_check_rpcs(cli);
2549                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2550
2551                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2552                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2553
2554                 client_obd_list_lock(&cli->cl_loi_list_lock);
2555                 if (!list_empty(&ocw.ocw_entry)) {
2556                         list_del(&ocw.ocw_entry);
2557                         RETURN(-EINTR);
2558                 }
2559                 RETURN(ocw.ocw_rc);
2560         }
2561
2562         RETURN(-EDQUOT);
2563 }
2564
2565 /**
2566  * Checks if requested extent lock is compatible with a lock under the page.
2567  *
2568  * Checks if the lock under \a page is compatible with a read or write lock
2569  * (specified by \a rw) for an extent [\a start , \a end].
2570  *
2571  * \param exp osc export
2572  * \param lsm striping information for the file
2573  * \param res osc_async_page placeholder
2574  * \param rw OBD_BRW_READ if requested for reading,
2575  *           OBD_BRW_WRITE if requested for writing
2576  * \param start start of the requested extent
2577  * \param end end of the requested extent
2578  * \param cookie transparent parameter for passing locking context
2579  *
2580  * \post result == 1, *cookie == context, appropriate lock is referenced or
2581  * \post result == 0
2582  *
2583  * \retval 1 owned lock is reused for the request
2584  * \retval 0 no lock reused for the request
2585  *
2586  * \see osc_release_short_lock
2587  */
2588 static int osc_reget_short_lock(struct obd_export *exp,
2589                                 struct lov_stripe_md *lsm,
2590                                 void **res, int rw,
2591                                 obd_off start, obd_off end,
2592                                 void **cookie)
2593 {
2594         struct osc_async_page *oap = *res;
2595         int rc;
2596
2597         ENTRY;
2598
2599         spin_lock(&oap->oap_lock);
2600         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2601                                   start, end, cookie);
2602         spin_unlock(&oap->oap_lock);
2603
2604         RETURN(rc);
2605 }
2606
2607 /**
2608  * Releases a reference to a lock taken in a "fast" way.
2609  *
2610  * Releases a read or a write (specified by \a rw) lock
2611  * referenced by \a cookie.
2612  *
2613  * \param exp osc export
2614  * \param lsm striping information for the file
2615  * \param end end of the locked extent
2616  * \param rw OBD_BRW_READ if requested for reading,
2617  *           OBD_BRW_WRITE if requested for writing
2618  * \param cookie transparent parameter for passing locking context
2619  *
2620  * \post appropriate lock is dereferenced
2621  *
2622  * \see osc_reget_short_lock
2623  */
2624 static int osc_release_short_lock(struct obd_export *exp,
2625                                   struct lov_stripe_md *lsm, obd_off end,
2626                                   void *cookie, int rw)
2627 {
2628         ENTRY;
2629         ldlm_lock_fast_release(cookie, rw);
2630         /* no error could have happened at this layer */
2631         RETURN(0);
2632 }
2633
2634 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2635                         struct lov_oinfo *loi, cfs_page_t *page,
2636                         obd_off offset, struct obd_async_page_ops *ops,
2637                         void *data, void **res, int nocache,
2638                         struct lustre_handle *lockh)
2639 {
2640         struct osc_async_page *oap;
2641         struct ldlm_res_id oid;
2642         int rc = 0;
2643         ENTRY;
2644
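        /* with no page the caller is only asking how much space to reserve
         * for the osc_async_page itself */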
2645         if (!page)
2646                 return size_round(sizeof(*oap));
2647
2648         oap = *res;
2649         oap->oap_magic = OAP_MAGIC;
2650         oap->oap_cli = &exp->exp_obd->u.cli;
2651         oap->oap_loi = loi;
2652
2653         oap->oap_caller_ops = ops;
2654         oap->oap_caller_data = data;
2655
2656         oap->oap_page = page;
2657         oap->oap_obj_off = offset;
2658
2659         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2660         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2661         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2662         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2663
2664         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2665
2666         spin_lock_init(&oap->oap_lock);
2667
        /* If the page was marked as not cacheable, don't add it to any locks */
2669         if (!nocache) {
2670                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2671                 /* This is the only place where we can call cache_add_extent
2672                    without oap_lock, because this page is locked now, and
2673                    the lock we are adding it to is referenced, so cannot lose
2674                    any pages either. */
2675                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2676                 if (rc)
2677                         RETURN(rc);
2678         }
2679
2680         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2681         RETURN(0);
2682 }
2683
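/* Convert an opaque caller cookie back into an osc_async_page, checking the
 * magic value to catch stray pointers. */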
2684 struct osc_async_page *oap_from_cookie(void *cookie)
2685 {
2686         struct osc_async_page *oap = cookie;
2687         if (oap->oap_magic != OAP_MAGIC)
2688                 return ERR_PTR(-EINVAL);
2689         return oap;
2690 }
2691
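/* Queue a prepared page for async I/O.  Writes are checked against quota
 * and must reserve cache/grant space first; the page then joins the
 * object's pending list and an RPC is started if enough work has built up. */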
2692 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2693                               struct lov_oinfo *loi, void *cookie,
2694                               int cmd, obd_off off, int count,
2695                               obd_flag brw_flags, enum async_flags async_flags)
2696 {
2697         struct client_obd *cli = &exp->exp_obd->u.cli;
2698         struct osc_async_page *oap;
2699         int rc = 0;
2700         ENTRY;
2701
2702         oap = oap_from_cookie(cookie);
2703         if (IS_ERR(oap))
2704                 RETURN(PTR_ERR(oap));
2705
2706         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2707                 RETURN(-EIO);
2708
2709         if (!list_empty(&oap->oap_pending_item) ||
2710             !list_empty(&oap->oap_urgent_item) ||
2711             !list_empty(&oap->oap_rpc_item))
2712                 RETURN(-EBUSY);
2713
2714         /* check if the file's owner/group is over quota */
2715 #ifdef HAVE_QUOTA_SUPPORT
2716         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2717                 struct obd_async_page_ops *ops;
2718                 struct obdo *oa;
2719
2720                 OBDO_ALLOC(oa);
2721                 if (oa == NULL)
2722                         RETURN(-ENOMEM);
2723
2724                 ops = oap->oap_caller_ops;
2725                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2726                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2727                     NO_QUOTA)
2728                         rc = -EDQUOT;
2729
2730                 OBDO_FREE(oa);
2731                 if (rc)
2732                         RETURN(rc);
2733         }
2734 #endif
2735
2736         if (loi == NULL)
2737                 loi = lsm->lsm_oinfo[0];
2738
2739         client_obd_list_lock(&cli->cl_loi_list_lock);
2740
2741         oap->oap_cmd = cmd;
2742         oap->oap_page_off = off;
2743         oap->oap_count = count;
2744         oap->oap_brw_flags = brw_flags;
2745         oap->oap_async_flags = async_flags;
2746
2747         if (cmd & OBD_BRW_WRITE) {
2748                 rc = osc_enter_cache(cli, loi, oap);
2749                 if (rc) {
2750                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2751                         RETURN(rc);
2752                 }
2753         }
2754
2755         osc_oap_to_pending(oap);
2756         loi_list_maint(cli, loi);
2757
2758         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2759                   cmd);
2760
2761         osc_check_rpcs(cli);
2762         client_obd_list_unlock(&cli->cl_loi_list_lock);
2763
2764         RETURN(0);
2765 }
2766
2767 /* aka (~was & now & flag), but this is clearer :) */
2768 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2769
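/* Add async flags to an already-queued page; a newly set ASYNC_URGENT moves
 * the page onto the urgent list so it is sent sooner. */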
2770 static int osc_set_async_flags(struct obd_export *exp,
2771                                struct lov_stripe_md *lsm,
2772                                struct lov_oinfo *loi, void *cookie,
2773                                obd_flag async_flags)
2774 {
2775         struct client_obd *cli = &exp->exp_obd->u.cli;
2776         struct loi_oap_pages *lop;
2777         struct osc_async_page *oap;
2778         int rc = 0;
2779         ENTRY;
2780
2781         oap = oap_from_cookie(cookie);
2782         if (IS_ERR(oap))
2783                 RETURN(PTR_ERR(oap));
2784
2785         /*
2786          * bug 7311: OST-side locking is only supported for liblustre for now
2787          * (and liblustre never calls obd_set_async_flags(). I hope.); a
2788          * generic implementation would have to handle the case where an
2789          * OST-locked page was picked up by, e.g., ->writepage().
2790          */
2791         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2792         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2793                                      * tread here. */
2794
2795         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2796                 RETURN(-EIO);
2797
2798         if (loi == NULL)
2799                 loi = lsm->lsm_oinfo[0];
2800
2801         if (oap->oap_cmd & OBD_BRW_WRITE) {
2802                 lop = &loi->loi_write_lop;
2803         } else {
2804                 lop = &loi->loi_read_lop;
2805         }
2806
2807         client_obd_list_lock(&cli->cl_loi_list_lock);
2808
2809         if (list_empty(&oap->oap_pending_item))
2810                 GOTO(out, rc = -EINVAL);
2811
2812         if ((oap->oap_async_flags & async_flags) == async_flags)
2813                 GOTO(out, rc = 0);
2814
2815         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2816                 oap->oap_async_flags |= ASYNC_READY;
2817
2818         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2819                 if (list_empty(&oap->oap_rpc_item)) {
2820                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2821                         loi_list_maint(cli, loi);
2822                 }
2823         }
2824
2825         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2826                         oap->oap_async_flags);
2827 out:
2828         osc_check_rpcs(cli);
2829         client_obd_list_unlock(&cli->cl_loi_list_lock);
2830         RETURN(rc);
2831 }
2832
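/* Queue a page for group I/O: the page is parked on the group pending list
 * and, for ASYNC_GROUP_SYNC, registered with the obd_io_group so completion
 * can be waited on. */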
2833 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2834                              struct lov_oinfo *loi,
2835                              struct obd_io_group *oig, void *cookie,
2836                              int cmd, obd_off off, int count,
2837                              obd_flag brw_flags,
2838                              obd_flag async_flags)
2839 {
2840         struct client_obd *cli = &exp->exp_obd->u.cli;
2841         struct osc_async_page *oap;
2842         struct loi_oap_pages *lop;
2843         int rc = 0;
2844         ENTRY;
2845
2846         oap = oap_from_cookie(cookie);
2847         if (IS_ERR(oap))
2848                 RETURN(PTR_ERR(oap));
2849
2850         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2851                 RETURN(-EIO);
2852
2853         if (!list_empty(&oap->oap_pending_item) ||
2854             !list_empty(&oap->oap_urgent_item) ||
2855             !list_empty(&oap->oap_rpc_item))
2856                 RETURN(-EBUSY);
2857
2858         if (loi == NULL)
2859                 loi = lsm->lsm_oinfo[0];
2860
2861         client_obd_list_lock(&cli->cl_loi_list_lock);
2862
2863         oap->oap_cmd = cmd;
2864         oap->oap_page_off = off;
2865         oap->oap_count = count;
2866         oap->oap_brw_flags = brw_flags;
2867         oap->oap_async_flags = async_flags;
2868
2869         if (cmd & OBD_BRW_WRITE)
2870                 lop = &loi->loi_write_lop;
2871         else
2872                 lop = &loi->loi_read_lop;
2873
2874         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2875         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2876                 oap->oap_oig = oig;
2877                 rc = oig_add_one(oig, &oap->oap_occ);
2878         }
2879
2880         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2881                   oap, oap->oap_page, rc);
2882
2883         client_obd_list_unlock(&cli->cl_loi_list_lock);
2884
2885         RETURN(rc);
2886 }
2887
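/* Move every page parked for group I/O onto the ordinary pending list so
 * that normal RPC generation picks it up. */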
2888 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2889                                  struct loi_oap_pages *lop, int cmd)
2890 {
2891         struct list_head *pos, *tmp;
2892         struct osc_async_page *oap;
2893
2894         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2895                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2896                 list_del(&oap->oap_pending_item);
2897                 osc_oap_to_pending(oap);
2898         }
2899         loi_list_maint(cli, loi);
2900 }
2901
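/* Kick off all group-queued pages (reads and writes) for the object. */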
2902 static int osc_trigger_group_io(struct obd_export *exp,
2903                                 struct lov_stripe_md *lsm,
2904                                 struct lov_oinfo *loi,
2905                                 struct obd_io_group *oig)
2906 {
2907         struct client_obd *cli = &exp->exp_obd->u.cli;
2908         ENTRY;
2909
2910         if (loi == NULL)
2911                 loi = lsm->lsm_oinfo[0];
2912
2913         client_obd_list_lock(&cli->cl_loi_list_lock);
2914
2915         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2916         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2917
2918         osc_check_rpcs(cli);
2919         client_obd_list_unlock(&cli->cl_loi_list_lock);
2920
2921         RETURN(0);
2922 }
2923
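/* Undo the prepare/queue steps for a page that is not yet part of an RPC:
 * drop it from the urgent and pending lists, release its cache reservation
 * and detach it from the extent cache. */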
2924 static int osc_teardown_async_page(struct obd_export *exp,
2925                                    struct lov_stripe_md *lsm,
2926                                    struct lov_oinfo *loi, void *cookie)
2927 {
2928         struct client_obd *cli = &exp->exp_obd->u.cli;
2929         struct loi_oap_pages *lop;
2930         struct osc_async_page *oap;
2931         int rc = 0;
2932         ENTRY;
2933
2934         oap = oap_from_cookie(cookie);
2935         if (IS_ERR(oap))
2936                 RETURN(PTR_ERR(oap));
2937
2938         if (loi == NULL)
2939                 loi = lsm->lsm_oinfo[0];
2940
2941         if (oap->oap_cmd & OBD_BRW_WRITE) {
2942                 lop = &loi->loi_write_lop;
2943         } else {
2944                 lop = &loi->loi_read_lop;
2945         }
2946
2947         client_obd_list_lock(&cli->cl_loi_list_lock);
2948
2949         if (!list_empty(&oap->oap_rpc_item))
2950                 GOTO(out, rc = -EBUSY);
2951
2952         osc_exit_cache(cli, oap, 0);
2953         osc_wake_cache_waiters(cli);
2954
2955         if (!list_empty(&oap->oap_urgent_item)) {
2956                 list_del_init(&oap->oap_urgent_item);
2957                 oap->oap_async_flags &= ~ASYNC_URGENT;
2958         }
2959         if (!list_empty(&oap->oap_pending_item)) {
2960                 list_del_init(&oap->oap_pending_item);
2961                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2962         }
2963         loi_list_maint(cli, loi);
2964         cache_remove_extent(cli->cl_cache, oap);
2965
2966         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2967 out:
2968         client_obd_list_unlock(&cli->cl_loi_list_lock);
2969         RETURN(rc);
2970 }
2971
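/* Blocking/cancel AST for extent locks: a blocking callback cancels the
 * lock; cancellation purges the extent cache and invokes any registered
 * lock-cancel callback. */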
2972 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2973                            struct ldlm_lock_desc *new, void *data,
2974                            int flag)
2975 {
2976         struct lustre_handle lockh = { 0 };
2977         int rc;
2978         ENTRY;
2979
2980         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2981                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2982                 LBUG();
2983         }
2984
2985         switch (flag) {
2986         case LDLM_CB_BLOCKING:
2987                 ldlm_lock2handle(lock, &lockh);
2988                 rc = ldlm_cli_cancel(&lockh);
2989                 if (rc != ELDLM_OK)
2990                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
2991                 break;
2992         case LDLM_CB_CANCELING: {
2994                 ldlm_lock2handle(lock, &lockh);
2995                 /* This lock wasn't granted, don't try to do anything */
2996                 if (lock->l_req_mode != lock->l_granted_mode)
2997                         RETURN(0);
2998
2999                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
3000                                   &lockh);
3001
3002                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
3003                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
3004                                                           lock, new, data, flag);
3005                 break;
3006         }
3007         default:
3008                 LBUG();
3009         }
3010
3011         RETURN(0);
3012 }
3013 EXPORT_SYMBOL(osc_extent_blocking_cb);
3014
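/* Attach \a data (the inode, on Linux) to a lock as l_ast_data, complaining
 * if the lock already carries different, still-live ast data. */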
3015 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
3016                                     int flags)
3017 {
3018         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3019
3020         if (lock == NULL) {
3021                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
3022                 return;
3023         }
3024         lock_res_and_lock(lock);
3025 #if defined (__KERNEL__) && defined (__linux__)
3026         /* Liang XXX: Darwin and Winnt checking should be added */
3027         if (lock->l_ast_data && lock->l_ast_data != data) {
3028                 struct inode *new_inode = data;
3029                 struct inode *old_inode = lock->l_ast_data;
3030                 if (!(old_inode->i_state & I_FREEING))
3031                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3032                 LASSERTF(old_inode->i_state & I_FREEING,
3033                          "Found existing inode %p/%lu/%u state %lu in lock: "
3034                          "setting data to %p/%lu/%u\n", old_inode,
3035                          old_inode->i_ino, old_inode->i_generation,
3036                          old_inode->i_state,
3037                          new_inode, new_inode->i_ino, new_inode->i_generation);
3038         }
3039 #endif
3040         lock->l_ast_data = data;
3041         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3042         unlock_res_and_lock(lock);
3043         LDLM_LOCK_PUT(lock);
3044 }
3045
3046 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3047                              ldlm_iterator_t replace, void *data)
3048 {
3049         struct ldlm_res_id res_id;
3050         struct obd_device *obd = class_exp2obd(exp);
3051
3052         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3053         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3054         return 0;
3055 }
3056
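/* Finish an enqueue: for intent requests extract the server's verdict from
 * the reply, cache a successfully granted lock, and run the caller's update
 * callback. */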
3057 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3058                             struct obd_info *oinfo, int intent, int rc)
3059 {
3060         ENTRY;
3061
3062         if (intent) {
3063                 /* The request was created before ldlm_cli_enqueue call. */
3064                 if (rc == ELDLM_LOCK_ABORTED) {
3065                         struct ldlm_reply *rep;
3066                         rep = req_capsule_server_get(&req->rq_pill,
3067                                                      &RMF_DLM_REP);
3068
3069                         LASSERT(rep != NULL);
3070                         if (rep->lock_policy_res1)
3071                                 rc = rep->lock_policy_res1;
3072                 }
3073         }
3074
3075         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3076                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3077                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3078                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3079                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3080         }
3081
3082         if (!rc)
3083                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3084
3085         /* Call the update callback. */
3086         rc = oinfo->oi_cb_up(oinfo, rc);
3087         RETURN(rc);
3088 }
3089
3090 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3091                                  struct osc_enqueue_args *aa, int rc)
3092 {
3093         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3094         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3095         struct ldlm_lock *lock;
3096
3097         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3098          * be valid. */
3099         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3100
3101         /* Complete obtaining the lock procedure. */
3102         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3103                                    aa->oa_ei->ei_mode,
3104                                    &aa->oa_oi->oi_flags,
3105                                    &lsm->lsm_oinfo[0]->loi_lvb,
3106                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3107                                    lustre_swab_ost_lvb,
3108                                    aa->oa_oi->oi_lockh, rc);
3109
3110         /* Complete osc stuff. */
3111         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3112
3113         /* Release the lock for async request. */
3114         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3115                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3116
3117         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3118                  aa->oa_oi->oi_lockh, req, aa);
3119         LDLM_LOCK_PUT(lock);
3120         return rc;
3121 }
3122
3123 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3124  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
3125  * with other synchronous requests, but holding some locks while trying to
3126  * obtain others may take a considerable amount of time if an OST fails, and
3127  * when other sync requests cannot get a lock released by a client, that
3128  * client is excluded from the cluster -- such scenarios make life difficult,
3129  * so release locks just after they are obtained. */
3130 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3131                        struct ldlm_enqueue_info *einfo,
3132                        struct ptlrpc_request_set *rqset)
3133 {
3134         struct ldlm_res_id res_id;
3135         struct obd_device *obd = exp->exp_obd;
3136         struct ptlrpc_request *req = NULL;
3137         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3138         ldlm_mode_t mode;
3139         int rc;
3140         ENTRY;
3141
3143         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3144                            oinfo->oi_md->lsm_object_gr, &res_id);
3145         /* Filesystem lock extents are extended to page boundaries so that
3146          * dealing with the page cache is a little smoother.  */
3147         oinfo->oi_policy.l_extent.start -=
3148                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3149         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3150
3151         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3152                 goto no_match;
3153
3154         /* Next, search for already existing extent locks that will cover us */
3155         /* If we're trying to read, we also search for an existing PW lock.  The
3156          * VFS and page cache already protect us locally, so lots of readers/
3157          * writers can share a single PW lock.
3158          *
3159          * There are problems with conversion deadlocks, so instead of
3160          * converting a read lock to a write lock, we'll just enqueue a new
3161          * one.
3162          *
3163          * At some point we should cancel the read lock instead of making the
3164          * server send us a blocking callback, but there are problems with
3165          * canceling locks out from under other users right now, too. */
3166         mode = einfo->ei_mode;
3167         if (einfo->ei_mode == LCK_PR)
3168                 mode |= LCK_PW;
3169         mode = ldlm_lock_match(obd->obd_namespace,
3170                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3171                                einfo->ei_type, &oinfo->oi_policy, mode,
3172                                oinfo->oi_lockh);
3173         if (mode) {
3174                 /* addref the lock only if not async requests and PW lock is
3175                  * matched whereas we asked for PR. */
3176                 if (!rqset && einfo->ei_mode != mode)
3177                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3178                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3179                                         oinfo->oi_flags);
3180                 if (intent) {
3181                         /* I would like to be able to ASSERT here that rss <=
3182                          * kms, but I can't, for reasons which are explained in
3183                          * lov_enqueue() */
3184                 }
3185
3186                 /* We already have a lock, and it's referenced */
3187                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3188
3189                 /* For async requests, decref the lock. */
3190                 if (einfo->ei_mode != mode)
3191                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3192                 else if (rqset)
3193                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3194
3195                 RETURN(ELDLM_OK);
3196         }
3197
3198  no_match:
3199         if (intent) {
3200                 CFS_LIST_HEAD(cancels);
3201                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3202                                            &RQF_LDLM_ENQUEUE_LVB);
3203                 if (req == NULL)
3204                         RETURN(-ENOMEM);
3205
3206                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3207                 if (rc)
3208                         RETURN(rc);
3209
3210                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3211                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3212                 ptlrpc_request_set_replen(req);
3213         }
3214
3215         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3216         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3217
3218         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3219                               &oinfo->oi_policy, &oinfo->oi_flags,
3220                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3221                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3222                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3223                               rqset ? 1 : 0);
3224         if (rqset) {
3225                 if (!rc) {
3226                         struct osc_enqueue_args *aa;
3227                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3228                         aa = ptlrpc_req_async_args(req);
3229                         aa->oa_oi = oinfo;
3230                         aa->oa_ei = einfo;
3231                         aa->oa_exp = exp;
3232
3233                         req->rq_interpret_reply = osc_enqueue_interpret;
3234                         ptlrpc_set_add_req(rqset, req);
3235                 } else if (intent) {
3236                         ptlrpc_req_finished(req);
3237                 }
3238                 RETURN(rc);
3239         }
3240
3241         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3242         if (intent)
3243                 ptlrpc_req_finished(req);
3244
3245         RETURN(rc);
3246 }
3247
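/* Match an existing extent lock without enqueuing a new one; as in
 * osc_enqueue(), a PR request may be satisfied by a cached PW lock. */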
3248 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3249                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3250                      int *flags, void *data, struct lustre_handle *lockh)
3251 {
3252         struct ldlm_res_id res_id;
3253         struct obd_device *obd = exp->exp_obd;
3254         int lflags = *flags;
3255         ldlm_mode_t rc;
3256         ENTRY;
3257
3258         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3259
3260         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3261                 RETURN(-EIO);
3262
3263         /* Filesystem lock extents are extended to page boundaries so that
3264          * dealing with the page cache is a little smoother */
3265         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3266         policy->l_extent.end |= ~CFS_PAGE_MASK;
3267
3268         /* Next, search for already existing extent locks that will cover us */
3269         /* If we're trying to read, we also search for an existing PW lock.  The
3270          * VFS and page cache already protect us locally, so lots of readers/
3271          * writers can share a single PW lock. */
3272         rc = mode;
3273         if (mode == LCK_PR)
3274                 rc |= LCK_PW;
3275         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3276                              &res_id, type, policy, rc, lockh);
3277         if (rc) {
3278                 osc_set_data_with_check(lockh, data, lflags);
3279                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3280                         ldlm_lock_addref(lockh, LCK_PR);
3281                         ldlm_lock_decref(lockh, LCK_PW);
3282                 }
3283                 RETURN(rc);
3284         }
3285         RETURN(rc);
3286 }
3287
3288 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3289                       __u32 mode, struct lustre_handle *lockh)
3290 {
3291         ENTRY;
3292
3293         if (unlikely(mode == LCK_GROUP))
3294                 ldlm_lock_decref_and_cancel(lockh, mode);
3295         else
3296                 ldlm_lock_decref(lockh, mode);
3297
3298         RETURN(0);
3299 }
3300
3301 static int osc_cancel_unused(struct obd_export *exp,
3302                              struct lov_stripe_md *lsm, int flags,
3303                              void *opaque)
3304 {
3305         struct obd_device *obd = class_exp2obd(exp);
3306         struct ldlm_res_id res_id, *resp = NULL;
3307
3308         if (lsm != NULL) {
3309                 resp = osc_build_res_name(lsm->lsm_object_id,
3310                                           lsm->lsm_object_gr, &res_id);
3311         }
3312
3313         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3314 }
3315
3316 static int osc_join_lru(struct obd_export *exp,
3317                         struct lov_stripe_md *lsm, int join)
3318 {
3319         struct obd_device *obd = class_exp2obd(exp);
3320         struct ldlm_res_id res_id, *resp = NULL;
3321
3322         if (lsm != NULL) {
3323                 resp = osc_build_res_name(lsm->lsm_object_id,
3324                                           lsm->lsm_object_gr, &res_id);
3325         }
3326
3327         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3328 }
3329
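/* Copy the OST's statfs reply into the caller's buffer and run the
 * completion callback. */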
3330 static int osc_statfs_interpret(struct ptlrpc_request *req,
3331                                 struct osc_async_args *aa, int rc)
3332 {
3333         struct obd_statfs *msfs;
3334         ENTRY;
3335
3336         if (rc != 0)
3337                 GOTO(out, rc);
3338
3339         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3340         if (msfs == NULL) {
3341                 GOTO(out, rc = -EPROTO);
3342         }
3343
3344         *aa->aa_oi->oi_osfs = *msfs;
3345 out:
3346         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3347         RETURN(rc);
3348 }
3349
3350 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3351                             __u64 max_age, struct ptlrpc_request_set *rqset)
3352 {
3353         struct ptlrpc_request *req;
3354         struct osc_async_args *aa;
3355         int                    rc;
3356         ENTRY;
3357
3358         /* We could possibly pass max_age in the request (as an absolute
3359          * timestamp or a "seconds.usec ago") so the target can avoid doing
3360          * extra calls into the filesystem if that isn't necessary (e.g.
3361          * during mount that would help a bit).  Having relative timestamps
3362          * is not so great if request processing is slow, while absolute
3363          * timestamps are not ideal because they need time synchronization. */
3364         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3365         if (req == NULL)
3366                 RETURN(-ENOMEM);
3367
3368         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3369         if (rc) {
3370                 ptlrpc_request_free(req);
3371                 RETURN(rc);
3372         }
3373         ptlrpc_request_set_replen(req);
3374         req->rq_request_portal = OST_CREATE_PORTAL;
3375         ptlrpc_at_set_req_timeout(req);
3376
3377         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3378                 /* procfs requests must not wait for stat, to avoid deadlock */
3379                 req->rq_no_resend = 1;
3380                 req->rq_no_delay = 1;
3381         }
3382
3383         req->rq_interpret_reply = osc_statfs_interpret;
3384         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3385         aa = ptlrpc_req_async_args(req);
3386         aa->aa_oi = oinfo;
3387
3388         ptlrpc_set_add_req(rqset, req);
3389         RETURN(0);
3390 }
3391
3392 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3393                       __u64 max_age, __u32 flags)
3394 {
3395         struct obd_statfs     *msfs;
3396         struct ptlrpc_request *req;
3397         struct obd_import     *imp = NULL;
3398         int rc;
3399         ENTRY;
3400
3401         /* Since the request might also come from lprocfs, we need to sync
3402          * this with client_disconnect_export (bug 15684). */
3403         down_read(&obd->u.cli.cl_sem);
3404         if (obd->u.cli.cl_import)
3405                 imp = class_import_get(obd->u.cli.cl_import);
3406         up_read(&obd->u.cli.cl_sem);
3407         if (!imp)
3408                 RETURN(-ENODEV);
3409
3410         /* We could possibly pass max_age in the request (as an absolute
3411          * timestamp or a "seconds.usec ago") so the target can avoid doing
3412          * extra calls into the filesystem if that isn't necessary (e.g.
3413          * during mount that would help a bit).  Having relative timestamps
3414          * is not so great if request processing is slow, while absolute
3415          * timestamps are not ideal because they need time synchronization. */
3416         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3417
3418         class_import_put(imp);
3419
3420         if (req == NULL)
3421                 RETURN(-ENOMEM);
3422
3423         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3424         if (rc) {
3425                 ptlrpc_request_free(req);
3426                 RETURN(rc);
3427         }
3428         ptlrpc_request_set_replen(req);
3429         req->rq_request_portal = OST_CREATE_PORTAL;
3430         ptlrpc_at_set_req_timeout(req);
3431
3432         if (flags & OBD_STATFS_NODELAY) {
3433                 /* procfs requests must not wait for stat, to avoid deadlock */
3434                 req->rq_no_resend = 1;
3435                 req->rq_no_delay = 1;
3436         }
3437
3438         rc = ptlrpc_queue_wait(req);
3439         if (rc)
3440                 GOTO(out, rc);
3441
3442         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3443         if (msfs == NULL) {
3444                 GOTO(out, rc = -EPROTO);
3445         }
3446
3447         *osfs = *msfs;
3448
3449         EXIT;
3450  out:
3451         ptlrpc_req_finished(req);
3452         return rc;
3453 }
3454
3455 /* Retrieve object striping information.
3456  *
3457  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3458  * the maximum number of OST indices which will fit in the user buffer.
3459  * lmm_magic must be LOV_USER_MAGIC (we only use 1 slot here).
3460  */
3461 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3462 {
3463         struct lov_user_md lum, *lumk;
3464         int rc = 0, lum_size;
3465         ENTRY;
3466
3467         if (!lsm)
3468                 RETURN(-ENODATA);
3469
3470         if (copy_from_user(&lum, lump, sizeof(lum)))
3471                 RETURN(-EFAULT);
3472
3473         if (lum.lmm_magic != LOV_USER_MAGIC)
3474                 RETURN(-EINVAL);
3475
3476         if (lum.lmm_stripe_count > 0) {
3477                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3478                 OBD_ALLOC(lumk, lum_size);
3479                 if (!lumk)
3480                         RETURN(-ENOMEM);
3481
3482                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3483                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3484         } else {
3485                 lum_size = sizeof(lum);
3486                 lumk = &lum;
3487         }
3488
3489         lumk->lmm_object_id = lsm->lsm_object_id;
3490         lumk->lmm_object_gr = lsm->lsm_object_gr;
3491         lumk->lmm_stripe_count = 1;
3492
3493         if (copy_to_user(lump, lumk, lum_size))
3494                 rc = -EFAULT;
3495
3496         if (lumk != &lum)
3497                 OBD_FREE(lumk, lum_size);
3498
3499         RETURN(rc);
3500 }
3501
3503 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3504                          void *karg, void *uarg)
3505 {
3506         struct obd_device *obd = exp->exp_obd;
3507         struct obd_ioctl_data *data = karg;
3508         int err = 0;
3509         ENTRY;
3510
3511         if (!try_module_get(THIS_MODULE)) {
3512                 CERROR("Can't get module. Is it alive?\n");
3513                 return -EINVAL;
3514         }
3515         switch (cmd) {
3516         case OBD_IOC_LOV_GET_CONFIG: {
3517                 char *buf;
3518                 struct lov_desc *desc;
3519                 struct obd_uuid uuid;
3520
3521                 buf = NULL;
3522                 len = 0;
3523                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3524                         GOTO(out, err = -EINVAL);
3525
3526                 data = (struct obd_ioctl_data *)buf;
3527
3528                 if (sizeof(*desc) > data->ioc_inllen1) {
3529                         obd_ioctl_freedata(buf, len);
3530                         GOTO(out, err = -EINVAL);
3531                 }
3532
3533                 if (data->ioc_inllen2 < sizeof(uuid)) {
3534                         obd_ioctl_freedata(buf, len);
3535                         GOTO(out, err = -EINVAL);
3536                 }
3537
3538                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3539                 desc->ld_tgt_count = 1;
3540                 desc->ld_active_tgt_count = 1;
3541                 desc->ld_default_stripe_count = 1;
3542                 desc->ld_default_stripe_size = 0;
3543                 desc->ld_default_stripe_offset = 0;
3544                 desc->ld_pattern = 0;
3545                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3546
3547                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3548
3549                 err = copy_to_user((void *)uarg, buf, len);
3550                 if (err)
3551                         err = -EFAULT;
3552                 obd_ioctl_freedata(buf, len);
3553                 GOTO(out, err);
3554         }
3555         case LL_IOC_LOV_SETSTRIPE:
3556                 err = obd_alloc_memmd(exp, karg);
3557                 if (err > 0)
3558                         err = 0;
3559                 GOTO(out, err);
3560         case LL_IOC_LOV_GETSTRIPE:
3561                 err = osc_getstripe(karg, uarg);
3562                 GOTO(out, err);
3563         case OBD_IOC_CLIENT_RECOVER:
3564                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3565                                             data->ioc_inlbuf1);
3566                 if (err > 0)
3567                         err = 0;
3568                 GOTO(out, err);
3569         case IOC_OSC_SET_ACTIVE:
3570                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3571                                                data->ioc_offset);
3572                 GOTO(out, err);
3573         case OBD_IOC_POLL_QUOTACHECK:
3574                 err = lquota_poll_check(quota_interface, exp,
3575                                         (struct if_quotacheck *)karg);
3576                 GOTO(out, err);
3577         default:
3578                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3579                        cmd, cfs_curproc_comm());
3580                 GOTO(out, err = -ENOTTY);
3581         }
3582 out:
3583         module_put(THIS_MODULE);
3584         return err;
3585 }
3586
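/* obd_get_info handler: KEY_LOCK_TO_STRIPE is trivially stripe 0 on an OSC,
 * while KEY_LAST_ID asks the OST for the last object id it allocated. */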
3587 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3588                         void *key, __u32 *vallen, void *val)
3589 {
3590         ENTRY;
3591         if (!vallen || !val)
3592                 RETURN(-EFAULT);
3593
3594         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3595                 __u32 *stripe = val;
3596                 *vallen = sizeof(*stripe);
3597                 *stripe = 0;
3598                 RETURN(0);
3599         } else if (KEY_IS(KEY_LAST_ID)) {
3600                 struct ptlrpc_request *req;
3601                 obd_id                *reply;
3602                 char                  *tmp;
3603                 int                    rc;
3604
3605                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3606                                            &RQF_OST_GET_INFO_LAST_ID);
3607                 if (req == NULL)
3608                         RETURN(-ENOMEM);
3609
3610                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3611                                      RCL_CLIENT, keylen);
3612                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3613                 if (rc) {
3614                         ptlrpc_request_free(req);
3615                         RETURN(rc);
3616                 }
3617
3618                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3619                 memcpy(tmp, key, keylen);
3620
3621                 ptlrpc_request_set_replen(req);
3622                 rc = ptlrpc_queue_wait(req);
3623                 if (rc)
3624                         GOTO(out, rc);
3625
3626                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3627                 if (reply == NULL)
3628                         GOTO(out, rc = -EPROTO);
3629
3630                 *((obd_id *)val) = *reply;
3631         out:
3632                 ptlrpc_req_finished(req);
3633                 RETURN(rc);
3634         }
3635         RETURN(-EINVAL);
3636 }
3637
3638 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3639                                           void *aa, int rc)
3640 {
3641         struct llog_ctxt *ctxt;
3642         struct obd_import *imp = req->rq_import;
3643         ENTRY;
3644
3645         if (rc != 0)
3646                 RETURN(rc);
3647
3648         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3649         if (ctxt) {
3650                 if (rc == 0)
3651                         rc = llog_initiator_connect(ctxt);
3652                 else
3653                         CERROR("cannot establish connection for "
3654                                "ctxt %p: %d\n", ctxt, rc);
3655         }
3656
3657         llog_ctxt_put(ctxt);
3658         spin_lock(&imp->imp_lock);
3659         imp->imp_server_timeout = 1;
3660         imp->imp_pingable = 1;
3661         spin_unlock(&imp->imp_lock);
3662         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3663
3664         RETURN(rc);
3665 }
3666
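/* obd_set_info_async handler: a few keys (next id, recovery, checksum,
 * context flush) are consumed locally; everything else is forwarded to the
 * OST as an OST_SET_INFO request. */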
3667 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3668                               void *key, obd_count vallen, void *val,
3669                               struct ptlrpc_request_set *set)
3670 {
3671         struct ptlrpc_request *req;
3672         struct obd_device     *obd = exp->exp_obd;
3673         struct obd_import     *imp = class_exp2cliimp(exp);
3674         char                  *tmp;
3675         int                    rc;
3676         ENTRY;
3677
3678         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3679
3680         if (KEY_IS(KEY_NEXT_ID)) {
3681                 if (vallen != sizeof(obd_id))
3682                         RETURN(-ERANGE);
3683                 if (val == NULL)
3684                         RETURN(-EINVAL);
3685                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3686                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3687                        exp->exp_obd->obd_name,
3688                        obd->u.cli.cl_oscc.oscc_next_id);
3689
3690                 RETURN(0);
3691         }
3692
3693         if (KEY_IS(KEY_UNLINKED)) {
3694                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3695                 spin_lock(&oscc->oscc_lock);
3696                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3697                 spin_unlock(&oscc->oscc_lock);
3698                 RETURN(0);
3699         }
3700
3701         if (KEY_IS(KEY_INIT_RECOV)) {
3702                 if (vallen != sizeof(int))
3703                         RETURN(-EINVAL);
3704                 spin_lock(&imp->imp_lock);
3705                 imp->imp_initial_recov = *(int *)val;
3706                 spin_unlock(&imp->imp_lock);
3707                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3708                        exp->exp_obd->obd_name,
3709                        imp->imp_initial_recov);
3710                 RETURN(0);
3711         }
3712
3713         if (KEY_IS(KEY_CHECKSUM)) {
3714                 if (vallen != sizeof(int))
3715                         RETURN(-EINVAL);
3716                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3717                 RETURN(0);
3718         }
3719
3720         if (KEY_IS(KEY_FLUSH_CTX)) {
3721                 sptlrpc_import_flush_my_ctx(imp);
3722                 RETURN(0);
3723         }
3724
3725         if (!set)
3726                 RETURN(-EINVAL);
3727
3728         /* We pass all other commands directly to OST.  Since nobody calls
3729            osc methods directly and everybody is supposed to go through LOV,
3730            we assume LOV checked invalid values for us.
3731            The only recognised values so far are evict_by_nid and mds_conn.
3732            Even if something bad goes through, we'd get a -EINVAL from OST
3733            anyway. */
3734
3736         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3737         if (req == NULL)
3738                 RETURN(-ENOMEM);
3739
3740         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3741                              RCL_CLIENT, keylen);
3742         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3743                              RCL_CLIENT, vallen);
3744         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3745         if (rc) {
3746                 ptlrpc_request_free(req);
3747                 RETURN(rc);
3748         }
3749
3750         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3751         memcpy(tmp, key, keylen);
3752         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3753         memcpy(tmp, val, vallen);
3754
3755         if (KEY_IS(KEY_MDS_CONN)) {
3756                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3757
3758                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3759                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3760                 LASSERT(oscc->oscc_oa.o_gr > 0);
3761                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3762         }
3763
3764         ptlrpc_request_set_replen(req);
3765         ptlrpc_set_add_req(set, req);
3766         ptlrpc_check_set(set);
3767
3768         RETURN(0);
3769 }
3770
3771
3772 static struct llog_operations osc_size_repl_logops = {
3773         lop_cancel: llog_obd_repl_cancel
3774 };
3775
3776 static struct llog_operations osc_mds_ost_orig_logops;
3777 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3778                          struct obd_device *tgt, int count,
3779                          struct llog_catid *catid, struct obd_uuid *uuid)
3780 {
3781         int rc;
3782         ENTRY;
3783
3784         LASSERT(olg == &obd->obd_olg);
3785         spin_lock(&obd->obd_dev_lock);
3786         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3787                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3788                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3789                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3790                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3791                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3792         }
3793         spin_unlock(&obd->obd_dev_lock);
3794
3795         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3796                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3797         if (rc) {
3798                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3799                 GOTO(out, rc);
3800         }
3801
3802         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3803                         NULL, &osc_size_repl_logops);
3804         if (rc)
3805                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3806 out:
3807         if (rc) {
3808                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3809                        obd->obd_name, tgt->obd_name, count, catid, rc);
3810                 CERROR("logid "LPX64":0x%x\n",
3811                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3812         }
3813         RETURN(rc);
3814 }
3815
3816 static int osc_llog_finish(struct obd_device *obd, int count)
3817 {
3818         struct llog_ctxt *ctxt;
3819         int rc = 0, rc2 = 0;
3820         ENTRY;
3821
3822         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3823         if (ctxt)
3824                 rc = llog_cleanup(ctxt);
3825
3826         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3827         if (ctxt)
3828                 rc2 = llog_cleanup(ctxt);
3829         if (!rc)
3830                 rc = rc2;
3831
3832         RETURN(rc);
3833 }
3834
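/* On reconnect, recompute the grant to ask of the OST from what is still
 * available locally, and reset the lost-grant counter. */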
3835 static int osc_reconnect(const struct lu_env *env,
3836                          struct obd_export *exp, struct obd_device *obd,
3837                          struct obd_uuid *cluuid,
3838                          struct obd_connect_data *data)
3839 {
3840         struct client_obd *cli = &obd->u.cli;
3841         ENTRY;
3842         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3843                 long lost_grant;
3844
3845                 client_obd_list_lock(&cli->cl_loi_list_lock);
3846                 data->ocd_grant = cli->cl_avail_grant ?:
3847                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3848                 lost_grant = cli->cl_lost_grant;
3849                 cli->cl_lost_grant = 0;
3850                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3851
3852                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3853                        "cl_lost_grant: %ld\n", data->ocd_grant,
3854                        cli->cl_avail_grant, lost_grant);
3855                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3856                        " ocd_grant: %d\n", data->ocd_connect_flags,
3857                        data->ocd_version, data->ocd_grant);
3858         }
3859
3860         RETURN(0);
3861 }
3862
3863 static int osc_disconnect(struct obd_export *exp)
3864 {
3865         struct obd_device *obd = class_exp2obd(exp);
3866         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3867         int rc;
3868
3869         if (obd->u.cli.cl_conn_count == 1)
3870                 /* flush any remaining cancel messages out to the target */
3871                 llog_sync(ctxt, exp);
3872
3873         llog_ctxt_put(ctxt);
3874
3875         rc = client_disconnect_export(exp);
3876         return rc;
3877 }
3878
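/* React to import state changes: reset grants on disconnect, fail queued
 * pages and flush locks on invalidation, and pick up connect data (grants,
 * request portal) once it arrives. */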
3879 static int osc_import_event(struct obd_device *obd,
3880                             struct obd_import *imp,
3881                             enum obd_import_event event)
3882 {
3883         struct client_obd *cli;
3884         int rc = 0;
3885
3886         ENTRY;
3887         LASSERT(imp->imp_obd == obd);
3888
3889         switch (event) {
3890         case IMP_EVENT_DISCON: {
3891                 /* Only do this on the MDS OSCs */
3892                 if (imp->imp_server_timeout) {
3893                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3894
3895                         spin_lock(&oscc->oscc_lock);
3896                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3897                         spin_unlock(&oscc->oscc_lock);
3898                 }
3899                 cli = &obd->u.cli;
3900                 client_obd_list_lock(&cli->cl_loi_list_lock);
3901                 cli->cl_avail_grant = 0;
3902                 cli->cl_lost_grant = 0;
3903                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3904                 break;
3905         }
3906         case IMP_EVENT_INACTIVE: {
3907                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3908                 break;
3909         }
3910         case IMP_EVENT_INVALIDATE: {
3911                 struct ldlm_namespace *ns = obd->obd_namespace;
3912
3913                 /* Reset grants */
3914                 cli = &obd->u.cli;
3915                 client_obd_list_lock(&cli->cl_loi_list_lock);
3916                 /* all pages go to failing rpcs due to the invalid import */
3917                 osc_check_rpcs(cli);
3918                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3919
3920                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3921
3922                 break;
3923         }
3924         case IMP_EVENT_ACTIVE: {
3925                 /* Only do this on the MDS OSCs */
3926                 if (imp->imp_server_timeout) {
3927                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3928
3929                         spin_lock(&oscc->oscc_lock);
3930                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3931                         spin_unlock(&oscc->oscc_lock);
3932                 }
3933                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3934                 break;
3935         }
3936         case IMP_EVENT_OCD: {
3937                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3938
3939                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3940                         osc_init_grant(&obd->u.cli, ocd);
3941
3942                 /* See bug 7198 */
3943                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3944                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3945
3946                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3947                 break;
3948         }
3949         default:
3950                 CERROR("Unknown import event %d\n", event);
3951                 LBUG();
3952         }
3953         RETURN(rc);
3954 }
3955
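/* Set up the OSC device: generic client import, procfs entries, the object
 * creator, a small request pool and the extent cache. */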
3956 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3957 {
3958         int rc;
3959         ENTRY;
3960
3962         rc = ptlrpcd_addref();
3963         if (rc)
3964                 RETURN(rc);
3965
3966         rc = client_obd_setup(obd, lcfg);
3967         if (rc) {
3968                 ptlrpcd_decref();
3969         } else {
3970                 struct lprocfs_static_vars lvars = { 0 };
3971                 struct client_obd *cli = &obd->u.cli;
3972
3973                 lprocfs_osc_init_vars(&lvars);
3974                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3975                         lproc_osc_attach_seqstat(obd);
3976                         sptlrpc_lprocfs_cliobd_attach(obd);
3977                         ptlrpc_lprocfs_register_obd(obd);
3978                 }
3979
3980                 oscc_init(obd);
3981                 /* We need to allocate a few more requests, because
3982                    brw_interpret tries to create new requests before freeing
3983                    previous ones.  Ideally we would reserve 2x
3984                    max_rpcs_in_flight, but that is probably too much wasted
3985                    RAM, so adding 2 is just a guess that should still work. */
3986                 cli->cl_import->imp_rq_pool =
3987                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3988                                             OST_MAXREQSIZE,
3989                                             ptlrpc_add_rqs_to_pool);
3990                 cli->cl_cache = cache_create(obd);
3991                 if (!cli->cl_cache) {
3992                         osc_cleanup(obd);
3993                         rc = -ENOMEM;
3994                 }
3995         }
3996
3997         RETURN(rc);
3998 }
3999
4000 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4001 {
4002         int rc = 0;
4003         ENTRY;
4004
4005         switch (stage) {
4006         case OBD_CLEANUP_EARLY: {
4007                 struct obd_import *imp;
4008                 imp = obd->u.cli.cl_import;
4009                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4010                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4011                 ptlrpc_deactivate_import(imp);
4012                 spin_lock(&imp->imp_lock);
4013                 imp->imp_pingable = 0;
4014                 spin_unlock(&imp->imp_lock);
4015                 break;
4016         }
4017         case OBD_CLEANUP_EXPORTS: {
4018                 /* If we set up but never connected, the
4019                    client import will not have been cleaned. */
4020                 if (obd->u.cli.cl_import) {
4021                         struct obd_import *imp;
4022                         imp = obd->u.cli.cl_import;
4023                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4024                                obd->obd_name);
4025                         ptlrpc_invalidate_import(imp);
4026                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
4027                         class_destroy_import(imp);
4028                         obd->u.cli.cl_import = NULL;
4029                 }
4030                 rc = obd_llog_finish(obd, 0);
4031                 if (rc != 0)
4032                         CERROR("failed to cleanup llogging subsystems\n");
4033                 break;
4034         }
4035         }
4036         RETURN(rc);
4037 }
4038
4039 int osc_cleanup(struct obd_device *obd)
4040 {
4041         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4042         int rc;
4043
4044         ENTRY;
4045         ptlrpc_lprocfs_unregister_obd(obd);
4046         lprocfs_obd_cleanup(obd);
4047
4048         spin_lock(&oscc->oscc_lock);
4049         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4050         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4051         spin_unlock(&oscc->oscc_lock);
4052
4053         /* free memory of osc quota cache */
4054         lquota_cleanup(quota_interface, obd);
4055
4056         cache_destroy(obd->u.cli.cl_cache);
4057         rc = client_obd_cleanup(obd);
4058
4059         ptlrpcd_decref();
4060         RETURN(rc);
4061 }
4062
4063 static int osc_register_page_removal_cb(struct obd_export *exp,
4064                                         obd_page_removal_cb_t func,
4065                                         obd_pin_extent_cb pin_cb)
4066 {
4067         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4068                                            pin_cb);
4069 }
4070
4071 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4072                                           obd_page_removal_cb_t func)
4073 {
4074         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4075 }
4076
static int osc_register_lock_cancel_cb(struct obd_export *exp,
                                       obd_lock_cancel_cb cb)
{
        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);

        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
        return 0;
}

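/* Unregister the extent lock cancellation callback; fails if a different
 * callback is currently registered. */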
static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
                                         obd_lock_cancel_cb cb)
{
        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
                CERROR("Unregistering cancel cb %p, while only %p was "
                       "registered\n", cb,
                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
                /* no ENTRY in this function, so use a plain return to keep
                 * the debug macros balanced */
                return -EINVAL;
        }

        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
        return 0;
}

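/* Handle on-the-fly configuration changes: sptlrpc flavor updates are
 * applied directly, anything else is treated as a proc tunable. */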
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        struct lustre_cfg *lcfg = buf;
        struct lprocfs_static_vars lvars = { 0 };
        int rc = 0;

        lprocfs_osc_init_vars(&lvars);

        switch (lcfg->lcfg_command) {
        case LCFG_SPTLRPC_CONF:
                rc = sptlrpc_cliobd_process_config(obd, lcfg);
                break;
        default:
                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                              lcfg, obd);
                break;
        }

        return rc;
}

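/* OBD method table: wires the generic obd_* entry points to the OSC
 * implementations above. */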
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};

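/* Module init: pull in the quota module, hook its methods into the OSC
 * method table, and register the OSC obd type. */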
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc;
        ENTRY;

        lprocfs_osc_init_vars(&lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, NULL);
        if (rc) {
                /* undo the quota setup above, mirroring osc_exit() */
                lquota_exit(quota_interface);
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
        }

        RETURN(rc);
}


#ifdef __KERNEL__
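/* Module unload hook: drop the quota module reference and unregister
 * the OSC obd type. */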
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif