/* Whamcloud - gitweb
 * Land b_head_quota onto HEAD (20081116_0105)
 * [fs/lustre-release.git] / lustre / osc / osc_request.c */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
/* Quota hooks; presumably wired up to osc_quota_interface at module init —
 * the setup code is outside this chunk, so confirm against osc_setup/cleanup. */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declarations for the bulk read/write completion path. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
73
/* Pack OSC object metadata for disk storage (LE byte order).
 *
 * Standard obd packmd contract:
 * - lmmp == NULL: only report the size a packed lmm would occupy;
 * - *lmmp != NULL && lsm == NULL: free the previously packed buffer;
 * - otherwise allocate *lmmp if needed and fill it from @lsm.
 *
 * Returns the packed size on success, 0 after a free, -ENOMEM on
 * allocation failure. */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        /* sizeof(**lmmp) is compile-time; no dereference happens even
         * when lmmp is NULL. */
        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        /* Caller is releasing a previously packed buffer. */
        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT(lsm->lsm_object_gr);
                /* On-disk/wire format is little-endian. */
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd():
 * - lsmp == NULL: only report the in-memory lsm size (single stripe);
 * - *lsmp != NULL && lmm == NULL: free the previously unpacked lsm
 *   (and its single lov_oinfo);
 * - otherwise allocate *lsmp if needed and fill it from @lmm.
 *
 * Returns the lsm size on success, 0 after a free, -EINVAL on malformed
 * input, -ENOMEM on allocation failure. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* don't leak the lsm if the oinfo alloc fails */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
/* Queue an asynchronous OST_GETATTR on @set; the reply is processed by
 * osc_getattr_interpret(), which runs oinfo->oi_cb_up.  Returns 0 or
 * negative errno from allocation/packing. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* shrink the capa field before packing if none will be sent */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* stash oinfo in the request's embedded async-args area */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
263
/* Synchronous OST_GETATTR: fetch the object's attributes into
 * oinfo->oi_oa.  Returns 0 or negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        /* request is finished on success and error paths alike */
        ptlrpc_req_finished(req);
        return rc;
}
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                                         oinfo->oi_oa->o_gr > 0);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324         if (rc) {
325                 ptlrpc_request_free(req);
326                 RETURN(rc);
327         }
328
329         osc_pack_req_body(req, oinfo);
330
331         ptlrpc_request_set_replen(req);
332
333         rc = ptlrpc_queue_wait(req);
334         if (rc)
335                 GOTO(out, rc);
336
337         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338         if (body == NULL)
339                 GOTO(out, rc = -EPROTO);
340
341         *oinfo->oi_oa = body->oa;
342
343         EXIT;
344 out:
345         ptlrpc_req_finished(req);
346         RETURN(rc);
347 }
348
349 static int osc_setattr_interpret(const struct lu_env *env,
350                                  struct ptlrpc_request *req,
351                                  struct osc_async_args *aa, int rc)
352 {
353         struct ost_body *body;
354         ENTRY;
355
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360         if (body == NULL)
361                 GOTO(out, rc = -EPROTO);
362
363         *aa->aa_oi->oi_oa = body->oa;
364 out:
365         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
366         RETURN(rc);
367 }
368
/* Asynchronous OST_SETATTR.  With rqset == NULL the request is handed to
 * ptlrpcd fire-and-forget (no reply processing); otherwise it is added
 * to @rqset and the reply is handled by osc_setattr_interpret(). */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* carry the llog cookie along so the OST can cancel the record */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
415
/* Synchronously create an object on the OST.
 *
 * If *ea is NULL a single-stripe lsm is allocated here; on success it is
 * returned through *ea, on failure it is freed again.  On success @oa
 * carries the new object's id/group back to the caller, and if the OST
 * returned a create llog cookie it is stored through @oti.
 * Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller didn't supply an lsm; allocate one locally */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* free the lsm only if we allocated it here (caller passed none) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
500
/* Completion callback for OST_PUNCH: on success copy the reply's
 * attributes into the caller's obdo, then run the upcall with the
 * final rc. */
static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->pa_oa = body->oa;
out:
        /* upcall runs on both success and error paths */
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}
520
/* Build and queue an asynchronous OST_PUNCH (truncate) request.
 *
 * @upcall(@cookie, rc) is invoked from osc_punch_interpret() when the
 * reply arrives.  With rqset == PTLRPCD_SET the request is driven by
 * ptlrpcd; otherwise it is added to @rqset.  @oa must stay valid until
 * the interpret callback has run.  Returns 0 or negative errno. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);


        /* stash completion info in the request's async-args area */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
566
567 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
568                      struct obd_trans_info *oti,
569                      struct ptlrpc_request_set *rqset)
570 {
571         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
572         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
573         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
574         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
575                               oinfo->oi_cb_up, oinfo, rqset);
576 }
577
/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the object
 * to disk.  The extent is overloaded onto oa->o_size/o_blocks; the
 * reply's obdo is copied back into @oa.  Returns 0 or negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
629
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* resource name is built from the object's id/group */
        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        /* NULL return means no such resource is cached locally, hence
         * nothing to cancel */
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
655
656 static int osc_destroy_interpret(const struct lu_env *env,
657                                  struct ptlrpc_request *req, void *data,
658                                  int rc)
659 {
660         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
661
662         atomic_dec(&cli->cl_destroy_in_flight);
663         cfs_waitq_signal(&cli->cl_destroy_waitq);
664         return 0;
665 }
666
/* Try to take a destroy-RPC slot.  Returns 1 (slot taken) if, after the
 * increment, no more than cl_max_rpcs_in_flight destroys are
 * outstanding; otherwise undoes the increment and returns 0.  The
 * second compare-after-decrement catches a concurrent release that
 * happened between the two atomic ops, so a waiter is not lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
684
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel our own PW locks on the object (early lock cancel);
         * the data is going away, so discard it rather than flush */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* release the lock refs taken by osc_resource_get_unused */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        /* carry the unlink llog cookie so the OST can ack the MDS record */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* throttle so destroys don't monopolize the import's RPC slots */
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
756
/* Advertise our cache state to the OST in @oa: current dirty bytes
 * (o_dirty), how much more we are prepared to dirty (o_undirty), the
 * grant we still hold (o_grant) and grant we lost (o_dropped, reset to
 * zero here).  Takes cl_loi_list_lock internally. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* caller must not have filled these fields already */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                /* accounting went wrong — don't ask for more */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* enough headroom for a full pipeline of RPCs plus one */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
