lustre/osc/osc_request.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

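/* Reply handler for async OST_GETATTR: unpack the ost_body, copy the
 * returned attributes into the caller's obdo, and run the caller's
 * oi_cb_up completion callback with the final status. */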
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

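/* Queue an OST_GETATTR RPC on @set without waiting for the reply;
 * osc_getattr_interpret() completes the obd_info when the reply arrives. */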
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

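/* Synchronous counterpart of osc_getattr_async(): send OST_GETATTR and
 * wait for the attributes in the reply body. */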
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

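/* Send a synchronous OST_SETATTR and copy the server's updated attributes
 * back into the caller's obdo. */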
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                oinfo->oi_oa->o_gr > 0);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

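/* Asynchronous OST_SETATTR: with no @rqset the request is handed straight
 * to ptlrpcd and the reply is ignored; otherwise it joins @rqset and
 * osc_setattr_interpret() runs on completion. */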
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

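/* Create an object on the OST via a synchronous OST_CREATE RPC.  If no
 * striping metadata is passed in, a single-stripe lsm is allocated here;
 * on success the new object id/group are stored back into *ea. */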
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

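/* Reply handler for OST_PUNCH: copy the returned attributes into the
 * caller's obdo and invoke the caller's upcall with the final status. */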
static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->pa_oa = body->oa;
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

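/*
 * Queue an asynchronous OST_PUNCH (truncate) request.  The extent to punch
 * travels in oa->o_size/o_blocks, and @upcall is invoked with @cookie once
 * the reply has been interpreted.  A typical caller (see osc_punch() below)
 * does roughly:
 *
 *         oa->o_size   = start;
 *         oa->o_blocks = end;
 *         oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 *         rc = osc_punch_base(exp, oa, capa, upcall, cookie, rqset);
 */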
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

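/* obd_punch entry point: encode the extent from oi_policy into the obdo's
 * size/blocks fields and hand off to osc_punch_base(). */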
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

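/* Synchronous OST_SYNC: ask the OST to commit the given byte range of the
 * object to stable storage; the range rides in the (overloaded) size and
 * blocks fields of the oa. */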
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally all locks matching @mode in the resource named
 * by the object id/group in @oa.  Matched locks are added to the @cancels
 * list; returns the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

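/* Atomically reserve a destroy-RPC slot.  Returns 1 if the in-flight count
 * could be raised without exceeding cl_max_rpcs_in_flight; on failure the
 * increment is undone, waking a waiter if a concurrent decrement raced
 * between the two atomic operations. */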
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed).  If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * next reconnects to the MDS, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing the
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of in-flight destroy RPCs drops
                 * below cl_max_rpcs_in_flight.
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

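/* Fill in the dirty/grant accounting fields of @oa so the server learns
 * how much cache the client holds.  o_undirty advertises how much more the
 * client could dirty; it is clamped to 0 whenever any of the dirty limits
 * has been overrun. */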
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages) {
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight =
                        (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                        (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if there is still dirty cache but no grant, wait for
                 * pending RPCs that may yet return us some grant before
                 * falling back to sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

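/* Seed cl_avail_grant with the grant the server returned at connect time. */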
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

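/* Fold any grant piggybacked on an RPC reply into cl_avail_grant. */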
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

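/* Validate the per-niobuf return codes in a BRW_WRITE reply: any negative
 * rc is propagated, any other non-zero rc is a protocol error, and the
 * number of bytes moved by the bulk must match what was requested. */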
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

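/* Two brw_pages can be merged into one niobuf when they are file-contiguous
 * and carry identical flags (the grant/cache bookkeeping flags excepted). */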
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

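/* Compute the bulk checksum over the first @nob bytes of @pga with the
 * requested algorithm.  Fault-injection hooks deliberately corrupt read
 * data (before checksumming) or the write checksum itself, to exercise
 * the checksum-retry paths. */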
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For a send we only compute a wrong checksum instead of corrupting
         * the data, so the data is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

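/* Build a bulk read/write (BRW) request: allocate it (writes come from the
 * import's pre-allocated request pool so writeout can proceed even under
 * memory pressure), coalesce contiguous pages into niobufs, attach the
 * bulk descriptor, optionally checksum the pages, and stash the I/O state
 * in the request's async args for the reply handler. */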
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa, struct lov_stripe_md *lsm,
                                obd_count page_count, struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf)),
                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                 (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] is still sizeof(*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

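/* A write reply carried a checksum that disagrees with the one we sent.
 * Recompute the checksum over the local pages to decide where the
 * corruption most likely happened, log a detailed message, and return 1
 * so the caller can retry the write. */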
1231 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1232                                 __u32 client_cksum, __u32 server_cksum, int nob,
1233                                 obd_count page_count, struct brw_page **pga,
1234                                 cksum_type_t client_cksum_type)
1235 {
1236         __u32 new_cksum;
1237         char *msg;
1238         cksum_type_t cksum_type;
1239
1240         if (server_cksum == client_cksum) {
1241                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1242                 return 0;
1243         }
1244
1245         if (oa->o_valid & OBD_MD_FLFLAGS)
1246                 cksum_type = cksum_type_unpack(oa->o_flags);
1247         else
1248                 cksum_type = OBD_CKSUM_CRC32;
1249
1250         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1251                                       cksum_type);
1252
1253         if (cksum_type != client_cksum_type)
1254                 msg = "the server did not use the checksum type specified in "
1255                       "the original request - likely a protocol problem";
1256         else if (new_cksum == server_cksum)
1257                 msg = "changed on the client after we checksummed it - "
1258                       "likely false positive due to mmap IO (bug 11742)";
1259         else if (new_cksum == client_cksum)
1260                 msg = "changed in transit before arrival at OST";
1261         else
1262                 msg = "changed in transit AND doesn't match the original - "
1263                       "likely false positive due to mmap IO (bug 11742)";
1264
1265         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1266                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1267                            "["LPU64"-"LPU64"]\n",
1268                            msg, libcfs_nid2str(peer->nid),
1269                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1270                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1271                                                         (__u64)0,
1272                            oa->o_id,
1273                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1274                            pga[0]->off,
1275                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1276         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1277                "client csum now %x\n", client_cksum, client_cksum_type,
1278                server_cksum, cksum_type, new_cksum);
1279         return 1;
1280 }
1281
1282 /* Note rc enters this function as number of bytes transferred */
1283 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1284 {
1285         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1286         const lnet_process_id_t *peer =
1287                         &req->rq_import->imp_connection->c_peer;
1288         struct client_obd *cli = aa->aa_cli;
1289         struct ost_body *body;
1290         __u32 client_cksum = 0;
1291         ENTRY;
1292
1293         if (rc < 0 && rc != -EDQUOT)
1294                 RETURN(rc);
1295
1296         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1297         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1298                                   lustre_swab_ost_body);
1299         if (body == NULL) {
1300                 CDEBUG(D_INFO, "Can't unpack body\n");
1301                 RETURN(-EPROTO);
1302         }
1303
1304         /* set/clear over quota flag for a uid/gid */
1305         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1306             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1307                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1308                              body->oa.o_gid, body->oa.o_valid,
1309                              body->oa.o_flags);
1310
1311         if (rc < 0)
1312                 RETURN(rc);
1313
1314         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1315                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1316
1317         osc_update_grant(cli, body);
1318
1319         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1320                 if (rc > 0) {
1321                         CERROR("Unexpected +ve rc %d\n", rc);
1322                         RETURN(-EPROTO);
1323                 }
1324                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1325
1326                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1327                     check_write_checksum(&body->oa, peer, client_cksum,
1328                                          body->oa.o_cksum, aa->aa_requested_nob,
1329                                          aa->aa_page_count, aa->aa_ppga,
1330                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1331                         RETURN(-EAGAIN);
1332
1333                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1334                         RETURN(-EAGAIN);
1335
1336                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1337                                      aa->aa_page_count, aa->aa_ppga);
1338                 GOTO(out, rc);
1339         }
1340
1341         /* The rest of this function executes only for OST_READs */
1342         if (rc > aa->aa_requested_nob) {
1343                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1344                        aa->aa_requested_nob);
1345                 RETURN(-EPROTO);
1346         }
1347
1348         if (rc != req->rq_bulk->bd_nob_transferred) {
1349                 CERROR ("Unexpected rc %d (%d transferred)\n",
1350                         rc, req->rq_bulk->bd_nob_transferred);
1351                 return (-EPROTO);
1352         }
1353
1354         if (rc < aa->aa_requested_nob)
1355                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1356
1357         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1358                                          aa->aa_ppga))
1359                 GOTO(out, rc = -EAGAIN);
1360
1361         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1362                 static int cksum_counter;
1363                 __u32      server_cksum = body->oa.o_cksum;
1364                 char      *via;
1365                 char      *router;
1366                 cksum_type_t cksum_type;
1367
1368                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1369                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1370                 else
1371                         cksum_type = OBD_CKSUM_CRC32;
1372                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1373                                                  aa->aa_ppga, OST_READ,
1374                                                  cksum_type);
1375
1376                 if (peer->nid == req->rq_bulk->bd_sender) {
1377                         via = router = "";
1378                 } else {
1379                         via = " via ";
1380                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1381                 }
1382
1383                 if (server_cksum == ~0 && rc > 0) {
1384                         CERROR("Protocol error: server %s set the 'checksum' "
1385                                "bit, but didn't send a checksum.  Not fatal, "
1386                                "but please notify on http://bugzilla.lustre.org/\n",
1387                                libcfs_nid2str(peer->nid));
1388                 } else if (server_cksum != client_cksum) {
1389                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1390                                            "%s%s%s inum "LPU64"/"LPU64" object "
1391                                            LPU64"/"LPU64" extent "
1392                                            "["LPU64"-"LPU64"]\n",
1393                                            req->rq_import->imp_obd->obd_name,
1394                                            libcfs_nid2str(peer->nid),
1395                                            via, router,
1396                                            body->oa.o_valid & OBD_MD_FLFID ?
1397                                                 body->oa.o_fid : (__u64)0,
1398                                            body->oa.o_valid & OBD_MD_FLFID ?
1399                                                 body->oa.o_generation :(__u64)0,
1400                                            body->oa.o_id,
1401                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1402                                                 body->oa.o_gr : (__u64)0,
1403                                            aa->aa_ppga[0]->off,
1404                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1405                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1406                                                                         1);
1407                         CERROR("client %x, server %x, cksum_type %x\n",
1408                                client_cksum, server_cksum, cksum_type);
1409                         cksum_counter = 0;
1410                         aa->aa_oa->o_cksum = client_cksum;
1411                         rc = -EAGAIN;
1412                 } else {
1413                         cksum_counter++;
1414                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1415                         rc = 0;
1416                 }
1417         } else if (unlikely(client_cksum)) {
1418                 static int cksum_missed;
1419
1420                 cksum_missed++;
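                /* rate-limit the console error: (x & -x) == x is true only
                 * when x is a power of two, so this logs on the 1st, 2nd,
                 * 4th, 8th, ... missed checksum */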
1421                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1422                         CERROR("Checksum %u requested from %s but not sent\n",
1423                                cksum_missed, libcfs_nid2str(peer->nid));
1424         } else {
1425                 rc = 0;
1426         }
1427 out:
1428         if (rc >= 0)
1429                 *aa->aa_oa = body->oa;
1430
1431         RETURN(rc);
1432 }
1433
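/* Synchronous bulk read/write.  Build one brw request covering @page_count
 * pages, queue it and wait.  Bulk timeouts are resent immediately; other
 * recoverable errors are retried after sleeping for 'resends' seconds, until
 * osc_should_resend() says to give up. */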
1434 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1435                             struct lov_stripe_md *lsm,
1436                             obd_count page_count, struct brw_page **pga,
1437                             struct obd_capa *ocapa)
1438 {
1439         struct ptlrpc_request *req;
1440         int                    rc;
1441         cfs_waitq_t            waitq;
1442         int                    resends = 0;
1443         struct l_wait_info     lwi;
1444
1445         ENTRY;
1446
1447         cfs_waitq_init(&waitq);
1448
1449 restart_bulk:
1450         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1451                                   page_count, pga, &req, ocapa);
1452         if (rc != 0)
1453                 return (rc);
1454
1455         rc = ptlrpc_queue_wait(req);
1456
1457         if (rc == -ETIMEDOUT && req->rq_resend) {
1458                 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1459                 ptlrpc_req_finished(req);
1460                 goto restart_bulk;
1461         }
1462
1463         rc = osc_brw_fini_request(req, rc);
1464
1465         ptlrpc_req_finished(req);
1466         if (osc_recoverable_error(rc)) {
1467                 resends++;
1468                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1469                         CERROR("too many resend retries, returning error\n");
1470                         RETURN(-EIO);
1471                 }
1472
1473                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1474                 l_wait_event(waitq, 0, &lwi);
1475
1476                 goto restart_bulk;
1477         }
1478
1479         RETURN(rc);
1480 }
1481
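/* Rebuild a recoverably-failed bulk RPC and requeue it on the original
 * request set.  The new request inherits the interpret callback, async args,
 * pga and oap lists from the old one; each oap's request reference is moved
 * over to the new request as well. */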
1482 int osc_brw_redo_request(struct ptlrpc_request *request,
1483                          struct osc_brw_async_args *aa)
1484 {
1485         struct ptlrpc_request *new_req;
1486         struct ptlrpc_request_set *set = request->rq_set;
1487         struct osc_brw_async_args *new_aa;
1488         struct osc_async_page *oap;
1489         int rc = 0;
1490         ENTRY;
1491
1492         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1493                 CERROR("too many resend retries, returning error\n");
1494                 RETURN(-EIO);
1495         }
1496
1497         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1498 /*
1499         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1500         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1501                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1502                                            REQ_REC_OFF + 3);
1503 */
1504         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1505                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1506                                   aa->aa_cli, aa->aa_oa,
1507                                   NULL /* lsm unused by osc currently */,
1508                                   aa->aa_page_count, aa->aa_ppga,
1509                                   &new_req, NULL /* ocapa */);
1510         if (rc)
1511                 RETURN(rc);
1512
1513         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1514
1515         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1516                 if (oap->oap_request != NULL) {
1517                         LASSERTF(request == oap->oap_request,
1518                                  "request %p != oap_request %p\n",
1519                                  request, oap->oap_request);
1520                         if (oap->oap_interrupted) {
1521                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1522                                 ptlrpc_req_finished(new_req);
1523                                 RETURN(-EINTR);
1524                         }
1525                 }
1526         }
1527         /* New request takes over pga and oaps from old request.
1528          * Note that copying a list_head doesn't work, need to move it... */
1529         aa->aa_resends++;
1530         new_req->rq_interpret_reply = request->rq_interpret_reply;
1531         new_req->rq_async_args = request->rq_async_args;
1532         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1533
1534         new_aa = ptlrpc_req_async_args(new_req);
1535
1536         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1537         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1538         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1539
1540         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1541                 if (oap->oap_request) {
1542                         ptlrpc_req_finished(oap->oap_request);
1543                         oap->oap_request = ptlrpc_request_addref(new_req);
1544                 }
1545         }
1546
1547         /* using ptlrpc_set_add_req here is safe because interpret
1548          * functions run in check_set context.  the only path by which
1549          * another thread can reach this request and return -EINTR is
1550          * protected by cl_loi_list_lock */
1551         ptlrpc_set_add_req(set, new_req);
1552
1553         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1554
1555         DEBUG_REQ(D_INFO, new_req, "new request");
1556         RETURN(0);
1557 }
1558
1559 /*
1560  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1561  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1562  * fine for our small page arrays and doesn't require allocation.  it's an
1563  * insertion sort that swaps elements that are strides apart, shrinking the
1564  * stride down until it's 1 and the array is sorted.
1565  */
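/* e.g. (a sketch): for num == 20 the stride loop below grows 1, 4, 13, 40 and
 * stops at 40; the sort then makes passes with strides 13, 4 and 1. */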
1566 static void sort_brw_pages(struct brw_page **array, int num)
1567 {
1568         int stride, i, j;
1569         struct brw_page *tmp;
1570
1571         if (num == 1)
1572                 return;
1573         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1574                 ;
1575
1576         do {
1577                 stride /= 3;
1578                 for (i = stride ; i < num ; i++) {
1579                         tmp = array[i];
1580                         j = i;
1581                         while (j >= stride && array[j - stride]->off > tmp->off) {
1582                                 array[j] = array[j - stride];
1583                                 j -= stride;
1584                         }
1585                         array[j] = tmp;
1586                 }
1587         } while (stride > 1);
1588 }
1589
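/* Count how many leading pages of @pg form a single unfragmented extent: the
 * run ends at the first page that doesn't end on a page boundary, or at the
 * first following page that doesn't start on one.  The network can move such
 * a run in one RDMA transfer. */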
1590 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1591 {
1592         int count = 1;
1593         int offset;
1594         int i = 0;
1595
1596         LASSERT(pages > 0);
1597         offset = pg[i]->off & ~CFS_PAGE_MASK;
1598
1599         for (;;) {
1600                 pages--;
1601                 if (pages == 0)         /* that's all */
1602                         return count;
1603
1604                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1605                         return count;   /* doesn't end on page boundary */
1606
1607                 i++;
1608                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1609                 if (offset != 0)        /* doesn't start on page boundary */
1610                         return count;
1611
1612                 count++;
1613         }
1614 }
1615
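/* Build an array of pointers into the caller's contiguous brw_page array so
 * the pages can be sorted and split into per-rpc chunks without copying the
 * brw_page structs themselves. */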
1616 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1617 {
1618         struct brw_page **ppga;
1619         int i;
1620
1621         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1622         if (ppga == NULL)
1623                 return NULL;
1624
1625         for (i = 0; i < count; i++)
1626                 ppga[i] = pga + i;
1627         return ppga;
1628 }
1629
1630 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1631 {
1632         LASSERT(ppga != NULL);
1633         OBD_FREE(ppga, sizeof(*ppga) * count);
1634 }
1635
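/* Issue a brw as a series of synchronous RPCs.  Each chunk is capped at
 * cl_max_pages_per_rpc and trimmed by max_unfragmented_pages(); e.g. (a
 * sketch) with a 256-page rpc limit and no fragmentation, a 300-page write
 * goes out as one 256-page RPC followed by one 44-page RPC. */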
1636 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1637                    obd_count page_count, struct brw_page *pga,
1638                    struct obd_trans_info *oti)
1639 {
1640         struct obdo *saved_oa = NULL;
1641         struct brw_page **ppga, **orig;
1642         struct obd_import *imp = class_exp2cliimp(exp);
1643         struct client_obd *cli = &imp->imp_obd->u.cli;
1644         int rc, page_count_orig;
1645         ENTRY;
1646
1647         if (cmd & OBD_BRW_CHECK) {
1648                 /* The caller just wants to know if there's a chance that this
1649                  * I/O can succeed */
1650
1651                 if (imp == NULL || imp->imp_invalid)
1652                         RETURN(-EIO);
1653                 RETURN(0);
1654         }
1655
1656         /* test_brw with a failed create can trip this, maybe others. */
1657         LASSERT(cli->cl_max_pages_per_rpc);
1658
1659         rc = 0;
1660
1661         orig = ppga = osc_build_ppga(pga, page_count);
1662         if (ppga == NULL)
1663                 RETURN(-ENOMEM);
1664         page_count_orig = page_count;
1665
1666         sort_brw_pages(ppga, page_count);
1667         while (page_count) {
1668                 obd_count pages_per_brw;
1669
1670                 if (page_count > cli->cl_max_pages_per_rpc)
1671                         pages_per_brw = cli->cl_max_pages_per_rpc;
1672                 else
1673                         pages_per_brw = page_count;
1674
1675                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1676
1677                 if (saved_oa != NULL) {
1678                         /* restore previously saved oa */
1679                         *oinfo->oi_oa = *saved_oa;
1680                 } else if (page_count > pages_per_brw) {
1681                         /* save a copy of oa (brw will clobber it) */
1682                         OBDO_ALLOC(saved_oa);
1683                         if (saved_oa == NULL)
1684                                 GOTO(out, rc = -ENOMEM);
1685                         *saved_oa = *oinfo->oi_oa;
1686                 }
1687
1688                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1689                                       pages_per_brw, ppga, oinfo->oi_capa);
1690
1691                 if (rc != 0)
1692                         break;
1693
1694                 page_count -= pages_per_brw;
1695                 ppga += pages_per_brw;
1696         }
1697
1698 out:
1699         osc_release_ppga(orig, page_count_orig);
1700
1701         if (saved_oa != NULL)
1702                 OBDO_FREE(saved_oa);
1703
1704         RETURN(rc);
1705 }
1706
1707 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1708  * the dirty accounting.  Writeback completes or truncate happens before
1709  * writing starts.  Must be called with the loi lock held. */
1710 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1711                            int sent)
1712 {
1713         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1714 }
1715
1716
1717 /* This maintains the lists of pending pages to read/write for a given object
1718  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1719  * to quickly find objects that are ready to send an RPC. */
1720 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1721                          int cmd)
1722 {
1723         int optimal;
1724         ENTRY;
1725
1726         if (lop->lop_num_pending == 0)
1727                 RETURN(0);
1728
1729         /* if we have an invalid import we want to drain the queued pages
1730          * by forcing them through rpcs that immediately fail and complete
1731          * the pages.  recovery relies on this to empty the queued pages
1732          * before canceling the locks and evicting down the llite pages */
1733         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1734                 RETURN(1);
1735
1736         /* stream rpcs in queue order as long as there is an urgent page
1737          * queued.  this is our cheap solution for good batching in the case
1738          * where writepage marks some random page in the middle of the file
1739          * as urgent because of, say, memory pressure */
1740         if (!list_empty(&lop->lop_urgent)) {
1741                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1742                 RETURN(1);
1743         }
1744         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1745         optimal = cli->cl_max_pages_per_rpc;
1746         if (cmd & OBD_BRW_WRITE) {
1747                 /* trigger a write rpc stream as long as there are dirtiers
1748                  * waiting for space.  as they're waiting, they're not going to
1749                  * create more pages to coalesce with what's waiting. */
1750                 if (!list_empty(&cli->cl_cache_waiters)) {
1751                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1752                         RETURN(1);
1753                 }
1754                 /* +16 to avoid triggering rpcs that would want to include pages
1755                  * that are being queued but which can't be made ready until
1756                  * the queuer finishes with the page. this is a wart for
1757                  * llite::commit_write() */
1758                 optimal += 16;
1759         }
1760         if (lop->lop_num_pending >= optimal)
1761                 RETURN(1);
1762
1763         RETURN(0);
1764 }
1765
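/* Idempotently add @item to @list or remove it so that its membership
 * matches @should_be_on. */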
1766 static void on_list(struct list_head *item, struct list_head *list,
1767                     int should_be_on)
1768 {
1769         if (list_empty(item) && should_be_on)
1770                 list_add_tail(item, list);
1771         else if (!list_empty(item) && !should_be_on)
1772                 list_del_init(item);
1773 }
1774
1775 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1776  * can find pages to build into rpcs quickly */
1777 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1778 {
1779         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1780                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1781                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1782
1783         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1784                 loi->loi_write_lop.lop_num_pending);
1785
1786         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1787                 loi->loi_read_lop.lop_num_pending);
1788 }
1789
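/* Keep the per-object (lop) and per-client pending page counts in step;
 * lop_makes_rpc() and loi_list_maint() key off lop_num_pending. */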
1790 static void lop_update_pending(struct client_obd *cli,
1791                                struct loi_oap_pages *lop, int cmd, int delta)
1792 {
1793         lop->lop_num_pending += delta;
1794         if (cmd & OBD_BRW_WRITE)
1795                 cli->cl_pending_w_pages += delta;
1796         else
1797                 cli->cl_pending_r_pages += delta;
1798 }
1799
1800 /**
1801  * This is called when a sync waiter receives an interruption.  Its job is to
1802  * get the caller woken as soon as possible.  If its page hasn't been put in an
1803  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1804  * desiring interruption, which will forcefully complete the rpc once it
1805  * has timed out.
1806  */
1807 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1808 {
1809         struct loi_oap_pages *lop;
1810         struct lov_oinfo *loi;
1811         int rc = -EBUSY;
1812         ENTRY;
1813
1814         LASSERT(!oap->oap_interrupted);
1815         oap->oap_interrupted = 1;
1816
1817         /* ok, it's been put in an rpc. only one oap gets a request reference */
1818         if (oap->oap_request != NULL) {
1819                 ptlrpc_mark_interrupted(oap->oap_request);
1820                 ptlrpcd_wake(oap->oap_request);
1821                 ptlrpc_req_finished(oap->oap_request);
1822                 oap->oap_request = NULL;
1823         }
1824
1825         /*
1826          * page completion may be called only if the ->cpo_prep() method was
1827          * executed by osc_io_submit(), which also adds the page to the pending list
1828          */
1829         if (!list_empty(&oap->oap_pending_item)) {
1830                 list_del_init(&oap->oap_pending_item);
1831                 list_del_init(&oap->oap_urgent_item);
1832
1833                 loi = oap->oap_loi;
1834                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1835                         &loi->loi_write_lop : &loi->loi_read_lop;
1836                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1837                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1838                 rc = oap->oap_caller_ops->ap_completion(env,
1839                                           oap->oap_caller_data,
1840                                           oap->oap_cmd, NULL, -EINTR);
1841         }
1842
1843         RETURN(rc);
1844 }
1845
1846 /* this is trying to propagate async writeback errors back up to the
1847  * application.  When an async write fails we record the error code for later
1848  * if the app does an fsync.  As long as errors persist we force future rpcs
1849  * to be sync so that the app can get a sync error and break the cycle of
1850  * queueing pages for which writeback will fail. */
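/* ar_min_xid is sampled at failure time as the next xid to be assigned, so
 * force_sync is only cleared once a write issued after the failure (i.e.,
 * with xid >= ar_min_xid) completes cleanly. */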
1851 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1852                            int rc)
1853 {
1854         if (rc) {
1855                 if (!ar->ar_rc)
1856                         ar->ar_rc = rc;
1857
1858                 ar->ar_force_sync = 1;
1859                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1860                 return;
1861
1862         }
1863
1864         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1865                 ar->ar_force_sync = 0;
1866 }
1867
1868 void osc_oap_to_pending(struct osc_async_page *oap)
1869 {
1870         struct loi_oap_pages *lop;
1871
1872         if (oap->oap_cmd & OBD_BRW_WRITE)
1873                 lop = &oap->oap_loi->loi_write_lop;
1874         else
1875                 lop = &oap->oap_loi->loi_read_lop;
1876
1877         if (oap->oap_async_flags & ASYNC_URGENT)
1878                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1879         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1880         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1881 }
1882
1883 /* this must be called holding the loi list lock to give coverage to exit_cache,
1884  * async_flag maintenance, and oap_request */
1885 static void osc_ap_completion(const struct lu_env *env,
1886                               struct client_obd *cli, struct obdo *oa,
1887                               struct osc_async_page *oap, int sent, int rc)
1888 {
1889         __u64 xid = 0;
1890
1891         ENTRY;
1892         if (oap->oap_request != NULL) {
1893                 xid = ptlrpc_req_xid(oap->oap_request);
1894                 ptlrpc_req_finished(oap->oap_request);
1895                 oap->oap_request = NULL;
1896         }
1897
1898         oap->oap_async_flags = 0;
1899         oap->oap_interrupted = 0;
1900
1901         if (oap->oap_cmd & OBD_BRW_WRITE) {
1902                 osc_process_ar(&cli->cl_ar, xid, rc);
1903                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1904         }
1905
1906         if (rc == 0 && oa != NULL) {
1907                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1908                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1909                 if (oa->o_valid & OBD_MD_FLMTIME)
1910                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1911                 if (oa->o_valid & OBD_MD_FLATIME)
1912                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1913                 if (oa->o_valid & OBD_MD_FLCTIME)
1914                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1915         }
1916
1917         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1918                                                 oap->oap_cmd, oa, rc);
1919
1920         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1921          * I/O on the page could start, but OSC calls it under lock
1922          * and thus we can add oap back to pending safely */
1923         if (rc)
1924                 /* upper layer wants to leave the page on pending queue */
1925                 osc_oap_to_pending(oap);
1926         else
1927                 osc_exit_cache(cli, oap, sent);
1928         EXIT;
1929 }
1930
1931 static int brw_interpret(const struct lu_env *env,
1932                          struct ptlrpc_request *req, void *data, int rc)
1933 {
1934         struct osc_brw_async_args *aa = data;
1935         struct client_obd *cli;
1936         int async;
1937         ENTRY;
1938
1939         rc = osc_brw_fini_request(req, rc);
1940         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1941         if (osc_recoverable_error(rc)) {
1942                 rc = osc_brw_redo_request(req, aa);
1943                 if (rc == 0)
1944                         RETURN(0);
1945         }
1946
1947         cli = aa->aa_cli;
1948
1949         client_obd_list_lock(&cli->cl_loi_list_lock);
1950
1951         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1952          * is called so we know whether to go to sync BRWs or wait for more
1953          * RPCs to complete */
1954         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1955                 cli->cl_w_in_flight--;
1956         else
1957                 cli->cl_r_in_flight--;
1958
1959         async = list_empty(&aa->aa_oaps);
1960         if (!async) { /* from osc_send_oap_rpc() */
1961                 struct osc_async_page *oap, *tmp;
1962                 /* the caller may re-use the oap after the completion call so
1963                  * we need to clean it up a little */
1964                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1965                         list_del_init(&oap->oap_rpc_item);
1966                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1967                 }
1968                 OBDO_FREE(aa->aa_oa);
1969         } else { /* from async_internal() */
1970                 int i;
1971                 for (i = 0; i < aa->aa_page_count; i++)
1972                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1973         }
1974         osc_wake_cache_waiters(cli);
1975         osc_check_rpcs(env, cli);
1976         client_obd_list_unlock(&cli->cl_loi_list_lock);
1977         if (!async)
1978                 cl_req_completion(env, aa->aa_clerq, rc);
1979         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1980         RETURN(rc);
1981 }
1982
1983 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1984                                             struct client_obd *cli,
1985                                             struct list_head *rpc_list,
1986                                             int page_count, int cmd)
1987 {
1988         struct ptlrpc_request *req;
1989         struct brw_page **pga = NULL;
1990         struct osc_brw_async_args *aa;
1991         struct obdo *oa = NULL;
1992         const struct obd_async_page_ops *ops = NULL;
1993         void *caller_data = NULL;
1994         struct osc_async_page *oap;
1995         struct osc_async_page *tmp;
1996         struct ost_body *body;
1997         struct cl_req *clerq = NULL;
1998         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1999         struct ldlm_lock *lock = NULL;
2000         struct cl_req_attr crattr;
2001         int i, rc;
2002
2003         ENTRY;
2004         LASSERT(!list_empty(rpc_list));
2005
2006         memset(&crattr, 0, sizeof crattr);
2007         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2008         if (pga == NULL)
2009                 GOTO(out, req = ERR_PTR(-ENOMEM));
2010
2011         OBDO_ALLOC(oa);
2012         if (oa == NULL)
2013                 GOTO(out, req = ERR_PTR(-ENOMEM));
2014
2015         i = 0;
2016         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2017                 struct cl_page *page = osc_oap2cl_page(oap);
2018                 if (ops == NULL) {
2019                         ops = oap->oap_caller_ops;
2020                         caller_data = oap->oap_caller_data;
2021
2022                         clerq = cl_req_alloc(env, page, crt,
2023                                              1 /* only 1-object rpcs for
2024                                                 * now */);
2025                         if (IS_ERR(clerq))
2026                                 GOTO(out, req = (void *)clerq);
2027                         lock = oap->oap_ldlm_lock;
2028                 }
2029                 pga[i] = &oap->oap_brw_page;
2030                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2031                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2032                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2033                 i++;
2034                 cl_req_page_add(env, clerq, page);
2035         }
2036
2037         /* always get the data for the obdo for the rpc */
2038         LASSERT(ops != NULL);
2039         crattr.cra_oa = oa;
2040         crattr.cra_capa = NULL;
2041         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2042         if (lock) {
2043                 oa->o_handle = lock->l_remote_handle;
2044                 oa->o_valid |= OBD_MD_FLHANDLE;
2045         }
2046
2047         rc = cl_req_prep(env, clerq);
2048         if (rc != 0) {
2049                 CERROR("cl_req_prep failed: %d\n", rc);
2050                 GOTO(out, req = ERR_PTR(rc));
2051         }
2052
2053         sort_brw_pages(pga, page_count);
2054         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2055                                   pga, &req, crattr.cra_capa);
2056         if (rc != 0) {
2057                 CERROR("prep_req failed: %d\n", rc);
2058                 GOTO(out, req = ERR_PTR(rc));
2059         }
2060
2061         /* Need to update the timestamps after the request is built in case
2062          * we race with setattr (locally or in queue at OST).  If OST gets
2063          * later setattr before earlier BRW (as determined by the request xid),
2064          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2065          * way to do this in a single call.  bug 10150 */
2066         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2067         cl_req_attr_set(env, clerq, &crattr,
2068                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2069
2070         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2071         aa = ptlrpc_req_async_args(req);
2072         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2073         list_splice(rpc_list, &aa->aa_oaps);
2074         CFS_INIT_LIST_HEAD(rpc_list);
2075         aa->aa_clerq = clerq;
2076 out:
2077         capa_put(crattr.cra_capa);
2078         if (IS_ERR(req)) {
2079                 if (oa)
2080                         OBDO_FREE(oa);
2081                 if (pga)
2082                         OBD_FREE(pga, sizeof(*pga) * page_count);
2083                 /* this should happen rarely and is pretty bad; it makes the
2084                  * pending list not follow the dirty order */
2085                 client_obd_list_lock(&cli->cl_loi_list_lock);
2086                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2087                         list_del_init(&oap->oap_rpc_item);
2088
2089                         /* queued sync pages can be torn down while the pages
2090                          * were between the pending list and the rpc */
2091                         if (oap->oap_interrupted) {
2092                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2093                                 osc_ap_completion(env, cli, NULL, oap, 0,
2094                                                   oap->oap_count);
2095                                 continue;
2096                         }
2097                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2098                 }
2099                 if (clerq && !IS_ERR(clerq))
2100                         cl_req_completion(env, clerq, PTR_ERR(req));
2101         }
2102         RETURN(req);
2103 }
2104
2105 /**
2106  * Prepare pages for async io and put them in the send queue.
2107  *
2108  * \param cli - client obd doing the io
2109  * \param loi - object the pages belong to
2110  * \param cmd - OBD_BRW_* macros
2111  * \param lop - pending pages
2112  *
2113  * \return 1 if an rpc was sent, 0 if no rpc was sent,
2114  * \return a negative errno if an error occurred.
2115  */
2116 static int
2117 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2118                  struct lov_oinfo *loi,
2119                  int cmd, struct loi_oap_pages *lop)
2120 {
2121         struct ptlrpc_request *req;
2122         obd_count page_count = 0;
2123         struct osc_async_page *oap = NULL, *tmp;
2124         struct osc_brw_async_args *aa;
2125         const struct obd_async_page_ops *ops;
2126         CFS_LIST_HEAD(rpc_list);
2127         unsigned int ending_offset;
2128         unsigned int starting_offset = 0;
2129         int srvlock = 0;
2130         struct cl_object *clob = NULL;
2131         ENTRY;
2132
2133         /* first we find the pages we're allowed to work with */
2134         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2135                                  oap_pending_item) {
2136                 ops = oap->oap_caller_ops;
2137
2138                 LASSERT(oap->oap_magic == OAP_MAGIC);
2139
2140                 if (clob == NULL) {
2141                         /* pin object in memory, so that completion call-backs
2142                          * can be safely called under client_obd_list lock. */
2143                         clob = osc_oap2cl_page(oap)->cp_obj;
2144                         cl_object_get(clob);
2145                 }
2146
2147                 if (page_count != 0 &&
2148                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2149                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2150                                " oap %p, page %p, srvlock %u\n",
2151                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2152                         break;
2153                 }
2154                 /* in llite being 'ready' equates to the page being locked
2155                  * until completion unlocks it.  commit_write submits a page
2156                  * as not ready because its unlock will happen unconditionally
2157                  * as the call returns.  if we race with commit_write giving
2158                  * us that page we don't want to create a hole in the page
2159                  * stream, so we stop and leave the rpc to be fired by
2160                  * another dirtier or kupdated interval (the not ready page
2161                  * will still be on the dirty list).  we could call in
2162                  * at the end of ll_file_write to process the queue again. */
2163                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2164                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2165                                                     cmd);
2166                         if (rc < 0)
2167                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2168                                                 "instead of ready\n", oap,
2169                                                 oap->oap_page, rc);
2170                         switch (rc) {
2171                         case -EAGAIN:
2172                                 /* llite is telling us that the page is still
2173                                  * in commit_write and that we should try
2174                                  * and put it in an rpc again later.  we
2175                                  * break out of the loop so we don't create
2176                                  * a hole in the sequence of pages in the rpc
2177                                  * stream.*/
2178                                 oap = NULL;
2179                                 break;
2180                         case -EINTR:
2181                                 /* the io isn't needed; tell the checks
2182                                  * below to complete the rpc with EINTR */
2183                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2184                                 oap->oap_count = -EINTR;
2185                                 break;
2186                         case 0:
2187                                 oap->oap_async_flags |= ASYNC_READY;
2188                                 break;
2189                         default:
2190                                 LASSERTF(0, "oap %p page %p returned %d "
2191                                             "from make_ready\n", oap,
2192                                             oap->oap_page, rc);
2193                                 break;
2194                         }
2195                 }
2196                 if (oap == NULL)
2197                         break;
2198                 /*
2199                  * Page submitted for IO has to be locked. Either by
2200                  * ->ap_make_ready() or by higher layers.
2201                  */
2202 #if defined(__KERNEL__) && defined(__linux__)
2203                 {
2204                         struct cl_page *page;
2205
2206                         page = osc_oap2cl_page(oap);
2207
2208                         if (page->cp_type == CPT_CACHEABLE &&
2209                             !(PageLocked(oap->oap_page) &&
2210                               (CheckWriteback(oap->oap_page, cmd)))) {
2211                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2212                                        oap->oap_page,
2213                                        (long)oap->oap_page->flags,
2214                                        oap->oap_async_flags);
2215                                 LBUG();
2216                         }
2217                 }
2218 #endif
2219                 /* If there is a gap at the start of this page, it can't merge
2220                  * with any previous page, so we'll hand the network a
2221                  * "fragmented" page array that it can't transfer in 1 RDMA */
2222                 if (page_count != 0 && oap->oap_page_off != 0)
2223                         break;
2224
2225                 /* take the page out of our book-keeping */
2226                 list_del_init(&oap->oap_pending_item);
2227                 lop_update_pending(cli, lop, cmd, -1);
2228                 list_del_init(&oap->oap_urgent_item);
2229
2230                 if (page_count == 0)
2231                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2232                                           (PTLRPC_MAX_BRW_SIZE - 1);
2233
2234                 /* ask the caller for the size of the io as the rpc leaves. */
2235                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2236                         oap->oap_count =
2237                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2238                                                       cmd);
2239                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2240                 }
2241                 if (oap->oap_count <= 0) {
2242                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2243                                oap->oap_count);
2244                         osc_ap_completion(env, cli, NULL,
2245                                           oap, 0, oap->oap_count);
2246                         continue;
2247                 }
2248
2249                 /* now put the page back in our accounting */
2250                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2251                 if (page_count == 0)
2252                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2253                 if (++page_count >= cli->cl_max_pages_per_rpc)
2254                         break;
2255
2256                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2257                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2258                  * have the same alignment as the initial writes that allocated
2259                  * extents on the server. */
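                /* e.g. (a sketch): if PTLRPC_MAX_BRW_SIZE were 1MB, then
                 * ending_offset == 0 below would mean this page run ends
                 * exactly on a 1MB boundary, so we cut the rpc here. */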
2260                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2261                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2262                 if (ending_offset == 0)
2263                         break;
2264
2265                 /* If there is a gap at the end of this page, it can't merge
2266                  * with any subsequent pages, so we'll hand the network a
2267                  * "fragmented" page array that it can't transfer in 1 RDMA */
2268                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2269                         break;
2270         }
2271
2272         osc_wake_cache_waiters(cli);
2273
2274         loi_list_maint(cli, loi);
2275
2276         client_obd_list_unlock(&cli->cl_loi_list_lock);
2277
2278         if (clob != NULL)
2279                 cl_object_put(env, clob);
2280
2281         if (page_count == 0) {
2282                 client_obd_list_lock(&cli->cl_loi_list_lock);
2283                 RETURN(0);
2284         }
2285
2286         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2287         if (IS_ERR(req)) {
2288                 LASSERT(list_empty(&rpc_list));
2289                 loi_list_maint(cli, loi);
2290                 RETURN(PTR_ERR(req));
2291         }
2292
2293         aa = ptlrpc_req_async_args(req);
2294
2295         if (cmd == OBD_BRW_READ) {
2296                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2297                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2298                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2299                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2300         } else {
2301                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2302                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2303                                  cli->cl_w_in_flight);
2304                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2305                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2306         }
2307         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2308
2309         client_obd_list_lock(&cli->cl_loi_list_lock);
2310
2311         if (cmd == OBD_BRW_READ)
2312                 cli->cl_r_in_flight++;
2313         else
2314                 cli->cl_w_in_flight++;
2315
2316         /* queued sync pages can be torn down while the pages
2317          * were between the pending list and the rpc */
2318         tmp = NULL;
2319         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2320                 /* only one oap gets a request reference */
2321                 if (tmp == NULL)
2322                         tmp = oap;
2323                 if (oap->oap_interrupted && !req->rq_intr) {
2324                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2325                                oap, req);
2326                         ptlrpc_mark_interrupted(req);
2327                 }
2328         }
2329         if (tmp != NULL)
2330                 tmp->oap_request = ptlrpc_request_addref(req);
2331
2332         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2333                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2334
2335         req->rq_interpret_reply = brw_interpret;
2336         ptlrpcd_add_req(req, PSCOPE_BRW);
2337         RETURN(1);
2338 }
2339
2340 #define LOI_DEBUG(LOI, STR, args...)                                     \
2341         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2342                !list_empty(&(LOI)->loi_cli_item),                        \
2343                (LOI)->loi_write_lop.lop_num_pending,                     \
2344                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2345                (LOI)->loi_read_lop.lop_num_pending,                      \
2346                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2347                args)
2348
2349 /* This is called by osc_check_rpcs() to find which objects have pages that
2350  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2351 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2352 {
2353         ENTRY;
2354         /* first return all objects which we already know to have
2355          * pages ready to be stuffed into rpcs */
2356         if (!list_empty(&cli->cl_loi_ready_list))
2357                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2358                                   struct lov_oinfo, loi_cli_item));
2359
2360         /* then if we have cache waiters, return all objects with queued
2361          * writes.  This is especially important when many small files
2362          * have filled up the cache and not been fired into rpcs because
2363          * they don't pass the nr_pending/object threshold */
2364         if (!list_empty(&cli->cl_cache_waiters) &&
2365             !list_empty(&cli->cl_loi_write_list))
2366                 RETURN(list_entry(cli->cl_loi_write_list.next,
2367                                   struct lov_oinfo, loi_write_item));
2368
2369         /* then return all queued objects when we have an invalid import
2370          * so that they get flushed */
2371         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2372                 if (!list_empty(&cli->cl_loi_write_list))
2373                         RETURN(list_entry(cli->cl_loi_write_list.next,
2374                                           struct lov_oinfo, loi_write_item));
2375                 if (!list_empty(&cli->cl_loi_read_list))
2376                         RETURN(list_entry(cli->cl_loi_read_list.next,
2377                                           struct lov_oinfo, loi_read_item));
2378         }
2379         RETURN(NULL);
2380 }
2381
2382 /* called with the loi list lock held */
2383 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2384 {
2385         struct lov_oinfo *loi;
2386         int rc = 0, race_counter = 0;
2387         ENTRY;
2388
2389         while ((loi = osc_next_loi(cli)) != NULL) {
2390                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2391
2392                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2393                         break;
2394
2395                 /* attempt some read/write balancing by alternating between
2396                  * reads and writes in an object.  The makes_rpc checks here
2397                  * would be redundant if we were getting read/write work items
2398                  * instead of objects.  we don't want send_oap_rpc to drain a
2399                  * partial read pending queue when we're given this object to
2400                  * do io on writes while there are cache waiters */
2401                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2402                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2403                                               &loi->loi_write_lop);
2404                         if (rc < 0)
2405                                 break;
2406                         if (rc > 0)
2407                                 race_counter = 0;
2408                         else
2409                                 race_counter++;
2410                 }
2411                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2412                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2413                                               &loi->loi_read_lop);
2414                         if (rc < 0)
2415                                 break;
2416                         if (rc > 0)
2417                                 race_counter = 0;
2418                         else
2419                                 race_counter++;
2420                 }
2421
2422                 /* attempt some inter-object balancing by issuing rpcs
2423                  * for each object in turn */
2424                 if (!list_empty(&loi->loi_cli_item))
2425                         list_del_init(&loi->loi_cli_item);
2426                 if (!list_empty(&loi->loi_write_item))
2427                         list_del_init(&loi->loi_write_item);
2428                 if (!list_empty(&loi->loi_read_item))
2429                         list_del_init(&loi->loi_read_item);
2430
2431                 loi_list_maint(cli, loi);
2432
2433                 /* send_oap_rpc fails with 0 when make_ready tells it to
2434                  * back off.  llite's make_ready does this when it tries
2435                  * to lock a page queued for write that is already locked.
2436                  * we want to try sending rpcs from many objects, but we
2437                  * don't want to spin failing with 0.  */
2438                 if (race_counter == 10)
2439                         break;
2440         }
2441         EXIT;
2442 }
2443
2444 /* we're trying to queue a page in the osc so we're subject to the
2445  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2446  * If the osc's queued pages are already at that limit, then we want to sleep
2447  * until there is space in the osc's queue for us.  We also may be waiting for
2448  * write credits from the OST if there are RPCs in flight that may return some
2449  * before we fall back to sync writes.
2450  *
2451  * We need this to know our allocation was granted in the presence of signals */
2452 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2453 {
2454         int rc;
2455         ENTRY;
2456         client_obd_list_lock(&cli->cl_loi_list_lock);
2457         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2458         client_obd_list_unlock(&cli->cl_loi_list_lock);
2459         RETURN(rc);
2460 }
2461
2462 /**
2463  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2464  * is available.
2465  */
2466 int osc_enter_cache_try(const struct lu_env *env,
2467                         struct client_obd *cli, struct lov_oinfo *loi,
2468                         struct osc_async_page *oap, int transient)
2469 {
2470         int has_grant;
2471
2472         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2473         if (has_grant) {
2474                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2475                 if (transient) {
2476                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2477                         atomic_inc(&obd_dirty_transit_pages);
2478                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2479                 }
2480         }
2481         return has_grant;
2482 }
2483
2484 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2485  * grant or cache space. */
2486 static int osc_enter_cache(const struct lu_env *env,
2487                            struct client_obd *cli, struct lov_oinfo *loi,
2488                            struct osc_async_page *oap)
2489 {
2490         struct osc_cache_waiter ocw;
2491         struct l_wait_info lwi = { 0 };
2492
2493         ENTRY;
2494
2495         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2496                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2497                cli->cl_dirty_max, obd_max_dirty_pages,
2498                cli->cl_lost_grant, cli->cl_avail_grant);
2499
2500         /* force the caller to try sync io.  this can jump the list
2501          * of queued writes and create a discontiguous rpc stream */
2502         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2503             loi->loi_ar.ar_force_sync)
2504                 RETURN(-EDQUOT);
2505
2506         /* Hopefully normal case - cache space and write credits available */
2507         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2508             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2509             osc_enter_cache_try(env, cli, loi, oap, 0))
2510                 RETURN(0);
2511
2512         /* Make sure that there are write rpcs in flight to wait for.  This
2513          * is a little silly as this object may not have any pending but
2514          * other objects sure might. */
2515         if (cli->cl_w_in_flight) {
2516                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2517                 cfs_waitq_init(&ocw.ocw_waitq);
2518                 ocw.ocw_oap = oap;
2519                 ocw.ocw_rc = 0;
2520
2521                 loi_list_maint(cli, loi);
2522                 osc_check_rpcs(env, cli);
2523                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2524
2525                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2526                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2527
2528                 client_obd_list_lock(&cli->cl_loi_list_lock);
2529                 if (!list_empty(&ocw.ocw_entry)) {
2530                         list_del(&ocw.ocw_entry);
2531                         RETURN(-EINTR);
2532                 }
2533                 RETURN(ocw.ocw_rc);
2534         }
2535
2536         RETURN(-EDQUOT);
2537 }
2538
2539
2540 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2541                         struct lov_oinfo *loi, cfs_page_t *page,
2542                         obd_off offset, const struct obd_async_page_ops *ops,
2543                         void *data, void **res, int nocache,
2544                         struct lustre_handle *lockh)
2545 {
2546         struct osc_async_page *oap;
2547
2548         ENTRY;
2549
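        /* a NULL page is a size query: tell the caller how much storage to
         * reserve for an osc_async_page */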
2550         if (!page)
2551                 return size_round(sizeof(*oap));
2552
2553         oap = *res;
2554         oap->oap_magic = OAP_MAGIC;
2555         oap->oap_cli = &exp->exp_obd->u.cli;
2556         oap->oap_loi = loi;
2557
2558         oap->oap_caller_ops = ops;
2559         oap->oap_caller_data = data;
2560
2561         oap->oap_page = page;
2562         oap->oap_obj_off = offset;
2563
2564         LASSERT(!(offset & ~CFS_PAGE_MASK));
2565
2566         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2567         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2568         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2569         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2570
2571         spin_lock_init(&oap->oap_lock);
2572         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2573         RETURN(0);
2574 }
2575
2576 struct osc_async_page *oap_from_cookie(void *cookie)
2577 {
2578         struct osc_async_page *oap = cookie;
2579         if (oap->oap_magic != OAP_MAGIC)
2580                 return ERR_PTR(-EINVAL);
2581         return oap;
2582 }
2583
2584 int osc_queue_async_io(const struct lu_env *env,
2585                        struct obd_export *exp, struct lov_stripe_md *lsm,
2586                        struct lov_oinfo *loi, void *cookie,
2587                        int cmd, obd_off off, int count,
2588                        obd_flag brw_flags, enum async_flags async_flags)
2589 {
2590         struct client_obd *cli = &exp->exp_obd->u.cli;
2591         struct osc_async_page *oap;
2592         int rc = 0;
2593         ENTRY;
2594
2595         oap = oap_from_cookie(cookie);
2596         if (IS_ERR(oap))
2597                 RETURN(PTR_ERR(oap));
2598
2599         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2600                 RETURN(-EIO);
2601
2602         if (!list_empty(&oap->oap_pending_item) ||
2603             !list_empty(&oap->oap_urgent_item) ||
2604             !list_empty(&oap->oap_rpc_item))
2605                 RETURN(-EBUSY);
2606
2607         /* check if the file's owner/group is over quota */
2608 #ifdef HAVE_QUOTA_SUPPORT
2609         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2610                 struct cl_object *obj;
2611                 struct cl_attr    attr; /* XXX put attr into thread info */
2612
2613                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2614
2615                 cl_object_attr_lock(obj);
2616                 rc = cl_object_attr_get(env, obj, &attr);
2617                 cl_object_attr_unlock(obj);
2618
2619                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2620                                             attr.cat_gid) == NO_QUOTA)
2621                         rc = -EDQUOT;
2622                 if (rc)
2623                         RETURN(rc);
2624         }
2625 #endif
2626
2627         if (loi == NULL)
2628                 loi = lsm->lsm_oinfo[0];
2629
2630         client_obd_list_lock(&cli->cl_loi_list_lock);
2631
2632         LASSERT(off + count <= CFS_PAGE_SIZE);
2633         oap->oap_cmd = cmd;
2634         oap->oap_page_off = off;
2635         oap->oap_count = count;
2636         oap->oap_brw_flags = brw_flags;
2637         oap->oap_async_flags = async_flags;
2638
2639         if (cmd & OBD_BRW_WRITE) {
2640                 rc = osc_enter_cache(env, cli, loi, oap);
2641                 if (rc) {
2642                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2643                         RETURN(rc);
2644                 }
2645         }
2646
2647         osc_oap_to_pending(oap);
2648         loi_list_maint(cli, loi);
2649
2650         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2651                   cmd);
2652
2653         osc_check_rpcs(env, cli);
2654         client_obd_list_unlock(&cli->cl_loi_list_lock);
2655
2656         RETURN(0);
2657 }
2658
2659 /* aka (~was & now & flag), but this is more clear :) */
2660 #define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2661
2662 int osc_set_async_flags_base(struct client_obd *cli,
2663                              struct lov_oinfo *loi, struct osc_async_page *oap,
2664                              obd_flag async_flags)
2665 {
2666         struct loi_oap_pages *lop;
2667         ENTRY;
2668
2669         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2670                 RETURN(-EIO);
2671
2672         if (oap->oap_cmd & OBD_BRW_WRITE) {
2673                 lop = &loi->loi_write_lop;
2674         } else {
2675                 lop = &loi->loi_read_lop;
2676         }
2677
2678         if (list_empty(&oap->oap_pending_item))
2679                 RETURN(-EINVAL);
2680
2681         if ((oap->oap_async_flags & async_flags) == async_flags)
2682                 RETURN(0);
2683
2684         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2685                 oap->oap_async_flags |= ASYNC_READY;
2686
2687         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2688                 if (list_empty(&oap->oap_rpc_item)) {
2689                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2690                         loi_list_maint(cli, loi);
2691                 }
2692         }
2693
2694         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2695                         oap->oap_async_flags);
2696         RETURN(0);
2697 }
2698
2699 int osc_teardown_async_page(struct obd_export *exp,
2700                             struct lov_stripe_md *lsm,
2701                             struct lov_oinfo *loi, void *cookie)
2702 {
2703         struct client_obd *cli = &exp->exp_obd->u.cli;
2704         struct loi_oap_pages *lop;
2705         struct osc_async_page *oap;
2706         int rc = 0;
2707         ENTRY;
2708
2709         oap = oap_from_cookie(cookie);
2710         if (IS_ERR(oap))
2711                 RETURN(PTR_ERR(oap));
2712
2713         if (loi == NULL)
2714                 loi = lsm->lsm_oinfo[0];
2715
2716         if (oap->oap_cmd & OBD_BRW_WRITE) {
2717                 lop = &loi->loi_write_lop;
2718         } else {
2719                 lop = &loi->loi_read_lop;
2720         }
2721
2722         client_obd_list_lock(&cli->cl_loi_list_lock);
2723
2724         if (!list_empty(&oap->oap_rpc_item))
2725                 GOTO(out, rc = -EBUSY);
2726
2727         osc_exit_cache(cli, oap, 0);
2728         osc_wake_cache_waiters(cli);
2729
2730         if (!list_empty(&oap->oap_urgent_item)) {
2731                 list_del_init(&oap->oap_urgent_item);
2732                 oap->oap_async_flags &= ~ASYNC_URGENT;
2733         }
2734         if (!list_empty(&oap->oap_pending_item)) {
2735                 list_del_init(&oap->oap_pending_item);
2736                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2737         }
2738         loi_list_maint(cli, loi);
2739         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2740 out:
2741         client_obd_list_unlock(&cli->cl_loi_list_lock);
2742         RETURN(rc);
2743 }
2744
2745 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2746                                          struct ldlm_enqueue_info *einfo,
2747                                          int flags)
2748 {
2749         void *data = einfo->ei_cbdata;
2750
2751         LASSERT(lock != NULL);
2752         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2753         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2754         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2755         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2756
2757         lock_res_and_lock(lock);
2758         spin_lock(&osc_ast_guard);
2759         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2760         lock->l_ast_data = data;
2761         spin_unlock(&osc_ast_guard);
2762         unlock_res_and_lock(lock);
2763 }
2764
2765 static void osc_set_data_with_check(struct lustre_handle *lockh,
2766                                     struct ldlm_enqueue_info *einfo,
2767                                     int flags)
2768 {
2769         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2770
2771         if (lock != NULL) {
2772                 osc_set_lock_data_with_check(lock, einfo, flags);
2773                 LDLM_LOCK_PUT(lock);
2774         } else
2775                 CERROR("lockh %p, data %p - client evicted?\n",
2776                        lockh, einfo->ei_cbdata);
2777 }
2778
2779 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2780                              ldlm_iterator_t replace, void *data)
2781 {
2782         struct ldlm_res_id res_id;
2783         struct obd_device *obd = class_exp2obd(exp);
2784
2785         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2786         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2787         return 0;
2788 }
2789
2790 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2791                             obd_enqueue_update_f upcall, void *cookie,
2792                             int *flags, int rc)
2793 {
2794         int intent = *flags & LDLM_FL_HAS_INTENT;
2795         ENTRY;
2796
2797         if (intent) {
2798                 /* The request was created before ldlm_cli_enqueue call. */
2799                 if (rc == ELDLM_LOCK_ABORTED) {
2800                         struct ldlm_reply *rep;
2801                         rep = req_capsule_server_get(&req->rq_pill,
2802                                                      &RMF_DLM_REP);
2803
2804                         LASSERT(rep != NULL);
2805                         if (rep->lock_policy_res1)
2806                                 rc = rep->lock_policy_res1;
2807                 }
2808         }
2809
2810         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2811                 *flags |= LDLM_FL_LVB_READY;
2812                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2813                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2814         }
2815
2816         /* Call the update callback. */
2817         rc = (*upcall)(cookie, rc);
2818         RETURN(rc);
2819 }
2820
2821 static int osc_enqueue_interpret(const struct lu_env *env,
2822                                  struct ptlrpc_request *req,
2823                                  struct osc_enqueue_args *aa, int rc)
2824 {
2825         struct ldlm_lock *lock;
2826         struct lustre_handle handle;
2827         __u32 mode;
2828
        /* Make a local copy of the lock handle and mode, because aa->oa_*
         * might be freed at any time after the lock upcall has been called. */
2831         lustre_handle_copy(&handle, aa->oa_lockh);
2832         mode = aa->oa_ei->ei_mode;
2833
2834         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2835          * be valid. */
2836         lock = ldlm_handle2lock(&handle);
2837
        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
         * to arrive after the upcall has been executed by
         * osc_enqueue_fini(). */
2842         ldlm_lock_addref(&handle, mode);
2843
2844         /* Complete obtaining the lock procedure. */
2845         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2846                                    mode, aa->oa_flags, aa->oa_lvb,
2847                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2848                                    &handle, rc);
2849         /* Complete osc stuff. */
2850         rc = osc_enqueue_fini(req, aa->oa_lvb,
2851                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2852         /* Release the lock for async request. */
2853         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2854                 /*
2855                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2856                  * not already released by
2857                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2858                  */
2859                 ldlm_lock_decref(&handle, mode);
2860
2861         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2862                  aa->oa_lockh, req, aa);
2863         ldlm_lock_decref(&handle, mode);
2864         LDLM_LOCK_PUT(lock);
2865         return rc;
2866 }
2867
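/* Worked example added for clarity (illustrative, not from the original
 * source): given a granted lock on extent [0, 4095] and an LVB reporting
 * lvb_size = 10000, the clamp below reduces tmp to l_extent.end + 1 = 4096,
 * so kms is raised to 4096 while rss keeps the full 10000 -- KMS never
 * extends past the end of the lock just acquired. */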
2868 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2869                         struct lov_oinfo *loi, int flags,
2870                         struct ost_lvb *lvb, __u32 mode, int rc)
2871 {
2872         if (rc == ELDLM_OK) {
2873                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2874                 __u64 tmp;
2875
2876                 LASSERT(lock != NULL);
2877                 loi->loi_lvb = *lvb;
2878                 tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further.
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2881                 if (tmp > lock->l_policy_data.l_extent.end)
2882                         tmp = lock->l_policy_data.l_extent.end + 1;
2883                 if (tmp >= loi->loi_kms) {
2884                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2885                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2886                         loi_kms_set(loi, tmp);
2887                 } else {
2888                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2889                                    LPU64"; leaving kms="LPU64", end="LPU64,
2890                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2891                                    lock->l_policy_data.l_extent.end);
2892                 }
2893                 ldlm_lock_allow_match(lock);
2894                 LDLM_LOCK_PUT(lock);
2895         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2896                 loi->loi_lvb = *lvb;
2897                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2898                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2899                 rc = ELDLM_OK;
2900         }
2901 }
2902 EXPORT_SYMBOL(osc_update_enqueue);
2903
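/* Note added for clarity: PTLRPCD_SET is a sentinel pointer that is never
 * dereferenced; osc_enqueue_base() compares the caller's rqset against it
 * to decide whether to hand a request to ptlrpcd (ptlrpcd_add_req) rather
 * than add it to a caller-supplied set (ptlrpc_set_add_req). */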
2904 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2905
/* When enqueueing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, but holding some locks while trying to obtain
 * others may take a considerable amount of time in the case of an OST failure;
 * and when a client does not release locks that other sync requests are
 * waiting for, that client is evicted from the cluster -- such scenarios make
 * life difficult, so release locks just after they are obtained. */
2913 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2914                      int *flags, ldlm_policy_data_t *policy,
2915                      struct ost_lvb *lvb, int kms_valid,
2916                      obd_enqueue_update_f upcall, void *cookie,
2917                      struct ldlm_enqueue_info *einfo,
2918                      struct lustre_handle *lockh,
2919                      struct ptlrpc_request_set *rqset, int async)
2920 {
2921         struct obd_device *obd = exp->exp_obd;
2922         struct ptlrpc_request *req = NULL;
2923         int intent = *flags & LDLM_FL_HAS_INTENT;
2924         ldlm_mode_t mode;
2925         int rc;
2926         ENTRY;
2927
2928         /* Filesystem lock extents are extended to page boundaries so that
2929          * dealing with the page cache is a little smoother.  */
2930         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2931         policy->l_extent.end |= ~CFS_PAGE_MASK;
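        /* Worked example (illustrative, assuming CFS_PAGE_SIZE == 4096):
         * a request for [5000, 6000] becomes [4096, 8191] -- the start is
         * rounded down by clearing its in-page offset bits and the end is
         * rounded up by setting them. */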
2932
2933         /*
2934          * kms is not valid when either object is completely fresh (so that no
2935          * locks are cached), or object was evicted. In the latter case cached
2936          * lock cannot be used, because it would prime inode state with
2937          * potentially stale LVB.
2938          */
2939         if (!kms_valid)
2940                 goto no_match;
2941
2942         /* Next, search for already existing extent locks that will cover us */
2943         /* If we're trying to read, we also search for an existing PW lock.  The
2944          * VFS and page cache already protect us locally, so lots of readers/
2945          * writers can share a single PW lock.
2946          *
2947          * There are problems with conversion deadlocks, so instead of
2948          * converting a read lock to a write lock, we'll just enqueue a new
2949          * one.
2950          *
2951          * At some point we should cancel the read lock instead of making them
2952          * send us a blocking callback, but there are problems with canceling
2953          * locks out from other users right now, too. */
2954         mode = einfo->ei_mode;
2955         if (einfo->ei_mode == LCK_PR)
2956                 mode |= LCK_PW;
2957         mode = ldlm_lock_match(obd->obd_namespace,
2958                                *flags | LDLM_FL_LVB_READY, res_id,
2959                                einfo->ei_type, policy, mode, lockh, 0);
2960         if (mode) {
2961                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2962
2963                 if (matched->l_ast_data == NULL ||
2964                     matched->l_ast_data == einfo->ei_cbdata) {
                        /* addref the lock only for sync requests, when a PW
                         * lock is matched whereas we asked for PR. */
2967                         if (!rqset && einfo->ei_mode != mode)
2968                                 ldlm_lock_addref(lockh, LCK_PR);
2969                         osc_set_lock_data_with_check(matched, einfo, *flags);
2970                         if (intent) {
2971                                 /* I would like to be able to ASSERT here that
2972                                  * rss <= kms, but I can't, for reasons which
2973                                  * are explained in lov_enqueue() */
2974                         }
2975
2976                         /* We already have a lock, and it's referenced */
2977                         (*upcall)(cookie, ELDLM_OK);
2978
2979                         /* For async requests, decref the lock. */
2980                         if (einfo->ei_mode != mode)
2981                                 ldlm_lock_decref(lockh, LCK_PW);
2982                         else if (rqset)
2983                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2984                         LDLM_LOCK_PUT(matched);
2985                         RETURN(ELDLM_OK);
2986                 } else
2987                         ldlm_lock_decref(lockh, mode);
2988                 LDLM_LOCK_PUT(matched);
2989         }
2990
2991  no_match:
2992         if (intent) {
2993                 CFS_LIST_HEAD(cancels);
2994                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2995                                            &RQF_LDLM_ENQUEUE_LVB);
2996                 if (req == NULL)
2997                         RETURN(-ENOMEM);
2998
                rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
                if (rc) {
                        /* don't leak the freshly allocated request */
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }
3002
3003                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3004                                      sizeof *lvb);
3005                 ptlrpc_request_set_replen(req);
3006         }
3007
3008         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3009         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3010
3011         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3012                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3013         if (rqset) {
3014                 if (!rc) {
3015                         struct osc_enqueue_args *aa;
3016                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3017                         aa = ptlrpc_req_async_args(req);
3018                         aa->oa_ei = einfo;
3019                         aa->oa_exp = exp;
3020                         aa->oa_flags  = flags;
3021                         aa->oa_upcall = upcall;
3022                         aa->oa_cookie = cookie;
3023                         aa->oa_lvb    = lvb;
3024                         aa->oa_lockh  = lockh;
3025
3026                         req->rq_interpret_reply =
3027                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3028                         if (rqset == PTLRPCD_SET)
3029                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3030                         else
3031                                 ptlrpc_set_add_req(rqset, req);
3032                 } else if (intent) {
3033                         ptlrpc_req_finished(req);
3034                 }
3035                 RETURN(rc);
3036         }
3037
3038         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3039         if (intent)
3040                 ptlrpc_req_finished(req);
3041
3042         RETURN(rc);
3043 }
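/* Usage sketch added for clarity (illustrative, not from the original
 * source): the completion style of osc_enqueue_base() is selected by rqset.
 *
 *      rc = osc_enqueue_base(exp, &res_id, &flags, &policy, &lvb, kms_valid,
 *                            upcall, cookie, einfo, lockh, NULL, 0);
 *
 * runs synchronously: osc_enqueue_fini() and the upcall fire before return.
 * Passing PTLRPCD_SET or a private ptlrpc_request_set (with async set)
 * defers completion to osc_enqueue_interpret() instead. */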
3044
3045 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3046                        struct ldlm_enqueue_info *einfo,
3047                        struct ptlrpc_request_set *rqset)
3048 {
3049         struct ldlm_res_id res_id;
3050         int rc;
3051         ENTRY;
3052
3053         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3054                            oinfo->oi_md->lsm_object_gr, &res_id);
3055
3056         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3057                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3058                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3059                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3060                               rqset, rqset != NULL);
3061         RETURN(rc);
3062 }
3063
3064 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3065                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3066                    int *flags, void *data, struct lustre_handle *lockh,
3067                    int unref)
3068 {
3069         struct obd_device *obd = exp->exp_obd;
3070         int lflags = *flags;
3071         ldlm_mode_t rc;
3072         ENTRY;
3073
3074         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3075                 RETURN(-EIO);
3076
3077         /* Filesystem lock extents are extended to page boundaries so that
3078          * dealing with the page cache is a little smoother */
3079         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3080         policy->l_extent.end |= ~CFS_PAGE_MASK;
3081
3082         /* Next, search for already existing extent locks that will cover us */
3083         /* If we're trying to read, we also search for an existing PW lock.  The
3084          * VFS and page cache already protect us locally, so lots of readers/
3085          * writers can share a single PW lock. */
3086         rc = mode;
3087         if (mode == LCK_PR)
3088                 rc |= LCK_PW;
3089         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3090                              res_id, type, policy, rc, lockh, unref);
3091         if (rc) {
3092                 if (data != NULL)
3093                         osc_set_data_with_check(lockh, data, lflags);
3094                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3095                         ldlm_lock_addref(lockh, LCK_PR);
3096                         ldlm_lock_decref(lockh, LCK_PW);
3097                 }
3098                 RETURN(rc);
3099         }
3100         RETURN(rc);
3101 }
3102
3103 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3104 {
3105         ENTRY;
3106
3107         if (unlikely(mode == LCK_GROUP))
3108                 ldlm_lock_decref_and_cancel(lockh, mode);
3109         else
3110                 ldlm_lock_decref(lockh, mode);
3111
3112         RETURN(0);
3113 }
3114
3115 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3116                       __u32 mode, struct lustre_handle *lockh)
3117 {
3118         ENTRY;
3119         RETURN(osc_cancel_base(lockh, mode));
3120 }
3121
3122 static int osc_cancel_unused(struct obd_export *exp,
3123                              struct lov_stripe_md *lsm, int flags,
3124                              void *opaque)
3125 {
3126         struct obd_device *obd = class_exp2obd(exp);
3127         struct ldlm_res_id res_id, *resp = NULL;
3128
3129         if (lsm != NULL) {
3130                 resp = osc_build_res_name(lsm->lsm_object_id,
3131                                           lsm->lsm_object_gr, &res_id);
3132         }
3133
3134         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3135 }
3136
3137 static int osc_statfs_interpret(const struct lu_env *env,
3138                                 struct ptlrpc_request *req,
3139                                 struct osc_async_args *aa, int rc)
3140 {
3141         struct obd_statfs *msfs;
3142         ENTRY;
3143
3144         if (rc != 0)
3145                 GOTO(out, rc);
3146
3147         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3148         if (msfs == NULL) {
3149                 GOTO(out, rc = -EPROTO);
3150         }
3151
3152         *aa->aa_oi->oi_osfs = *msfs;
3153 out:
3154         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3155         RETURN(rc);
3156 }
3157
3158 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3159                             __u64 max_age, struct ptlrpc_request_set *rqset)
3160 {
3161         struct ptlrpc_request *req;
3162         struct osc_async_args *aa;
3163         int                    rc;
3164         ENTRY;
3165
3166         /* We could possibly pass max_age in the request (as an absolute
3167          * timestamp or a "seconds.usec ago") so the target can avoid doing
3168          * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount, where that would help a bit).  Having relative timestamps
3170          * is not so great if request processing is slow, while absolute
3171          * timestamps are not ideal because they need time synchronization. */
3172         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3173         if (req == NULL)
3174                 RETURN(-ENOMEM);
3175
3176         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3177         if (rc) {
3178                 ptlrpc_request_free(req);
3179                 RETURN(rc);
3180         }
3181         ptlrpc_request_set_replen(req);
3182         req->rq_request_portal = OST_CREATE_PORTAL;
3183         ptlrpc_at_set_req_timeout(req);
3184
3185         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait on statfs, to avoid deadlock */
3187                 req->rq_no_resend = 1;
3188                 req->rq_no_delay = 1;
3189         }
3190
3191         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3192         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3193         aa = ptlrpc_req_async_args(req);
3194         aa->aa_oi = oinfo;
3195
3196         ptlrpc_set_add_req(rqset, req);
3197         RETURN(0);
3198 }
3199
3200 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3201                       __u64 max_age, __u32 flags)
3202 {
3203         struct obd_statfs     *msfs;
3204         struct ptlrpc_request *req;
3205         struct obd_import     *imp = NULL;
3206         int rc;
3207         ENTRY;
3208
        /* Since the request might also come from lprocfs, we need to sync
         * this with client_disconnect_export() (bug 15684). */
3211         down_read(&obd->u.cli.cl_sem);
3212         if (obd->u.cli.cl_import)
3213                 imp = class_import_get(obd->u.cli.cl_import);
3214         up_read(&obd->u.cli.cl_sem);
3215         if (!imp)
3216                 RETURN(-ENODEV);
3217
3218         /* We could possibly pass max_age in the request (as an absolute
3219          * timestamp or a "seconds.usec ago") so the target can avoid doing
3220          * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount, where that would help a bit).  Having relative timestamps
3222          * is not so great if request processing is slow, while absolute
3223          * timestamps are not ideal because they need time synchronization. */
3224         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3225
3226         class_import_put(imp);
3227
3228         if (req == NULL)
3229                 RETURN(-ENOMEM);
3230
3231         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3232         if (rc) {
3233                 ptlrpc_request_free(req);
3234                 RETURN(rc);
3235         }
3236         ptlrpc_request_set_replen(req);
3237         req->rq_request_portal = OST_CREATE_PORTAL;
3238         ptlrpc_at_set_req_timeout(req);
3239
3240         if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests should not wait on statfs, to avoid deadlock */
3242                 req->rq_no_resend = 1;
3243                 req->rq_no_delay = 1;
3244         }
3245
3246         rc = ptlrpc_queue_wait(req);
3247         if (rc)
3248                 GOTO(out, rc);
3249
3250         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3251         if (msfs == NULL) {
3252                 GOTO(out, rc = -EPROTO);
3253         }
3254
3255         *osfs = *msfs;
3256
3257         EXIT;
3258  out:
3259         ptlrpc_req_finished(req);
3260         return rc;
3261 }
3262
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core user struct whose lmm_stripe_count
 * indicates the maximum number of OST indices which will fit in the user
 * buffer.  lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (an
 * OSC object holds a single stripe, so only one slot is used here).
 */
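/* Illustrative caller sketch (assumed, not from this file): userspace
 * fills in the common header before issuing the ioctl that eventually
 * reaches osc_getstripe().
 *
 *      struct lov_user_md_v1 lum = { 0 };
 *      lum.lmm_magic = LOV_USER_MAGIC_V1;
 *      lum.lmm_stripe_count = 1;
 *      ioctl(fd, LL_IOC_LOV_GETSTRIPE, &lum);
 *
 * Only the header is read from user space here; the reply copied back is
 * sized via lov_mds_md_size(). */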
3269 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3270 {
3271         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3272         struct lov_user_md_v3 lum, *lumk;
3273         struct lov_user_ost_data_v1 *lmm_objects;
3274         int rc = 0, lum_size;
3275         ENTRY;
3276
3277         if (!lsm)
3278                 RETURN(-ENODATA);
3279
3280         /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count (the header part is common to v1 and v3) */
3282         lum_size = sizeof(struct lov_user_md_v1);
3283         if (copy_from_user(&lum, lump, lum_size))
3284                 RETURN(-EFAULT);
3285
3286         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3287             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3288                 RETURN(-EINVAL);
3289
3290         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3291         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3292         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3293         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3294
3295         /* we can use lov_mds_md_size() to compute lum_size
3296          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3297         if (lum.lmm_stripe_count > 0) {
3298                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3299                 OBD_ALLOC(lumk, lum_size);
3300                 if (!lumk)
3301                         RETURN(-ENOMEM);
3302
3303                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3304                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3305                 else
3306                         lmm_objects = &(lumk->lmm_objects[0]);
3307                 lmm_objects->l_object_id = lsm->lsm_object_id;
3308         } else {
3309                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3310                 lumk = &lum;
3311         }
3312
3313         lumk->lmm_object_id = lsm->lsm_object_id;
3314         lumk->lmm_object_gr = lsm->lsm_object_gr;
3315         lumk->lmm_stripe_count = 1;
3316
3317         if (copy_to_user(lump, lumk, lum_size))
3318                 rc = -EFAULT;
3319
3320         if (lumk != &lum)
3321                 OBD_FREE(lumk, lum_size);
3322
3323         RETURN(rc);
3324 }
3325
3326
3327 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3328                          void *karg, void *uarg)
3329 {
3330         struct obd_device *obd = exp->exp_obd;
3331         struct obd_ioctl_data *data = karg;
3332         int err = 0;
3333         ENTRY;
3334
3335         if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?\n");
3337                 return -EINVAL;
3338         }
3339         switch (cmd) {
3340         case OBD_IOC_LOV_GET_CONFIG: {
3341                 char *buf;
3342                 struct lov_desc *desc;
3343                 struct obd_uuid uuid;
3344
3345                 buf = NULL;
3346                 len = 0;
3347                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3348                         GOTO(out, err = -EINVAL);
3349
3350                 data = (struct obd_ioctl_data *)buf;
3351
3352                 if (sizeof(*desc) > data->ioc_inllen1) {
3353                         obd_ioctl_freedata(buf, len);
3354                         GOTO(out, err = -EINVAL);
3355                 }
3356
3357                 if (data->ioc_inllen2 < sizeof(uuid)) {
3358                         obd_ioctl_freedata(buf, len);
3359                         GOTO(out, err = -EINVAL);
3360                 }
3361
3362                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3363                 desc->ld_tgt_count = 1;
3364                 desc->ld_active_tgt_count = 1;
3365                 desc->ld_default_stripe_count = 1;
3366                 desc->ld_default_stripe_size = 0;
3367                 desc->ld_default_stripe_offset = 0;
3368                 desc->ld_pattern = 0;
3369                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3370
3371                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3372
3373                 err = copy_to_user((void *)uarg, buf, len);
3374                 if (err)
3375                         err = -EFAULT;
3376                 obd_ioctl_freedata(buf, len);
3377                 GOTO(out, err);
3378         }
3379         case LL_IOC_LOV_SETSTRIPE:
3380                 err = obd_alloc_memmd(exp, karg);
3381                 if (err > 0)
3382                         err = 0;
3383                 GOTO(out, err);
3384         case LL_IOC_LOV_GETSTRIPE:
3385                 err = osc_getstripe(karg, uarg);
3386                 GOTO(out, err);
3387         case OBD_IOC_CLIENT_RECOVER:
3388                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3389                                             data->ioc_inlbuf1);
3390                 if (err > 0)
3391                         err = 0;
3392                 GOTO(out, err);
3393         case IOC_OSC_SET_ACTIVE:
3394                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3395                                                data->ioc_offset);
3396                 GOTO(out, err);
3397         case OBD_IOC_POLL_QUOTACHECK:
3398                 err = lquota_poll_check(quota_interface, exp,
3399                                         (struct if_quotacheck *)karg);
3400                 GOTO(out, err);
3401         default:
3402                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3403                        cmd, cfs_curproc_comm());
3404                 GOTO(out, err = -ENOTTY);
3405         }
3406 out:
3407         module_put(THIS_MODULE);
3408         return err;
3409 }
3410
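/* Calling convention sketch (illustrative, not from the original source):
 * callers hand in a key string plus a value buffer; *vallen carries the
 * buffer size in and the result size out.  E.g. for KEY_LOCK_TO_STRIPE:
 *
 *      __u32 stripe;
 *      __u32 vallen = sizeof(stripe);
 *      rc = obd_get_info(exp, sizeof(KEY_LOCK_TO_STRIPE),
 *                        KEY_LOCK_TO_STRIPE, &vallen, &stripe, lsm);
 */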
3411 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3412                         void *key, __u32 *vallen, void *val,
3413                         struct lov_stripe_md *lsm)
3414 {
3415         ENTRY;
3416         if (!vallen || !val)
3417                 RETURN(-EFAULT);
3418
3419         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3420                 __u32 *stripe = val;
3421                 *vallen = sizeof(*stripe);
3422                 *stripe = 0;
3423                 RETURN(0);
3424         } else if (KEY_IS(KEY_LAST_ID)) {
3425                 struct ptlrpc_request *req;
3426                 obd_id                *reply;
3427                 char                  *tmp;
3428                 int                    rc;
3429
3430                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3431                                            &RQF_OST_GET_INFO_LAST_ID);
3432                 if (req == NULL)
3433                         RETURN(-ENOMEM);
3434
3435                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3436                                      RCL_CLIENT, keylen);
3437                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3438                 if (rc) {
3439                         ptlrpc_request_free(req);
3440                         RETURN(rc);
3441                 }
3442
3443                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3444                 memcpy(tmp, key, keylen);
3445
3446                 ptlrpc_request_set_replen(req);
3447                 rc = ptlrpc_queue_wait(req);
3448                 if (rc)
3449                         GOTO(out, rc);
3450
3451                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3452                 if (reply == NULL)
3453                         GOTO(out, rc = -EPROTO);
3454
3455                 *((obd_id *)val) = *reply;
3456         out:
3457                 ptlrpc_req_finished(req);
3458                 RETURN(rc);
3459         } else if (KEY_IS(KEY_FIEMAP)) {
3460                 struct ptlrpc_request *req;
3461                 struct ll_user_fiemap *reply;
3462                 char *tmp;
3463                 int rc;
3464
3465                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3466                                            &RQF_OST_GET_INFO_FIEMAP);
3467                 if (req == NULL)
3468                         RETURN(-ENOMEM);
3469
3470                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3471                                      RCL_CLIENT, keylen);
3472                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3473                                      RCL_CLIENT, *vallen);
3474                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3475                                      RCL_SERVER, *vallen);
3476
3477                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3478                 if (rc) {
3479                         ptlrpc_request_free(req);
3480                         RETURN(rc);
3481                 }
3482
3483                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3484                 memcpy(tmp, key, keylen);
3485                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3486                 memcpy(tmp, val, *vallen);
3487
3488                 ptlrpc_request_set_replen(req);
3489                 rc = ptlrpc_queue_wait(req);
3490                 if (rc)
3491                         GOTO(out1, rc);
3492
3493                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3494                 if (reply == NULL)
3495                         GOTO(out1, rc = -EPROTO);
3496
3497                 memcpy(val, reply, *vallen);
3498         out1:
3499                 ptlrpc_req_finished(req);
3500
3501                 RETURN(rc);
3502         }
3503
3504         RETURN(-EINVAL);
3505 }
3506
3507 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3508                                           struct ptlrpc_request *req,
3509                                           void *aa, int rc)
3510 {
3511         struct llog_ctxt *ctxt;
3512         struct obd_import *imp = req->rq_import;
3513         ENTRY;
3514
3515         if (rc != 0)
3516                 RETURN(rc);
3517
3518         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3519         if (ctxt) {
                /* rc is known to be 0 here (checked above), so report a
                 * failure of the connect itself */
                rc = llog_initiator_connect(ctxt);
                if (rc)
                        CERROR("cannot establish connection for "
                               "ctxt %p: %d\n", ctxt, rc);
3525         }
3526
3527         llog_ctxt_put(ctxt);
3528         spin_lock(&imp->imp_lock);
3529         imp->imp_server_timeout = 1;
3530         imp->imp_pingable = 1;
3531         spin_unlock(&imp->imp_lock);
3532         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3533
3534         RETURN(rc);
3535 }
3536
3537 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3538                               void *key, obd_count vallen, void *val,
3539                               struct ptlrpc_request_set *set)
3540 {
3541         struct ptlrpc_request *req;
3542         struct obd_device     *obd = exp->exp_obd;
3543         struct obd_import     *imp = class_exp2cliimp(exp);
3544         char                  *tmp;
3545         int                    rc;
3546         ENTRY;
3547
3548         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3549
3550         if (KEY_IS(KEY_NEXT_ID)) {
3551                 if (vallen != sizeof(obd_id))
3552                         RETURN(-ERANGE);
3553                 if (val == NULL)
3554                         RETURN(-EINVAL);
3555                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3556                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3557                        exp->exp_obd->obd_name,
3558                        obd->u.cli.cl_oscc.oscc_next_id);
3559
3560                 RETURN(0);
3561         }
3562
3563         if (KEY_IS(KEY_UNLINKED)) {
3564                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3565                 spin_lock(&oscc->oscc_lock);
3566                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3567                 spin_unlock(&oscc->oscc_lock);
3568                 RETURN(0);
3569         }
3570
3571         if (KEY_IS(KEY_INIT_RECOV)) {
3572                 if (vallen != sizeof(int))
3573                         RETURN(-EINVAL);
3574                 spin_lock(&imp->imp_lock);
3575                 imp->imp_initial_recov = *(int *)val;
3576                 spin_unlock(&imp->imp_lock);
3577                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3578                        exp->exp_obd->obd_name,
3579                        imp->imp_initial_recov);
3580                 RETURN(0);
3581         }
3582
3583         if (KEY_IS(KEY_CHECKSUM)) {
3584                 if (vallen != sizeof(int))
3585                         RETURN(-EINVAL);
3586                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3587                 RETURN(0);
3588         }
3589
3590         if (KEY_IS(KEY_FLUSH_CTX)) {
3591                 sptlrpc_import_flush_my_ctx(imp);
3592                 RETURN(0);
3593         }
3594
3595         if (!set)
3596                 RETURN(-EINVAL);
3597
        /* We pass all other commands directly to the OST. Since nobody calls
           OSC methods directly and everybody is supposed to go through LOV,
           we assume LOV checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from the
           OST anyway. */

3606         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3607         if (req == NULL)
3608                 RETURN(-ENOMEM);
3609
3610         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3611                              RCL_CLIENT, keylen);
3612         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3613                              RCL_CLIENT, vallen);
3614         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3615         if (rc) {
3616                 ptlrpc_request_free(req);
3617                 RETURN(rc);
3618         }
3619
3620         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3621         memcpy(tmp, key, keylen);
3622         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3623         memcpy(tmp, val, vallen);
3624
3625         if (KEY_IS(KEY_MDS_CONN)) {
3626                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3627
3628                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3629                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3630                 LASSERT(oscc->oscc_oa.o_gr > 0);
3631                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3632         }
3633
3634         ptlrpc_request_set_replen(req);
3635         ptlrpc_set_add_req(set, req);
3636         ptlrpc_check_set(NULL, set);
3637
3638         RETURN(0);
3639 }
3640
3641
3642 static struct llog_operations osc_size_repl_logops = {
3643         lop_cancel: llog_obd_repl_cancel
3644 };
3645
3646 static struct llog_operations osc_mds_ost_orig_logops;
3647 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3648                          struct obd_device *tgt, int count,
3649                          struct llog_catid *catid, struct obd_uuid *uuid)
3650 {
3651         int rc;
3652         ENTRY;
3653
3654         LASSERT(olg == &obd->obd_olg);
3655         spin_lock(&obd->obd_dev_lock);
3656         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3657                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3658                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3659                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3660                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3661                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3662         }
3663         spin_unlock(&obd->obd_dev_lock);
3664
3665         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3666                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3667         if (rc) {
3668                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3669                 GOTO (out, rc);
3670         }
3671
3672         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3673                         NULL, &osc_size_repl_logops);
3674         if (rc) {
3675                 struct llog_ctxt *ctxt =
3676                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3677                 if (ctxt)
3678                         llog_cleanup(ctxt);
3679                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3680         }
3681         GOTO(out, rc);
3682 out:
3683         if (rc) {
3684                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3685                        obd->obd_name, tgt->obd_name, count, catid, rc);
3686                 CERROR("logid "LPX64":0x%x\n",
3687                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3688         }
3689         return rc;
3690 }
3691
3692 static int osc_llog_finish(struct obd_device *obd, int count)
3693 {
3694         struct llog_ctxt *ctxt;
3695         int rc = 0, rc2 = 0;
3696         ENTRY;
3697
3698         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3699         if (ctxt)
3700                 rc = llog_cleanup(ctxt);
3701
3702         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3703         if (ctxt)
3704                 rc2 = llog_cleanup(ctxt);
3705         if (!rc)
3706                 rc = rc2;
3707
3708         RETURN(rc);
3709 }
3710
3711 static int osc_reconnect(const struct lu_env *env,
3712                          struct obd_export *exp, struct obd_device *obd,
3713                          struct obd_uuid *cluuid,
3714                          struct obd_connect_data *data,
3715                          void *localdata)
3716 {
3717         struct client_obd *cli = &obd->u.cli;
3718
3719         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3720                 long lost_grant;
3721
3722                 client_obd_list_lock(&cli->cl_loi_list_lock);
3723                 data->ocd_grant = cli->cl_avail_grant ?:
3724                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
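                /* Illustrative numbers (assuming 4 KiB pages and the common
                 * cl_max_pages_per_rpc = 256, i.e. 1 MiB RPCs): with no
                 * grant left, the fallback above requests
                 * 2 * 256 << 12 = 2 MiB -- two full RPCs' worth of dirty
                 * data. */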
3725                 lost_grant = cli->cl_lost_grant;
3726                 cli->cl_lost_grant = 0;
3727                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3728
3729                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3730                        "cl_lost_grant: %ld\n", data->ocd_grant,
3731                        cli->cl_avail_grant, lost_grant);
3732                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3733                        " ocd_grant: %d\n", data->ocd_connect_flags,
3734                        data->ocd_version, data->ocd_grant);
3735         }
3736
3737         RETURN(0);
3738 }
3739
3740 static int osc_disconnect(struct obd_export *exp)
3741 {
3742         struct obd_device *obd = class_exp2obd(exp);
3743         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3744         int rc;
3745
3746         if (obd->u.cli.cl_conn_count == 1)
3747                 /* flush any remaining cancel messages out to the target */
3748                 llog_sync(ctxt, exp);
3749
3750         llog_ctxt_put(ctxt);
3751
3752         rc = client_disconnect_export(exp);
3753         return rc;
3754 }
3755
3756 static int osc_import_event(struct obd_device *obd,
3757                             struct obd_import *imp,
3758                             enum obd_import_event event)
3759 {
3760         struct client_obd *cli;
3761         int rc = 0;
3762
3763         ENTRY;
3764         LASSERT(imp->imp_obd == obd);
3765
3766         switch (event) {
3767         case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs */
3769                 if (imp->imp_server_timeout) {
3770                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3771
3772                         spin_lock(&oscc->oscc_lock);
3773                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3774                         spin_unlock(&oscc->oscc_lock);
3775                 }
3776                 cli = &obd->u.cli;
3777                 client_obd_list_lock(&cli->cl_loi_list_lock);
3778                 cli->cl_avail_grant = 0;
3779                 cli->cl_lost_grant = 0;
3780                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3781                 break;
3782         }
3783         case IMP_EVENT_INACTIVE: {
3784                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3785                 break;
3786         }
3787         case IMP_EVENT_INVALIDATE: {
3788                 struct ldlm_namespace *ns = obd->obd_namespace;
3789                 struct lu_env         *env;
3790                 int                    refcheck;
3791
3792                 env = cl_env_get(&refcheck);
3793                 if (!IS_ERR(env)) {
3794                         /* Reset grants */
3795                         cli = &obd->u.cli;
3796                         client_obd_list_lock(&cli->cl_loi_list_lock);
3797                         /* all pages go to failing rpcs due to the invalid
3798                          * import */
3799                         osc_check_rpcs(env, cli);
3800                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3801
3802                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3803                         cl_env_put(env, &refcheck);
3804                 } else
3805                         rc = PTR_ERR(env);
3806                 break;
3807         }
3808         case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs */
3810                 if (imp->imp_server_timeout) {
3811                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3812
3813                         spin_lock(&oscc->oscc_lock);
3814                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3815                         spin_unlock(&oscc->oscc_lock);
3816                 }
3817                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3818                 break;
3819         }
3820         case IMP_EVENT_OCD: {
3821                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3822
3823                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3824                         osc_init_grant(&obd->u.cli, ocd);
3825
3826                 /* See bug 7198 */
3827                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3828                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3829
3830                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3831                 break;
3832         }
3833         default:
3834                 CERROR("Unknown import event %d\n", event);
3835                 LBUG();
3836         }
3837         RETURN(rc);
3838 }
3839
3840 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3841 {
3842         int rc;
        ENTRY;

3846         rc = ptlrpcd_addref();
3847         if (rc)
3848                 RETURN(rc);
3849
3850         rc = client_obd_setup(obd, lcfg);
3851         if (rc) {
3852                 ptlrpcd_decref();
3853         } else {
3854                 struct lprocfs_static_vars lvars = { 0 };
3855                 struct client_obd *cli = &obd->u.cli;
3856
3857                 lprocfs_osc_init_vars(&lvars);
3858                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3859                         lproc_osc_attach_seqstat(obd);
3860                         sptlrpc_lprocfs_cliobd_attach(obd);
3861                         ptlrpc_lprocfs_register_obd(obd);
3862                 }
3863
3864                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
                   but that might waste too much RAM, so a margin of 2 is just
                   a guess that should still work. */
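                /* Sizing example (illustrative, assuming the common default
                 * cl_max_rpcs_in_flight = 8): the pool below preallocates
                 * 8 + 2 = 10 requests of OST_MAXREQSIZE bytes each. */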
3870                 cli->cl_import->imp_rq_pool =
3871                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3872                                             OST_MAXREQSIZE,
3873                                             ptlrpc_add_rqs_to_pool);
3874         }
3875
3876         RETURN(rc);
3877 }
3878
3879 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3880 {
3881         int rc = 0;
3882         ENTRY;
3883
3884         switch (stage) {
3885         case OBD_CLEANUP_EARLY: {
3886                 struct obd_import *imp;
3887                 imp = obd->u.cli.cl_import;
3888                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3889                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3890                 ptlrpc_deactivate_import(imp);
3891                 spin_lock(&imp->imp_lock);
3892                 imp->imp_pingable = 0;
3893                 spin_unlock(&imp->imp_lock);
3894                 break;
3895         }
3896         case OBD_CLEANUP_EXPORTS: {
3897                 /* If we set up but never connected, the
3898                    client import will not have been cleaned. */
3899                 if (obd->u.cli.cl_import) {
3900                         struct obd_import *imp;
3901                         down_write(&obd->u.cli.cl_sem);
3902                         imp = obd->u.cli.cl_import;
3903                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3904                                obd->obd_name);
3905                         ptlrpc_invalidate_import(imp);
3906                         if (imp->imp_rq_pool) {
3907                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3908                                 imp->imp_rq_pool = NULL;
3909                         }
3910                         class_destroy_import(imp);
3911                         up_write(&obd->u.cli.cl_sem);
3912                         obd->u.cli.cl_import = NULL;
3913                 }
3914                 rc = obd_llog_finish(obd, 0);
3915                 if (rc != 0)
3916                         CERROR("failed to cleanup llogging subsystems\n");
3917                 break;
        }
3919         }
3920         RETURN(rc);
3921 }
3922
3923 int osc_cleanup(struct obd_device *obd)
3924 {
3925         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3926         int rc;
3927
3928         ENTRY;
3929         ptlrpc_lprocfs_unregister_obd(obd);
3930         lprocfs_obd_cleanup(obd);
3931
3932         spin_lock(&oscc->oscc_lock);
3933         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3934         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3935         spin_unlock(&oscc->oscc_lock);
3936
3937         /* free memory of osc quota cache */
3938         lquota_cleanup(quota_interface, obd);
3939
3940         rc = client_obd_cleanup(obd);
3941
3942         ptlrpcd_decref();
3943         RETURN(rc);
3944 }
3945
3946 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3947 {
3948         struct lprocfs_static_vars lvars = { 0 };
3949         int rc = 0;
3950
3951         lprocfs_osc_init_vars(&lvars);
3952
3953         switch (lcfg->lcfg_command) {
3954         case LCFG_SPTLRPC_CONF:
3955                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3956                 break;
3957         default:
3958                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3959                                               lcfg, obd);
3960                 break;
3961         }
3962
3963         return(rc);
3964 }
3965
3966 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3967 {
3968         return osc_process_config_base(obd, buf);
3969 }
3970
3971 struct obd_ops osc_obd_ops = {
3972         .o_owner                = THIS_MODULE,
3973         .o_setup                = osc_setup,
3974         .o_precleanup           = osc_precleanup,
3975         .o_cleanup              = osc_cleanup,
3976         .o_add_conn             = client_import_add_conn,
3977         .o_del_conn             = client_import_del_conn,
3978         .o_connect              = client_connect_import,
3979         .o_reconnect            = osc_reconnect,
3980         .o_disconnect           = osc_disconnect,
3981         .o_statfs               = osc_statfs,
3982         .o_statfs_async         = osc_statfs_async,
3983         .o_packmd               = osc_packmd,
3984         .o_unpackmd             = osc_unpackmd,
3985         .o_precreate            = osc_precreate,
3986         .o_create               = osc_create,
3987         .o_destroy              = osc_destroy,
3988         .o_getattr              = osc_getattr,
3989         .o_getattr_async        = osc_getattr_async,
3990         .o_setattr              = osc_setattr,
3991         .o_setattr_async        = osc_setattr_async,
3992         .o_brw                  = osc_brw,
3993         .o_punch                = osc_punch,
3994         .o_sync                 = osc_sync,
3995         .o_enqueue              = osc_enqueue,
3996         .o_change_cbdata        = osc_change_cbdata,
3997         .o_cancel               = osc_cancel,
3998         .o_cancel_unused        = osc_cancel_unused,
3999         .o_iocontrol            = osc_iocontrol,
4000         .o_get_info             = osc_get_info,
4001         .o_set_info_async       = osc_set_info_async,
4002         .o_import_event         = osc_import_event,
4003         .o_llog_init            = osc_llog_init,
4004         .o_llog_finish          = osc_llog_finish,
4005         .o_process_config       = osc_process_config,
4006 };
4007
4008 extern struct lu_kmem_descr  osc_caches[];
4009 extern spinlock_t            osc_ast_guard;
4010 extern struct lock_class_key osc_ast_guard_class;
4011
4012 int __init osc_init(void)
4013 {
4014         struct lprocfs_static_vars lvars = { 0 };
4015         int rc;
4016         ENTRY;
4017
4018         /* print an address of _any_ initialized kernel symbol from this
4019          * module, to allow debugging with gdb that doesn't support data
4020          * symbols from modules.*/
4021         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4022
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

4025         lprocfs_osc_init_vars(&lvars);
4026
4027         request_module("lquota");
4028         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4029         lquota_init(quota_interface);
4030         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4031
4032         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4033                                  LUSTRE_OSC_NAME, &osc_device_type);
4034         if (rc) {
4035                 if (quota_interface)
4036                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4037                 lu_kmem_fini(osc_caches);
4038                 RETURN(rc);
4039         }
4040
4041         spin_lock_init(&osc_ast_guard);
4042         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4043
4044         RETURN(rc);
4045 }
4046
4047 #ifdef __KERNEL__
4048 static void /*__exit*/ osc_exit(void)
4049 {
4050         lu_device_type_fini(&osc_device_type);
4051
4052         lquota_exit(quota_interface);
4053         if (quota_interface)
4054                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4055
4056         class_unregister_type(LUSTRE_OSC_NAME);
4057         lu_kmem_fini(osc_caches);
4058 }
4059
4060 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4061 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4062 MODULE_LICENSE("GPL");
4063
4064 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4065 #endif