794
795 /* caller must hold loi_list_lock */
796 static void osc_consume_write_grant(struct client_obd *cli,
797                                     struct brw_page *pga)
798 {
799         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
800         atomic_inc(&obd_dirty_pages);
801         cli->cl_dirty += CFS_PAGE_SIZE;
802         cli->cl_avail_grant -= CFS_PAGE_SIZE;
803         pga->flag |= OBD_BRW_FROM_GRANT;
804         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
805                CFS_PAGE_SIZE, pga, pga->pg);
806         LASSERT(cli->cl_avail_grant >= 0);
807 }
808
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent says whether the page actually went to the OST.  Unsent pages
 * forfeit a full page of grant into cl_lost_grant (reported back to the
 * server via o_dropped); short writes forfeit the part of the page that
 * doesn't round up to OST blocks. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* grant is accounted in OST blocks; fall back to 4k if the server
         * hasn't reported a block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                /* transit (uncached) pages keep a parallel counter */
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
852
853 static unsigned long rpcs_in_flight(struct client_obd *cli)
854 {
855         return cli->cl_r_in_flight + cli->cl_w_in_flight;
856 }
857
/* caller must hold loi_list_lock */
/* Walk the cache-waiter list and wake waiters that can now proceed:
 * either grant is available (charge it via osc_consume_write_grant) or
 * no write RPC could still return grant, in which case the waiter is
 * told (-EDQUOT in ocw_rc) to fall back to sync IO.  Stops early while
 * the dirty limits are still exceeded. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
899
/* Seed cl_avail_grant from the grant the server handed out in its connect
 * reply (ocd->ocd_grant).  cl_loi_list_lock is taken because the I/O paths
 * read and modify cl_avail_grant concurrently. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* a negative grant from the server would corrupt later accounting */
        LASSERT(cli->cl_avail_grant >= 0);
}
910
/* Fold the grant piggy-backed on an OST reply (body->oa.o_grant, valid only
 * when OBD_MD_FLGRANT is set) into cl_avail_grant under cl_loi_list_lock.
 * NOTE(review): the CDEBUG prints o_grant even when the valid bit is clear,
 * in which case the printed value may be stale. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
920
921 /* We assume that the reason this OSC got a short read is because it read
922  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
923  * via the LOV, and it _knows_ it's reading inside the file, it's just that
924  * this stripe never got written at or beyond this stripe offset yet. */
925 static void handle_short_read(int nob_read, obd_count page_count,
926                               struct brw_page **pga)
927 {
928         char *ptr;
929         int i = 0;
930
931         /* skip bytes read OK */
932         while (nob_read > 0) {
933                 LASSERT (page_count > 0);
934
935                 if (pga[i]->count > nob_read) {
936                         /* EOF inside this page */
937                         ptr = cfs_kmap(pga[i]->pg) +
938                                 (pga[i]->off & ~CFS_PAGE_MASK);
939                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
940                         cfs_kunmap(pga[i]->pg);
941                         page_count--;
942                         i++;
943                         break;
944                 }
945
946                 nob_read -= pga[i]->count;
947                 page_count--;
948                 i++;
949         }
950
951         /* zero remaining pages */
952         while (page_count-- > 0) {
953                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
954                 memset(ptr, 0, pga[i]->count);
955                 cfs_kunmap(pga[i]->pg);
956                 i++;
957         }
958 }
959
960 static int check_write_rcs(struct ptlrpc_request *req,
961                            int requested_nob, int niocount,
962                            obd_count page_count, struct brw_page **pga)
963 {
964         int    *remote_rcs, i;
965
966         /* return error if any niobuf was in error */
967         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
968                                         sizeof(*remote_rcs) * niocount, NULL);
969         if (remote_rcs == NULL) {
970                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
971                 return(-EPROTO);
972         }
973         if (lustre_msg_swabbed(req->rq_repmsg))
974                 for (i = 0; i < niocount; i++)
975                         __swab32s(&remote_rcs[i]);
976
977         for (i = 0; i < niocount; i++) {
978                 if (remote_rcs[i] < 0)
979                         return(remote_rcs[i]);
980
981                 if (remote_rcs[i] != 0) {
982                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
983                                 i, remote_rcs[i], req);
984                         return(-EPROTO);
985                 }
986         }
987
988         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
989                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
990                        req->rq_bulk->bd_nob_transferred, requested_nob);
991                 return(-EPROTO);
992         }
993
994         return (0);
995 }
996
997 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
998 {
999         if (p1->flag != p2->flag) {
1000                 unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
1001
1002                 /* warn if we try to combine flags that we don't know to be
1003                  * safe to combine */
1004                 if ((p1->flag & mask) != (p2->flag & mask))
1005                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1006                                "same brw?\n", p1->flag, p2->flag);
1007                 return 0;
1008         }
1009
1010         return (p1->off + p1->count == p2->off);
1011 }
1012
/* Compute the checksum of the first 'nob' bytes described by the 'pga'
 * page array using algorithm 'cksum_type'.  'opc' (OST_READ/OST_WRITE)
 * selects which fault-injection hook may fire: reads corrupt the mapped
 * data itself, writes only perturb the computed checksum (see below). */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* the final page may be only partially covered by 'nob' */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* nob may go negative here on a partial last page; the
                 * loop condition then terminates the walk */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1048
/* Build (but do not send) a bulk BRW request for 'page_count' pages in
 * 'pga', which must be sorted by ascending file offset.  Contiguous pages
 * with compatible flags are merged into single remote niobufs.  On success
 * *reqp owns the request (with its bulk descriptor and async args filled
 * in) and 0 is returned; on failure a negative errno is returned and no
 * request is left allocated.  'lsm' is not referenced here; it is part of
 * the common prep-request signature.  If 'reserve' is set and a capability
 * is supplied, a reference on it is stashed in the async args. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes allocate from the import's emergency pool so that dirty
         * pages can still be flushed under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs needed: mergeable neighbours share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* add each page to the bulk descriptor and fill (or extend, when
         * the page merges with its predecessor) the remote niobufs */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* SRVLOCK must be all-or-nothing across the request */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity check: the merge loop must have written exactly the
         * niobuf slots reserved in the request buffer */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* client-side checksums are only useful when the transport
                 * layer isn't already hashing the bulk */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash what the reply handler needs in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1233
/* Diagnose a BRW_WRITE checksum mismatch.  Returns 0 when client and
 * server checksums agree, 1 when they differ (caller treats this as a
 * reason to resend).  On mismatch the data is re-checksummed with the
 * server's algorithm to distinguish: a server-side algorithm mismatch,
 * data changed locally after checksumming (likely mmap), or corruption
 * in transit. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* use the algorithm the server reports it used; fall back to
         * CRC32 when the reply carries no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        /* re-checksum the pages as they are now to localize the fault */
        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1284
/* Finish a bulk BRW: unpack and validate the reply, update grant and
 * per-uid/gid quota flags, verify checksums, and for reads zero out any
 * pages past a short read.  Note rc enters this function as number of
 * bytes transferred (negative on RPC error); returns 0 on success,
 * -EAGAIN when a checksum/bulk failure makes a resend worthwhile, or
 * another negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* bail out on errors, except -EDQUOT: its reply still carries the
         * quota flags that must be processed below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* now that quota flags are recorded, propagate the -EDQUOT */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a successful write replies with rc == 0; bytes are
                 * accounted via the per-niobuf rc vector instead */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */
        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read (EOF inside the extent): zero the unread tail */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
                                         aa->aa_ppga))
                GOTO(out, rc = -EAGAIN);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                /* checksum only the bytes actually read */
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* identify any LNET router the bulk travelled through, for
                 * the error message below */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* rate-limit: log only when cksum_missed is a power of two */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success hand the reply's attributes back to the caller */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1436
/* Synchronous bulk BRW: build the request, queue it and wait, then retry
 * from scratch on bulk timeouts and on recoverable errors (sleeping
 * 'resends' seconds between attempts) until osc_should_resend() gives up,
 * in which case -EIO is returned. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;    /* private queue, used only to sleep */
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        /* a timed-out bulk marked resendable is simply rebuilt and resent */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off: sleep 'resends' seconds before retrying */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1484
/* Rebuild an async BRW request after a recoverable error and add the new
 * request to the original's request set.  The page array, oap list and
 * capability reference are transferred from the old request's async args
 * to the new one; each oap's request reference is switched over.  Returns
 * 0 on success, -EIO when the resend budget is exhausted, -EINTR if any
 * oap was interrupted. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* verify every oap still belongs to this request and none was
         * interrupted while we were rebuilding */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* point each oap at the new request, dropping the old reference */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability reference moves to the new request's args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1559
1560 /*
1561  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1562  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1563  * fine for our small page arrays and doesn't require allocation.  its an
1564  * insertion sort that swaps elements that are strides apart, shrinking the
1565  * stride down until its '1' and the array is sorted.
1566  */
1567 static void sort_brw_pages(struct brw_page **array, int num)
1568 {
1569         int stride, i, j;
1570         struct brw_page *tmp;
1571
1572         if (num == 1)
1573                 return;
1574         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1575                 ;
1576
1577         do {
1578                 stride /= 3;
1579                 for (i = stride ; i < num ; i++) {
1580                         tmp = array[i];
1581                         j = i;
1582                         while (j >= stride && array[j - stride]->off > tmp->off) {
1583                                 array[j] = array[j - stride];
1584                                 j -= stride;
1585                         }
1586                         array[j] = tmp;
1587                 }
1588         } while (stride > 1);
1589 }
1590
1591 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1592 {
1593         int count = 1;
1594         int offset;
1595         int i = 0;
1596
1597         LASSERT (pages > 0);
1598         offset = pg[i]->off & ~CFS_PAGE_MASK;
1599
1600         for (;;) {
1601                 pages--;
1602                 if (pages == 0)         /* that's all */
1603                         return count;
1604
1605                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1606                         return count;   /* doesn't end on page boundary */
1607
1608                 i++;
1609                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1610                 if (offset != 0)        /* doesn't start on page boundary */
1611                         return count;
1612
1613                 count++;
1614         }
1615 }
1616
1617 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1618 {
1619         struct brw_page **ppga;
1620         int i;
1621
1622         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1623         if (ppga == NULL)
1624                 return NULL;
1625
1626         for (i = 0; i < count; i++)
1627                 ppga[i] = pga + i;
1628         return ppga;
1629 }
1630
/* Free a pointer array built by osc_build_ppga().  The brw_page
 * structures it points at are owned by the caller and are not touched. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1636
1637 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1638                    obd_count page_count, struct brw_page *pga,
1639                    struct obd_trans_info *oti)
1640 {
1641         struct obdo *saved_oa = NULL;
1642         struct brw_page **ppga, **orig;
1643         struct obd_import *imp = class_exp2cliimp(exp);
1644         struct client_obd *cli = &imp->imp_obd->u.cli;
1645         int rc, page_count_orig;
1646         ENTRY;
1647
1648         if (cmd & OBD_BRW_CHECK) {
1649                 /* The caller just wants to know if there's a chance that this
1650                  * I/O can succeed */
1651
1652                 if (imp == NULL || imp->imp_invalid)
1653                         RETURN(-EIO);
1654                 RETURN(0);
1655         }
1656
1657         /* test_brw with a failed create can trip this, maybe others. */
1658         LASSERT(cli->cl_max_pages_per_rpc);
1659
1660         rc = 0;
1661
1662         orig = ppga = osc_build_ppga(pga, page_count);
1663         if (ppga == NULL)
1664                 RETURN(-ENOMEM);
1665         page_count_orig = page_count;
1666
1667         sort_brw_pages(ppga, page_count);
1668         while (page_count) {
1669                 obd_count pages_per_brw;
1670
1671                 if (page_count > cli->cl_max_pages_per_rpc)
1672                         pages_per_brw = cli->cl_max_pages_per_rpc;
1673                 else
1674                         pages_per_brw = page_count;
1675
1676                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1677
1678                 if (saved_oa != NULL) {
1679                         /* restore previously saved oa */
1680                         *oinfo->oi_oa = *saved_oa;
1681                 } else if (page_count > pages_per_brw) {
1682                         /* save a copy of oa (brw will clobber it) */
1683                         OBDO_ALLOC(saved_oa);
1684                         if (saved_oa == NULL)
1685                                 GOTO(out, rc = -ENOMEM);
1686                         *saved_oa = *oinfo->oi_oa;
1687                 }
1688
1689                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1690                                       pages_per_brw, ppga, oinfo->oi_capa);
1691
1692                 if (rc != 0)
1693                         break;
1694
1695                 page_count -= pages_per_brw;
1696                 ppga += pages_per_brw;
1697         }
1698
1699 out:
1700         osc_release_ppga(orig, page_count_orig);
1701
1702         if (saved_oa != NULL)
1703                 OBDO_FREE(saved_oa);
1704
1705         RETURN(rc);
1706 }
1707
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 *
 * \param sent  forwarded to osc_release_write_grant(); non-zero when the
 *              page was actually transferred to the OST */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1716
1717
1718 /* This maintains the lists of pending pages to read/write for a given object
1719  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1720  * to quickly find objects that are ready to send an RPC. */
1721 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1722                          int cmd)
1723 {
1724         int optimal;
1725         ENTRY;
1726
1727         if (lop->lop_num_pending == 0)
1728                 RETURN(0);
1729
1730         /* if we have an invalid import we want to drain the queued pages
1731          * by forcing them through rpcs that immediately fail and complete
1732          * the pages.  recovery relies on this to empty the queued pages
1733          * before canceling the locks and evicting down the llite pages */
1734         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1735                 RETURN(1);
1736
1737         /* stream rpcs in queue order as long as as there is an urgent page
1738          * queued.  this is our cheap solution for good batching in the case
1739          * where writepage marks some random page in the middle of the file
1740          * as urgent because of, say, memory pressure */
1741         if (!list_empty(&lop->lop_urgent)) {
1742                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1743                 RETURN(1);
1744         }
1745         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1746         optimal = cli->cl_max_pages_per_rpc;
1747         if (cmd & OBD_BRW_WRITE) {
1748                 /* trigger a write rpc stream as long as there are dirtiers
1749                  * waiting for space.  as they're waiting, they're not going to
1750                  * create more pages to coallesce with what's waiting.. */
1751                 if (!list_empty(&cli->cl_cache_waiters)) {
1752                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1753                         RETURN(1);
1754                 }
1755                 /* +16 to avoid triggering rpcs that would want to include pages
1756                  * that are being queued but which can't be made ready until
1757                  * the queuer finishes with the page. this is a wart for
1758                  * llite::commit_write() */
1759                 optimal += 16;
1760         }
1761         if (lop->lop_num_pending >= optimal)
1762                 RETURN(1);
1763
1764         RETURN(0);
1765 }
1766
/* Make @item's membership of @list match @should_be_on: link it at the
 * tail when it should be on but isn't, unlink it when it shouldn't be
 * but is, and do nothing when membership already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on = !list_empty(item);

        if (should_be_on && !on)
                list_add_tail(item, list);
        else if (!should_be_on && on)
                list_del_init(item);
}
1775
1776 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1777  * can find pages to build into rpcs quickly */
1778 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1779 {
1780         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1781                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1782                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1783
1784         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1785                 loi->loi_write_lop.lop_num_pending);
1786
1787         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1788                 loi->loi_read_lop.lop_num_pending);
1789 }
1790
/* Adjust the per-lop pending page count by @delta (positive when queueing,
 * negative when removing) and mirror the change into the client-wide
 * pending read or write page counter, selected by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1800
/**
 * this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out.
 *
 * \retval 0      the page was still pending; it has been dequeued and
 *                completed with -EINTR
 * \retval -EBUSY the page is already part of an rpc; the rpc was marked
 *                interrupted instead
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                /* mark the rpc interrupted and poke ptlrpcd before dropping
                 * our reference on it */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* drop the page from pending accounting and refresh the
                 * loi's list membership before completing with -EINTR */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
1846
1847 /* this is trying to propogate async writeback errors back up to the
1848  * application.  As an async write fails we record the error code for later if
1849  * the app does an fsync.  As long as errors persist we force future rpcs to be
1850  * sync so that the app can get a sync error and break the cycle of queueing
1851  * pages for which writeback will fail. */
1852 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1853                            int rc)
1854 {
1855         if (rc) {
1856                 if (!ar->ar_rc)
1857                         ar->ar_rc = rc;
1858
1859                 ar->ar_force_sync = 1;
1860                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1861                 return;
1862
1863         }
1864
1865         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1866                 ar->ar_force_sync = 0;
1867 }
1868
1869 void osc_oap_to_pending(struct osc_async_page *oap)
1870 {
1871         struct loi_oap_pages *lop;
1872
1873         if (oap->oap_cmd & OBD_BRW_WRITE)
1874                 lop = &oap->oap_loi->loi_write_lop;
1875         else
1876                 lop = &oap->oap_loi->loi_read_lop;
1877
1878         if (oap->oap_async_flags & ASYNC_URGENT)
1879                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1880         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1881         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1882 }
1883
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Completes one async page: drops its rpc request reference, records any
 * write error in the async_rc state, copies attributes the server
 * returned in @oa into the cached lvb, and invokes the caller's
 * ap_completion hook.  If the hook returns non-zero the page goes back
 * on the pending queue; otherwise its cache grant is released. */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* grab the xid for osc_process_ar() before dropping our
                 * reference on the request */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record the write result at both the client-wide and
                 * per-object level so a later fsync can see it */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, mirror returned attributes into the cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1931
/* Interpret callback for bulk brw rpcs, run in ptlrpcd context: finishes
 * the request, retries recoverable errors, updates the in-flight rpc
 * counters under cl_loi_list_lock and completes (or releases write grant
 * for) every page that rode in the rpc. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* a recoverable error is resent via a fresh request that takes
         * over our async args; if the redo was queued we are done here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* an empty oap list means the rpc came from async_internal()
         * rather than osc_send_oap_rpc() */
        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        /* completions may have freed rpc slots; try to fill them */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1988
/* Build one bulk brw ptlrpc request covering every oap on @rpc_list
 * (all for a single object).  On success the oaps are spliced onto the
 * request's async args for brw_interpret() to complete later.  On
 * failure every queued oap is completed with the error and an ERR_PTR()
 * is returned.
 *
 * NOTE(review): on the error path this function takes cl_loi_list_lock
 * and returns WITHOUT dropping it -- this appears deliberate, since the
 * visible caller osc_send_oap_rpc() re-acquires the lock itself only on
 * its success paths and expects it held when osc_build_req() fails;
 * confirm against the full caller before changing. */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                /* the first oap supplies the ops, caller data, cl_req and
                 * ldlm lock used for the whole rpc */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* the request wants the pages in ascending offset order */
        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args; from here on
         * brw_interpret() owns them */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        /* NOTE(review): cra_capa may still be NULL here; presumably
         * capa_put() tolerates NULL -- confirm */
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
        }
        RETURN(req);
}
2110
2111 /**
2112  * prepare pages for ASYNC io and put pages in send queue.
2113  *
2114  * \param cli -
2115  * \param loi -
2116  * \param cmd - OBD_BRW_* macroses
2117  * \param lop - pending pages
2118  *
2119  * \return zero if pages successfully add to send queue.
2120  * \return not zere if error occurring.
2121  */
2122 static int
2123 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2124                  struct lov_oinfo *loi,
2125                  int cmd, struct loi_oap_pages *lop)
2126 {
2127         struct ptlrpc_request *req;
2128         obd_count page_count = 0;
2129         struct osc_async_page *oap = NULL, *tmp;
2130         struct osc_brw_async_args *aa;
2131         const struct obd_async_page_ops *ops;
2132         CFS_LIST_HEAD(rpc_list);
2133         unsigned int ending_offset;
2134         unsigned  starting_offset = 0;
2135         int srvlock = 0;
2136         struct cl_object *clob = NULL;
2137         ENTRY;
2138
2139         /* first we find the pages we're allowed to work with */
2140         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2141                                  oap_pending_item) {
2142                 ops = oap->oap_caller_ops;
2143
2144                 LASSERT(oap->oap_magic == OAP_MAGIC);
2145
2146                 if (clob == NULL) {
2147                         /* pin object in memory, so that completion call-backs
2148                          * can be safely called under client_obd_list lock. */
2149                         clob = osc_oap2cl_page(oap)->cp_obj;
2150                         cl_object_get(clob);
2151                 }
2152
2153                 if (page_count != 0 &&
2154                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2155                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2156                                " oap %p, page %p, srvlock %u\n",
2157                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2158                         break;
2159                 }
2160                 /* in llite being 'ready' equates to the page being locked
2161                  * until completion unlocks it.  commit_write submits a page
2162                  * as not ready because its unlock will happen unconditionally
2163                  * as the call returns.  if we race with commit_write giving
2164                  * us that page we dont' want to create a hole in the page
2165                  * stream, so we stop and leave the rpc to be fired by
2166                  * another dirtier or kupdated interval (the not ready page
2167                  * will still be on the dirty list).  we could call in
2168                  * at the end of ll_file_write to process the queue again. */
2169                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2170                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2171                                                     cmd);
2172                         if (rc < 0)
2173                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2174                                                 "instead of ready\n", oap,
2175                                                 oap->oap_page, rc);
2176                         switch (rc) {
2177                         case -EAGAIN:
2178                                 /* llite is telling us that the page is still
2179                                  * in commit_write and that we should try
2180                                  * and put it in an rpc again later.  we
2181                                  * break out of the loop so we don't create
2182                                  * a hole in the sequence of pages in the rpc
2183                                  * stream.*/
2184                                 oap = NULL;
2185                                 break;
2186                         case -EINTR:
2187                                 /* the io isn't needed.. tell the checks
2188                                  * below to complete the rpc with EINTR */
2189                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2190                                 oap->oap_count = -EINTR;
2191                                 break;
2192                         case 0:
2193                                 oap->oap_async_flags |= ASYNC_READY;
2194                                 break;
2195                         default:
2196                                 LASSERTF(0, "oap %p page %p returned %d "
2197                                             "from make_ready\n", oap,
2198                                             oap->oap_page, rc);
2199                                 break;
2200                         }
2201                 }
2202                 if (oap == NULL)
2203                         break;
2204                 /*
2205                  * Page submitted for IO has to be locked. Either by
2206                  * ->ap_make_ready() or by higher layers.
2207                  */
2208 #if defined(__KERNEL__) && defined(__linux__)
2209                 {
2210                         struct cl_page *page;
2211
2212                         page = osc_oap2cl_page(oap);
2213
2214                         if (page->cp_type == CPT_CACHEABLE &&
2215                             !(PageLocked(oap->oap_page) &&
2216                               (CheckWriteback(oap->oap_page, cmd)))) {
2217                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2218                                        oap->oap_page,
2219                                        (long)oap->oap_page->flags,
2220                                        oap->oap_async_flags);
2221                                 LBUG();
2222                         }
2223                 }
2224 #endif
2225                 /* If there is a gap at the start of this page, it can't merge
2226                  * with any previous page, so we'll hand the network a
2227                  * "fragmented" page array that it can't transfer in 1 RDMA */
2228                 if (page_count != 0 && oap->oap_page_off != 0)
2229                         break;
2230
2231                 /* take the page out of our book-keeping */
2232                 list_del_init(&oap->oap_pending_item);
2233                 lop_update_pending(cli, lop, cmd, -1);
2234                 list_del_init(&oap->oap_urgent_item);
2235
2236                 if (page_count == 0)
2237                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2238                                           (PTLRPC_MAX_BRW_SIZE - 1);
2239
2240                 /* ask the caller for the size of the io as the rpc leaves. */
2241                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2242                         oap->oap_count =
2243                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2244                                                       cmd);
2245                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2246                 }
2247                 if (oap->oap_count <= 0) {
2248                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2249                                oap->oap_count);
2250                         osc_ap_completion(env, cli, NULL,
2251                                           oap, 0, oap->oap_count);
2252                         continue;
2253                 }
2254
2255                 /* now put the page back in our accounting */
2256                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2257                 if (page_count == 0)
2258                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2259                 if (++page_count >= cli->cl_max_pages_per_rpc)
2260                         break;
2261
2262                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2263                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2264                  * have the same alignment as the initial writes that allocated
2265                  * extents on the server. */
2266                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2267                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2268                 if (ending_offset == 0)
2269                         break;
2270
2271                 /* If there is a gap at the end of this page, it can't merge
2272                  * with any subsequent pages, so we'll hand the network a
2273                  * "fragmented" page array that it can't transfer in 1 RDMA */
2274                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2275                         break;
2276         }
2277
2278         osc_wake_cache_waiters(cli);
2279
2280         loi_list_maint(cli, loi);
2281
2282         client_obd_list_unlock(&cli->cl_loi_list_lock);
2283
2284         if (clob != NULL)
2285                 cl_object_put(env, clob);
2286
2287         if (page_count == 0) {
2288                 client_obd_list_lock(&cli->cl_loi_list_lock);
2289                 RETURN(0);
2290         }
2291
2292         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2293         if (IS_ERR(req)) {
2294                 LASSERT(list_empty(&rpc_list));
2295                 loi_list_maint(cli, loi);
2296                 RETURN(PTR_ERR(req));
2297         }
2298
2299         aa = ptlrpc_req_async_args(req);
2300
2301         if (cmd == OBD_BRW_READ) {
2302                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2303                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2304                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2305                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2306         } else {
2307                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2308                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2309                                  cli->cl_w_in_flight);
2310                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2311                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2312         }
2313         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2314
2315         client_obd_list_lock(&cli->cl_loi_list_lock);
2316
2317         if (cmd == OBD_BRW_READ)
2318                 cli->cl_r_in_flight++;
2319         else
2320                 cli->cl_w_in_flight++;
2321
2322         /* queued sync pages can be torn down while the pages
2323          * were between the pending list and the rpc */
2324         tmp = NULL;
2325         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2326                 /* only one oap gets a request reference */
2327                 if (tmp == NULL)
2328                         tmp = oap;
2329                 if (oap->oap_interrupted && !req->rq_intr) {
2330                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2331                                oap, req);
2332                         ptlrpc_mark_interrupted(req);
2333                 }
2334         }
2335         if (tmp != NULL)
2336                 tmp->oap_request = ptlrpc_request_addref(req);
2337
2338         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2339                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2340
2341         req->rq_interpret_reply = brw_interpret;
2342         ptlrpcd_add_req(req, PSCOPE_BRW);
2343         RETURN(1);
2344 }
2345
/* Log the rpc-readiness state of a lov_oinfo: whether it sits on the
 * client's ready list, plus the pending count and urgent-list state of its
 * write and read queues.  STR/args are appended printf-style.
 *
 * Fix: the original macro ended with a trailing line-continuation after
 * "args)", which silently pulled the following source line into the macro
 * body — a latent hazard if code is ever added directly below. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2355 /* This is called by osc_check_rpcs() to find which objects have pages that
2356  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2357 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2358 {
2359         ENTRY;
2360         /* first return all objects which we already know to have
2361          * pages ready to be stuffed into rpcs */
2362         if (!list_empty(&cli->cl_loi_ready_list))
2363                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2364                                   struct lov_oinfo, loi_cli_item));
2365
2366         /* then if we have cache waiters, return all objects with queued
2367          * writes.  This is especially important when many small files
2368          * have filled up the cache and not been fired into rpcs because
2369          * they don't pass the nr_pending/object threshhold */
2370         if (!list_empty(&cli->cl_cache_waiters) &&
2371             !list_empty(&cli->cl_loi_write_list))
2372                 RETURN(list_entry(cli->cl_loi_write_list.next,
2373                                   struct lov_oinfo, loi_write_item));
2374
2375         /* then return all queued objects when we have an invalid import
2376          * so that they get flushed */
2377         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2378                 if (!list_empty(&cli->cl_loi_write_list))
2379                         RETURN(list_entry(cli->cl_loi_write_list.next,
2380                                           struct lov_oinfo, loi_write_item));
2381                 if (!list_empty(&cli->cl_loi_read_list))
2382                         RETURN(list_entry(cli->cl_loi_read_list.next,
2383                                           struct lov_oinfo, loi_read_item));
2384         }
2385         RETURN(NULL);
2386 }
2387
2388 /* called with the loi list lock held */
/* Walk the objects that have queued pages (via osc_next_loi()) and fire
 * read/write rpcs for each until cl_max_rpcs_in_flight is reached.  Entered
 * holding cl_loi_list_lock; osc_send_oap_rpc() may drop and retake it while
 * building a request. */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* Stop once the client's rpc pipe is saturated. */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issuing rpcs
                 * for each object in turn: drop this loi from whichever
                 * work lists it is on so osc_next_loi() moves on. */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2449
2450 /* we're trying to queue a page in the osc so we're subject to the
2451  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2452  * If the osc's queued pages are already at that limit, then we want to sleep
2453  * until there is space in the osc's queue for us.  We also may be waiting for
2454  * write credits from the OST if there are RPCs in flight that may return some
2455  * before we fall back to sync writes.
2456  *
 * We need this to know our allocation was granted in the presence of signals */
2458 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2459 {
2460         int rc;
2461         ENTRY;
2462         client_obd_list_lock(&cli->cl_loi_list_lock);
2463         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2464         client_obd_list_unlock(&cli->cl_loi_list_lock);
2465         RETURN(rc);
2466 };
2467
2468 /**
2469  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2470  * is available.
2471  */
2472 int osc_enter_cache_try(const struct lu_env *env,
2473                         struct client_obd *cli, struct lov_oinfo *loi,
2474                         struct osc_async_page *oap, int transient)
2475 {
2476         int has_grant;
2477
2478         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2479         if (has_grant) {
2480                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2481                 if (transient) {
2482                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2483                         atomic_inc(&obd_dirty_transit_pages);
2484                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2485                 }
2486         }
2487         return has_grant;
2488 }
2489
2490 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2491  * grant or cache space. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* Kick rpcs so a completion can eventually wake us, then
                 * drop the list lock before sleeping. */
                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* Still on the waiter list means nobody granted us space:
                 * the wait ended without a grant, so bail out. */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* Whoever dequeued us recorded the outcome in ocw_rc. */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2544
2545
2546 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2547                         struct lov_oinfo *loi, cfs_page_t *page,
2548                         obd_off offset, const struct obd_async_page_ops *ops,
2549                         void *data, void **res, int nocache,
2550                         struct lustre_handle *lockh)
2551 {
2552         struct osc_async_page *oap;
2553
2554         ENTRY;
2555
2556         if (!page)
2557                 return size_round(sizeof(*oap));
2558
2559         oap = *res;
2560         oap->oap_magic = OAP_MAGIC;
2561         oap->oap_cli = &exp->exp_obd->u.cli;
2562         oap->oap_loi = loi;
2563
2564         oap->oap_caller_ops = ops;
2565         oap->oap_caller_data = data;
2566
2567         oap->oap_page = page;
2568         oap->oap_obj_off = offset;
2569         if (!client_is_remote(exp) &&
2570             cfs_capable(CFS_CAP_SYS_RESOURCE))
2571                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2572
2573         LASSERT(!(offset & ~CFS_PAGE_MASK));
2574
2575         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2576         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2577         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2578         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2579
2580         spin_lock_init(&oap->oap_lock);
2581         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2582         RETURN(0);
2583 }
2584
2585 struct osc_async_page *oap_from_cookie(void *cookie)
2586 {
2587         struct osc_async_page *oap = cookie;
2588         if (oap->oap_magic != OAP_MAGIC)
2589                 return ERR_PTR(-EINVAL);
2590         return oap;
2591 };
2592
/* Queue one prepared async page (identified by \a cookie) for read or write
 * i/o: charge it against quota and the dirty-page cache (writes), move it to
 * the object's pending list, and kick rpc generation.  Returns 0 on success
 * or a negative errno. */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* The oap must not already be queued or part of an rpc. */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
                                            attr.cat_gid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* The i/o must fit within a single page. */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* May drop and retake cl_loi_list_lock while waiting for
                 * cache space or grant. */
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* Fire off rpcs if this push made any object ready. */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2665
/* True iff "flag" is being turned on by this update: clear in "was" and set
 * in "now".  aka (~was & now & flag), but this is more clear :) */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2668
2669 int osc_set_async_flags_base(struct client_obd *cli,
2670                              struct lov_oinfo *loi, struct osc_async_page *oap,
2671                              obd_flag async_flags)
2672 {
2673         struct loi_oap_pages *lop;
2674         ENTRY;
2675
2676         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2677                 RETURN(-EIO);
2678
2679         if (oap->oap_cmd & OBD_BRW_WRITE) {
2680                 lop = &loi->loi_write_lop;
2681         } else {
2682                 lop = &loi->loi_read_lop;
2683         }
2684
2685         if (list_empty(&oap->oap_pending_item))
2686                 RETURN(-EINVAL);
2687
2688         if ((oap->oap_async_flags & async_flags) == async_flags)
2689                 RETURN(0);
2690
2691         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2692                 oap->oap_async_flags |= ASYNC_READY;
2693
2694         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2695                 if (list_empty(&oap->oap_rpc_item)) {
2696                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2697                         loi_list_maint(cli, loi);
2698                 }
2699         }
2700
2701         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2702                         oap->oap_async_flags);
2703         RETURN(0);
2704 }
2705
/* Undo the queueing of an async page: release its cache/grant accounting and
 * remove it from the urgent and pending lists.  Fails with -EBUSY if the oap
 * is already part of an rpc in flight.  Returns 0 on success. */
int osc_teardown_async_page(struct obd_export *exp,
                            struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* Pick the queue the oap was accounted against. */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* Once the oap is attached to an rpc it can no longer be torn
         * down here. */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* Return the page's grant/dirty accounting and let any blocked
         * writers retry. */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2751
/* Attach einfo->ei_cbdata to lock->l_ast_data, asserting first that the lock
 * really is one of ours (its blocking/completion/glimpse callbacks and
 * resource type match einfo) and that l_ast_data is not already owned by
 * different data.  NOTE(review): \a flags is currently unused here — kept,
 * presumably, for signature symmetry with the callers; confirm before
 * removing. */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
                                         int flags)
{
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        /* l_ast_data is protected by both the resource lock and
         * osc_ast_guard; take them in that order. */
        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
}
2771
2772 static void osc_set_data_with_check(struct lustre_handle *lockh,
2773                                     struct ldlm_enqueue_info *einfo,
2774                                     int flags)
2775 {
2776         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2777
2778         if (lock != NULL) {
2779                 osc_set_lock_data_with_check(lock, einfo, flags);
2780                 LDLM_LOCK_PUT(lock);
2781         } else
2782                 CERROR("lockh %p, data %p - client evicted?\n",
2783                        lockh, einfo->ei_cbdata);
2784 }
2785
/* Run \a replace over every cached ldlm lock on this object's resource,
 * letting the caller swap each lock's ast data.  The iterator's result is
 * discarded; this always returns 0. */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        /* Resource name is derived from the object id/group. */
        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}
2796
2797 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2798                             obd_enqueue_update_f upcall, void *cookie,
2799                             int *flags, int rc)
2800 {
2801         int intent = *flags & LDLM_FL_HAS_INTENT;
2802         ENTRY;
2803
2804         if (intent) {
2805                 /* The request was created before ldlm_cli_enqueue call. */
2806                 if (rc == ELDLM_LOCK_ABORTED) {
2807                         struct ldlm_reply *rep;
2808                         rep = req_capsule_server_get(&req->rq_pill,
2809                                                      &RMF_DLM_REP);
2810
2811                         LASSERT(rep != NULL);
2812                         if (rep->lock_policy_res1)
2813                                 rc = rep->lock_policy_res1;
2814                 }
2815         }
2816
2817         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2818                 *flags |= LDLM_FL_LVB_READY;
2819                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2820                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2821         }
2822
2823         /* Call the update callback. */
2824         rc = (*upcall)(cookie, rc);
2825         RETURN(rc);
2826 }
2827
/* Interpret callback for an asynchronous (rqset-based) lock enqueue: finish
 * the ldlm side, run the caller's upcall via osc_enqueue_fini(), and balance
 * the lock references taken along the way. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
                                   &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, and the ldlm_handle2lock
         * reference. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2874
2875 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2876                         struct lov_oinfo *loi, int flags,
2877                         struct ost_lvb *lvb, __u32 mode, int rc)
2878 {
2879         if (rc == ELDLM_OK) {
2880                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2881                 __u64 tmp;
2882
2883                 LASSERT(lock != NULL);
2884                 loi->loi_lvb = *lvb;
2885                 tmp = loi->loi_lvb.lvb_size;
2886                 /* Extend KMS up to the end of this lock and no further
2887                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2888                 if (tmp > lock->l_policy_data.l_extent.end)
2889                         tmp = lock->l_policy_data.l_extent.end + 1;
2890                 if (tmp >= loi->loi_kms) {
2891                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2892                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2893                         loi_kms_set(loi, tmp);
2894                 } else {
2895                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2896                                    LPU64"; leaving kms="LPU64", end="LPU64,
2897                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2898                                    lock->l_policy_data.l_extent.end);
2899                 }
2900                 ldlm_lock_allow_match(lock);
2901                 LDLM_LOCK_PUT(lock);
2902         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2903                 loi->loi_lvb = *lvb;
2904                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2905                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2906                 rc = ELDLM_OK;
2907         }
2908 }
2909 EXPORT_SYMBOL(osc_update_enqueue);
2910
/* Sentinel rqset value: callers pass this to request that the enqueue be
 * handed to a ptlrpcd daemon (ptlrpcd_add_req) instead of being added to a
 * caller-provided request set.  Never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2912
2913 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2914  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2915  * other synchronous requests, however keeping some locks and trying to obtain
2916  * others may take a considerable amount of time in a case of ost failure; and
2917  * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make life difficult, so
2919  * release locks just after they are obtained. */
2920 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2921                      int *flags, ldlm_policy_data_t *policy,
2922                      struct ost_lvb *lvb, int kms_valid,
2923                      obd_enqueue_update_f upcall, void *cookie,
2924                      struct ldlm_enqueue_info *einfo,
2925                      struct lustre_handle *lockh,
2926                      struct ptlrpc_request_set *rqset, int async)
2927 {
2928         struct obd_device *obd = exp->exp_obd;
2929         struct ptlrpc_request *req = NULL;
2930         int intent = *flags & LDLM_FL_HAS_INTENT;
2931         ldlm_mode_t mode;
2932         int rc;
2933         ENTRY;
2934
2935         /* Filesystem lock extents are extended to page boundaries so that
2936          * dealing with the page cache is a little smoother.  */
2937         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2938         policy->l_extent.end |= ~CFS_PAGE_MASK;
2939
2940         /*
2941          * kms is not valid when either object is completely fresh (so that no
2942          * locks are cached), or object was evicted. In the latter case cached
2943          * lock cannot be used, because it would prime inode state with
2944          * potentially stale LVB.
2945          */
2946         if (!kms_valid)
2947                 goto no_match;
2948
2949         /* Next, search for already existing extent locks that will cover us */
2950         /* If we're trying to read, we also search for an existing PW lock.  The
2951          * VFS and page cache already protect us locally, so lots of readers/
2952          * writers can share a single PW lock.
2953          *
2954          * There are problems with conversion deadlocks, so instead of
2955          * converting a read lock to a write lock, we'll just enqueue a new
2956          * one.
2957          *
2958          * At some point we should cancel the read lock instead of making them
2959          * send us a blocking callback, but there are problems with canceling
2960          * locks out from other users right now, too. */
2961         mode = einfo->ei_mode;
2962         if (einfo->ei_mode == LCK_PR)
2963                 mode |= LCK_PW;
2964         mode = ldlm_lock_match(obd->obd_namespace,
2965                                *flags | LDLM_FL_LVB_READY, res_id,
2966                                einfo->ei_type, policy, mode, lockh, 0);
2967         if (mode) {
2968                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2969
2970                 if (matched->l_ast_data == NULL ||
2971                     matched->l_ast_data == einfo->ei_cbdata) {
2972                         /* addref the lock only if not async requests and PW
2973                          * lock is matched whereas we asked for PR. */
2974                         if (!rqset && einfo->ei_mode != mode)
2975                                 ldlm_lock_addref(lockh, LCK_PR);
2976                         osc_set_lock_data_with_check(matched, einfo, *flags);
2977                         if (intent) {
2978                                 /* I would like to be able to ASSERT here that
2979                                  * rss <= kms, but I can't, for reasons which
2980                                  * are explained in lov_enqueue() */
2981                         }
2982
2983                         /* We already have a lock, and it's referenced */
2984                         (*upcall)(cookie, ELDLM_OK);
2985
2986                         /* For async requests, decref the lock. */
2987                         if (einfo->ei_mode != mode)
2988                                 ldlm_lock_decref(lockh, LCK_PW);
2989                         else if (rqset)
2990                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2991                         LDLM_LOCK_PUT(matched);
2992                         RETURN(ELDLM_OK);
2993                 } else
2994                         ldlm_lock_decref(lockh, mode);
2995                 LDLM_LOCK_PUT(matched);
2996         }
2997
2998  no_match:
2999         if (intent) {
3000                 CFS_LIST_HEAD(cancels);
3001                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3002                                            &RQF_LDLM_ENQUEUE_LVB);
3003                 if (req == NULL)
3004                         RETURN(-ENOMEM);
3005
3006                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3007                 if (rc)
3008                         RETURN(rc);
3009
3010                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3011                                      sizeof *lvb);
3012                 ptlrpc_request_set_replen(req);
3013         }
3014
3015         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3016         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3017
3018         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3019                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3020         if (rqset) {
3021                 if (!rc) {
3022                         struct osc_enqueue_args *aa;
3023                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3024                         aa = ptlrpc_req_async_args(req);
3025                         aa->oa_ei = einfo;
3026                         aa->oa_exp = exp;
3027                         aa->oa_flags  = flags;
3028                         aa->oa_upcall = upcall;
3029                         aa->oa_cookie = cookie;
3030                         aa->oa_lvb    = lvb;
3031                         aa->oa_lockh  = lockh;
3032
3033                         req->rq_interpret_reply =
3034                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3035                         if (rqset == PTLRPCD_SET)
3036                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3037                         else
3038                                 ptlrpc_set_add_req(rqset, req);
3039                 } else if (intent) {
3040                         ptlrpc_req_finished(req);
3041                 }
3042                 RETURN(rc);
3043         }
3044
3045         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3046         if (intent)
3047                 ptlrpc_req_finished(req);
3048
3049         RETURN(rc);
3050 }
3051
3052 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3053                        struct ldlm_enqueue_info *einfo,
3054                        struct ptlrpc_request_set *rqset)
3055 {
3056         struct ldlm_res_id res_id;
3057         int rc;
3058         ENTRY;
3059
3060         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3061                            oinfo->oi_md->lsm_object_gr, &res_id);
3062
3063         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3064                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3065                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3066                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3067                               rqset, rqset != NULL);
3068         RETURN(rc);
3069 }
3070
3071 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3072                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3073                    int *flags, void *data, struct lustre_handle *lockh,
3074                    int unref)
3075 {
3076         struct obd_device *obd = exp->exp_obd;
3077         int lflags = *flags;
3078         ldlm_mode_t rc;
3079         ENTRY;
3080
3081         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3082                 RETURN(-EIO);
3083
3084         /* Filesystem lock extents are extended to page boundaries so that
3085          * dealing with the page cache is a little smoother */
3086         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3087         policy->l_extent.end |= ~CFS_PAGE_MASK;
3088
3089         /* Next, search for already existing extent locks that will cover us */
3090         /* If we're trying to read, we also search for an existing PW lock.  The
3091          * VFS and page cache already protect us locally, so lots of readers/
3092          * writers can share a single PW lock. */
3093         rc = mode;
3094         if (mode == LCK_PR)
3095                 rc |= LCK_PW;
3096         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3097                              res_id, type, policy, rc, lockh, unref);
3098         if (rc) {
3099                 if (data != NULL)
3100                         osc_set_data_with_check(lockh, data, lflags);
3101                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3102                         ldlm_lock_addref(lockh, LCK_PR);
3103                         ldlm_lock_decref(lockh, LCK_PW);
3104                 }
3105                 RETURN(rc);
3106         }
3107         RETURN(rc);
3108 }
3109
3110 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3111 {
3112         ENTRY;
3113
3114         if (unlikely(mode == LCK_GROUP))
3115                 ldlm_lock_decref_and_cancel(lockh, mode);
3116         else
3117                 ldlm_lock_decref(lockh, mode);
3118
3119         RETURN(0);
3120 }
3121
/* obd_ops lock-cancel entry point: the stripe md and export are unused
 * here; cancellation only needs the lock handle and mode. */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;
        RETURN(osc_cancel_base(lockh, mode));
}
3128
3129 static int osc_cancel_unused(struct obd_export *exp,
3130                              struct lov_stripe_md *lsm, int flags,
3131                              void *opaque)
3132 {
3133         struct obd_device *obd = class_exp2obd(exp);
3134         struct ldlm_res_id res_id, *resp = NULL;
3135
3136         if (lsm != NULL) {
3137                 resp = osc_build_res_name(lsm->lsm_object_id,
3138                                           lsm->lsm_object_gr, &res_id);
3139         }
3140
3141         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3142 }
3143
3144 static int osc_statfs_interpret(const struct lu_env *env,
3145                                 struct ptlrpc_request *req,
3146                                 struct osc_async_args *aa, int rc)
3147 {
3148         struct obd_statfs *msfs;
3149         ENTRY;
3150
3151         if (rc != 0)
3152                 GOTO(out, rc);
3153
3154         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3155         if (msfs == NULL) {
3156                 GOTO(out, rc = -EPROTO);
3157         }
3158
3159         *aa->aa_oi->oi_osfs = *msfs;
3160 out:
3161         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3162         RETURN(rc);
3163 }
3164
/* Issue an asynchronous OST_STATFS RPC; the reply is copied into
 * oinfo->oi_osfs by osc_statfs_interpret() and oi_cb_up is invoked.
 * Returns 0 once the request has been added to @rqset, or a negative
 * errno if the request could not be built.
 * NOTE(review): @max_age is currently unused here — see comment below. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served from the create portal on the OST side. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for recovery/resend,
                 * to avoid deadlocking against a stuck import. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        /* Stash the completion context in the request's async args. */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3206
/* Synchronous OST_STATFS: query free-space statistics from the target
 * and copy them into @osfs.  Returns 0 on success or a negative errno.
 * NOTE(review): @max_age is currently unused here — see comment below. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        /* Take a reference on the import under cl_sem so a concurrent
         * disconnect cannot free it from under us. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request (if any) now holds its own import reference. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served from the create portal on the OST side. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests must not wait for recovery/resend,
                 * to avoid deadlocking against a stuck import. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* A reply without the statfs buffer is a protocol violation. */
        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3269
/* Retrieve object striping information.
 *
 * @lump is a pointer to a user-space struct; only the header (common to
 * lov_user_md_v1 and lov_user_md_v3) is read from it, to obtain
 * lmm_magic and lmm_stripe_count.  lmm_magic must be LOV_USER_MAGIC_V1
 * or LOV_USER_MAGIC_V3; an OSC always reports exactly one stripe.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* NOTE(review): lmm_stripe_count comes straight from user
                 * space and is not bounded here before sizing the
                 * allocation — verify upper callers sanitize it. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* The objects array sits at a different offset in v1 vs v3. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Caller only wants the header: reuse the stack copy. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        /* An OSC device is a single stripe by definition. */
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3332
3333
/* Device ioctl dispatcher for the OSC.  @karg is the kernel-side copy of
 * the ioctl data, @uarg the original user pointer.  A module reference is
 * held across the call so the OSC cannot be unloaded mid-ioctl; every
 * exit path goes through "out" to drop it. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* Pull the full ioctl payload (with embedded buffers)
                 * from user space. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* inlbuf1 must hold a lov_desc, inlbuf2 a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a trivial one-target LOV. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns the md size on success;
                 * normalize to 0 for the ioctl return. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3417
/* obd_ops get_info handler.  Supported keys:
 *   KEY_LOCK_TO_STRIPE - trivially 0 (an OSC has one stripe);
 *   KEY_LAST_ID        - synchronous OST_GET_INFO RPC returning the last
 *                        allocated object id;
 *   KEY_FIEMAP         - synchronous OST_GET_INFO RPC mapping file extents.
 * Returns 0 on success, -EINVAL for unknown keys, or a negative errno. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* Size the key buffer before packing the request. */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* The fiemap value buffer is both sent (requested ranges)
                 * and received (filled-in extents), so size both sides. */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3513
3514 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3515                                           struct ptlrpc_request *req,
3516                                           void *aa, int rc)
3517 {
3518         struct llog_ctxt *ctxt;
3519         struct obd_import *imp = req->rq_import;
3520         ENTRY;
3521
3522         if (rc != 0)
3523                 RETURN(rc);
3524
3525         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3526         if (ctxt) {
3527                 if (rc == 0)
3528                         rc = llog_initiator_connect(ctxt);
3529                 else
3530                         CERROR("cannot establish connection for "
3531                                "ctxt %p: %d\n", ctxt, rc);
3532         }
3533
3534         llog_ctxt_put(ctxt);
3535         spin_lock(&imp->imp_lock);
3536         imp->imp_server_timeout = 1;
3537         imp->imp_pingable = 1;
3538         spin_unlock(&imp->imp_lock);
3539         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3540
3541         RETURN(rc);
3542 }
3543
/* obd_ops set_info_async handler.  Keys handled locally (no RPC):
 * KEY_NEXT_ID, KEY_UNLINKED, KEY_INIT_RECOV, KEY_CHECKSUM, KEY_FLUSH_CTX.
 * Any other key is forwarded to the OST as an OST_SET_INFO RPC added to
 * @set; KEY_MDS_CONN additionally installs a reply interpreter.
 * Returns 0 on success or a negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                /* Next precreate id is one past the last known object id. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* An unlink may have freed space: clear the no-space flag
                 * so object creation can be retried. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Everything below requires sending an RPC asynchronously. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                /* Record the MDS object group for precreation and finish
                 * setup in the reply interpreter. */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        /* Kick the set so the request starts moving immediately. */
        ptlrpc_check_set(NULL, set);

        RETURN(0);
}
3647
3648
/* llog operations for the size-replication (SIZE_REPL) context: only
 * record cancellation is needed on the client side. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};
3652
/* llog operations for the MDS->OST originator context; filled in lazily
 * from llog_lvfs_ops on first osc_llog_init() call. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts used by an MDS-side OSC: the MDS_OST
 * originator context (backed by lvfs) and the size-replication context.
 * Returns 0 on success; on failure any already-created context is
 * cleaned up and the error is logged. */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time, lock-protected initialization of the shared ops
         * table (lop_setup doubles as the "already initialized" flag). */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                /* Undo the first context before reporting failure. */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}
3698
3699 static int osc_llog_finish(struct obd_device *obd, int count)
3700 {
3701         struct llog_ctxt *ctxt;
3702         int rc = 0, rc2 = 0;
3703         ENTRY;
3704
3705         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3706         if (ctxt)
3707                 rc = llog_cleanup(ctxt);
3708
3709         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3710         if (ctxt)
3711                 rc2 = llog_cleanup(ctxt);
3712         if (!rc)
3713                 rc = rc2;
3714
3715         RETURN(rc);
3716 }
3717
/* Reconnect hook: recompute the grant to request from the OST in the
 * connect data.  Asks for the currently cached available grant, or — if
 * none is left — enough for two max-sized RPCs; also resets the lost
 * grant counter under the loi list lock.  Always returns 0.
 * NOTE(review): uses RETURN() without a matching ENTRY — presumably
 * intentional, but verify against the debug-trace macros. */
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* GNU "?:": request cached grant if nonzero, else 2 RPCs. */
                data->ocd_grant = cli->cl_avail_grant ?:
                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
        }

        RETURN(0);
}
3746
3747 static int osc_disconnect(struct obd_export *exp)
3748 {
3749         struct obd_device *obd = class_exp2obd(exp);
3750         struct llog_ctxt  *ctxt;
3751         int rc;
3752
3753         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3754         if (ctxt) {
3755                 if (obd->u.cli.cl_conn_count == 1) {
3756                         /* Flush any remaining cancel messages out to the 
3757                          * target */
3758                         llog_sync(ctxt, exp);
3759                 }
3760                 llog_ctxt_put(ctxt);
3761         } else {
3762                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 
3763                        obd);
3764         }
3765
3766         rc = client_disconnect_export(exp);
3767         return rc;
3768 }
3769
/* React to import state changes for this OSC.  Depending on the event,
 * this resets grant accounting, drains/fails cached pages, flips the
 * object-creator recovery flags (MDS-side OSCs only), or re-reads the
 * negotiated connect data.  Called with the import event dispatched by
 * the ptlrpc import state machine. */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* The server's grant bookkeeping is void across a
                 * disconnect; drop ours so it is renegotiated on
                 * reconnect (see IMP_EVENT_OCD below). */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* Local cleanup only: the server is unreachable,
                         * so no cancels are sent. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* Clear the no-space flag so precreates may
                         * resume now that the OST is back. */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                /* Connect data (re)negotiated: re-derive grant state and
                 * the request portal from the server's reply. */
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3853
3854 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3855 {
3856         int rc;
3857         ENTRY;
3858
3859         ENTRY;
3860         rc = ptlrpcd_addref();
3861         if (rc)
3862                 RETURN(rc);
3863
3864         rc = client_obd_setup(obd, lcfg);
3865         if (rc) {
3866                 ptlrpcd_decref();
3867         } else {
3868                 struct lprocfs_static_vars lvars = { 0 };
3869                 struct client_obd *cli = &obd->u.cli;
3870
3871                 lprocfs_osc_init_vars(&lvars);
3872                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3873                         lproc_osc_attach_seqstat(obd);
3874                         sptlrpc_lprocfs_cliobd_attach(obd);
3875                         ptlrpc_lprocfs_register_obd(obd);
3876                 }
3877
3878                 oscc_init(obd);
3879                 /* We need to allocate a few requests more, because
3880                    brw_interpret tries to create new requests before freeing
3881                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3882                    reserved, but I afraid that might be too much wasted RAM
3883                    in fact, so 2 is just my guess and still should work. */
3884                 cli->cl_import->imp_rq_pool =
3885                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3886                                             OST_MAXREQSIZE,
3887                                             ptlrpc_add_rqs_to_pool);
3888         }
3889
3890         RETURN(rc);
3891 }
3892
/* Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY: deactivate the import and stop pinging so no new
 * RPCs (e.g. an mds_lov_synchronize) are started during teardown.
 *
 * OBD_CLEANUP_EXPORTS: invalidate and destroy the client import and its
 * request pool, then shut down the llog subsystem.  NOTE(review): the
 * D_CONFIG message claims "never connected", but this branch runs
 * whenever cl_import is still set — the message may be misleading for
 * a normally-connected device; confirm against the disconnect path. */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        /* cl_sem serializes against concurrent
                         * connect/disconnect users of cl_import. */
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3936
3937 int osc_cleanup(struct obd_device *obd)
3938 {
3939         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3940         int rc;
3941
3942         ENTRY;
3943         ptlrpc_lprocfs_unregister_obd(obd);
3944         lprocfs_obd_cleanup(obd);
3945
3946         spin_lock(&oscc->oscc_lock);
3947         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3948         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3949         spin_unlock(&oscc->oscc_lock);
3950
3951         /* free memory of osc quota cache */
3952         lquota_cleanup(quota_interface, obd);
3953
3954         rc = client_obd_cleanup(obd);
3955
3956         ptlrpcd_decref();
3957         RETURN(rc);
3958 }
3959
3960 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3961 {
3962         struct lprocfs_static_vars lvars = { 0 };
3963         int rc = 0;
3964
3965         lprocfs_osc_init_vars(&lvars);
3966
3967         switch (lcfg->lcfg_command) {
3968         case LCFG_SPTLRPC_CONF:
3969                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
3970                 break;
3971         default:
3972                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3973                                               lcfg, obd);
3974                 if (rc > 0)
3975                         rc = 0;
3976                 break;
3977         }
3978
3979         return(rc);
3980 }
3981
3982 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3983 {
3984         return osc_process_config_base(obd, buf);
3985 }
3986
/* Method table binding the generic obd_ops interface to the OSC
 * implementations.  Connection management (add/del conn, connect) is
 * delegated to the shared client_* helpers; everything else is
 * OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4023
4024 extern struct lu_kmem_descr  osc_caches[];
4025 extern spinlock_t            osc_ast_guard;
4026 extern struct lock_class_key osc_ast_guard_class;
4027
4028 int __init osc_init(void)
4029 {
4030         struct lprocfs_static_vars lvars = { 0 };
4031         int rc;
4032         ENTRY;
4033
4034         /* print an address of _any_ initialized kernel symbol from this
4035          * module, to allow debugging with gdb that doesn't support data
4036          * symbols from modules.*/
4037         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4038
4039         rc = lu_kmem_init(osc_caches);
4040
4041         lprocfs_osc_init_vars(&lvars);
4042
4043         request_module("lquota");
4044         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4045         lquota_init(quota_interface);
4046         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4047
4048         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4049                                  LUSTRE_OSC_NAME, &osc_device_type);
4050         if (rc) {
4051                 if (quota_interface)
4052                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4053                 lu_kmem_fini(osc_caches);
4054                 RETURN(rc);
4055         }
4056
4057         spin_lock_init(&osc_ast_guard);
4058         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4059
4060         RETURN(rc);
4061 }
4062
#ifdef __KERNEL__
/* Module exit: undo osc_init() in reverse order — device type, quota
 * interface, obd type registration, then the slab caches. */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif