Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *req;
422         struct ost_body       *body;
423         struct lov_stripe_md  *lsm;
424         int                    rc;
425         ENTRY;
426
427         LASSERT(oa);
428         LASSERT(ea);
429
430         lsm = *ea;
431         if (!lsm) {
432                 rc = obd_alloc_memmd(exp, &lsm);
433                 if (rc < 0)
434                         RETURN(rc);
435         }
436
437         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
438         if (req == NULL)
439                 GOTO(out, rc = -ENOMEM);
440
441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442         if (rc) {
443                 ptlrpc_request_free(req);
444                 GOTO(out, rc);
445         }
446
447         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
448         LASSERT(body);
449         body->oa = *oa;
450
451         ptlrpc_request_set_replen(req);
452
453         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454             oa->o_flags == OBD_FL_DELORPHAN) {
455                 DEBUG_REQ(D_HA, req,
456                           "delorphan from OST integration");
457                 /* Don't resend the delorphan req */
458                 req->rq_no_resend = req->rq_no_delay = 1;
459         }
460
461         rc = ptlrpc_queue_wait(req);
462         if (rc)
463                 GOTO(out_req, rc);
464
465         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466         if (body == NULL)
467                 GOTO(out_req, rc = -EPROTO);
468
469         *oa = body->oa;
470
471         /* This should really be sent by the OST */
472         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473         oa->o_valid |= OBD_MD_FLBLKSZ;
474
475         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476          * have valid lsm_oinfo data structs, so don't go touching that.
477          * This needs to be fixed in a big way.
478          */
479         lsm->lsm_object_id = oa->o_id;
480         lsm->lsm_object_gr = oa->o_gr;
481         *ea = lsm;
482
483         if (oti != NULL) {
484                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
485
486                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487                         if (!oti->oti_logcookies)
488                                 oti_alloc_cookies(oti, 1);
489                         *oti->oti_logcookies = oa->o_lcookie;
490                 }
491         }
492
493         CDEBUG(D_HA, "transno: "LPD64"\n",
494                lustre_msg_get_transno(req->rq_repmsg));
495 out_req:
496         ptlrpc_req_finished(req);
497 out:
498         if (rc && !*ea)
499                 obd_free_memmd(exp, &lsm);
500         RETURN(rc);
501 }
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         body->oa = *oa;
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         *oa = body->oa;
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
669 static int osc_can_send_destroy(struct client_obd *cli)
670 {
671         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672             cli->cl_max_rpcs_in_flight) {
673                 /* The destroy request can be sent */
674                 return 1;
675         }
676         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677             cli->cl_max_rpcs_in_flight) {
678                 /*
679                  * The counter has been modified between the two atomic
680                  * operations.
681                  */
682                 cfs_waitq_signal(&cli->cl_destroy_waitq);
683         }
684         return 0;
685 }
686
687 /* Destroy requests can be async always on the client, and we don't even really
688  * care about the return code since the client cannot do anything at all about
689  * a destroy failure.
690  * When the MDS is unlinking a filename, it saves the file objects into a
691  * recovery llog, and these object records are cancelled when the OST reports
692  * they were destroyed and sync'd to disk (i.e. transaction committed).
693  * If the client dies, or the OST is down when the object should be destroyed,
694  * the records are not cancelled, and when the OST reconnects to the MDS next,
695  * it will retrieve the llog unlink logs and then sends the log cancellation
696  * cookies to the MDS after committing destroy transactions. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
699                        struct obd_export *md_export, void *capa)
700 {
701         struct client_obd     *cli = &exp->exp_obd->u.cli;
702         struct ptlrpc_request *req;
703         struct ost_body       *body;
704         CFS_LIST_HEAD(cancels);
705         int rc, count;
706         ENTRY;
707
708         if (!oa) {
709                 CDEBUG(D_INFO, "oa NULL\n");
710                 RETURN(-EINVAL);
711         }
712
713         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714                                         LDLM_FL_DISCARD_DATA);
715
716         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
717         if (req == NULL) {
718                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
719                 RETURN(-ENOMEM);
720         }
721
722         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
724                                0, &cancels, count);
725         if (rc) {
726                 ptlrpc_request_free(req);
727                 RETURN(rc);
728         }
729
730         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731         ptlrpc_at_set_req_timeout(req);
732
733         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734                 oa->o_lcookie = *oti->oti_logcookies;
735         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
736         LASSERT(body);
737         body->oa = *oa;
738
739         osc_pack_capa(req, body, (struct obd_capa *)capa);
740         ptlrpc_request_set_replen(req);
741
742         /* don't throttle destroy RPCs for the MDT */
743         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744                 req->rq_interpret_reply = osc_destroy_interpret;
745                 if (!osc_can_send_destroy(cli)) {
746                         struct l_wait_info lwi = { 0 };
747
748                         /*
749                          * Wait until the number of on-going destroy RPCs drops
750                          * under max_rpc_in_flight
751                          */
752                         l_wait_event_exclusive(cli->cl_destroy_waitq,
753                                                osc_can_send_destroy(cli), &lwi);
754                 }
755         }
756
757         /* Do not wait for response */
758         ptlrpcd_add_req(req, PSCOPE_OTHER);
759         RETURN(0);
760 }
761
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
763                                 long writing_bytes)
764 {
765         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
766
767         LASSERT(!(oa->o_valid & bits));
768
769         oa->o_valid |= bits;
770         client_obd_list_lock(&cli->cl_loi_list_lock);
771         oa->o_dirty = cli->cl_dirty;
772         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
775                 oa->o_undirty = 0;
776         } else if (atomic_read(&obd_dirty_pages) -
777                    atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778                 CERROR("dirty %d - %d > system dirty_max %d\n",
779                        atomic_read(&obd_dirty_pages),
780                        atomic_read(&obd_dirty_transit_pages),
781                        obd_max_dirty_pages);
782                 oa->o_undirty = 0;
783         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784                 CERROR("dirty %lu - dirty_max %lu too big???\n",
785                        cli->cl_dirty, cli->cl_dirty_max);
786                 oa->o_undirty = 0;
787         } else {
788                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789                                 (cli->cl_max_rpcs_in_flight + 1);
790                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791         }
792         oa->o_grant = cli->cl_avail_grant;
793         oa->o_dropped = cli->cl_lost_grant;
794         cli->cl_lost_grant = 0;
795         client_obd_list_unlock(&cli->cl_loi_list_lock);
796         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
798 }
799
800 /* caller must hold loi_list_lock */
801 static void osc_consume_write_grant(struct client_obd *cli,
802                                     struct brw_page *pga)
803 {
804         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
805         atomic_inc(&obd_dirty_pages);
806         cli->cl_dirty += CFS_PAGE_SIZE;
807         cli->cl_avail_grant -= CFS_PAGE_SIZE;
808         pga->flag |= OBD_BRW_FROM_GRANT;
809         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
810                CFS_PAGE_SIZE, pga, pga->pg);
811         LASSERT(cli->cl_avail_grant >= 0);
812 }
813
814 /* the companion to osc_consume_write_grant, called when a brw has completed.
815  * must be called with the loi lock held. */
816 static void osc_release_write_grant(struct client_obd *cli,
817                                     struct brw_page *pga, int sent)
818 {
819         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
820         ENTRY;
821
822         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
823                 EXIT;
824                 return;
825         }
826
827         pga->flag &= ~OBD_BRW_FROM_GRANT;
828         atomic_dec(&obd_dirty_pages);
829         cli->cl_dirty -= CFS_PAGE_SIZE;
830         if (pga->flag & OBD_BRW_NOCACHE) {
831                 pga->flag &= ~OBD_BRW_NOCACHE;
832                 atomic_dec(&obd_dirty_transit_pages);
833                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
834         }
835         if (!sent) {
836                 cli->cl_lost_grant += CFS_PAGE_SIZE;
837                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
838                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
839         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
840                 /* For short writes we shouldn't count parts of pages that
841                  * span a whole block on the OST side, or our accounting goes
842                  * wrong.  Should match the code in filter_grant_check. */
843                 int offset = pga->off & ~CFS_PAGE_MASK;
844                 int count = pga->count + (offset & (blocksize - 1));
845                 int end = (offset + pga->count) & (blocksize - 1);
846                 if (end)
847                         count += blocksize - end;
848
849                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
850                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
851                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
852                        cli->cl_avail_grant, cli->cl_dirty);
853         }
854
855         EXIT;
856 }
857
858 static unsigned long rpcs_in_flight(struct client_obd *cli)
859 {
860         return cli->cl_r_in_flight + cli->cl_w_in_flight;
861 }
862
863 /* caller must hold loi_list_lock */
864 void osc_wake_cache_waiters(struct client_obd *cli)
865 {
866         struct list_head *l, *tmp;
867         struct osc_cache_waiter *ocw;
868
869         ENTRY;
870         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
871                 /* if we can't dirty more, we must wait until some is written */
872                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
873                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
874                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
875                                "osc max %ld, sys max %d\n", cli->cl_dirty,
876                                cli->cl_dirty_max, obd_max_dirty_pages);
877                         return;
878                 }
879
880                 /* if still dirty cache but no grant wait for pending RPCs that
881                  * may yet return us some grant before doing sync writes */
882                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
883                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
884                                cli->cl_w_in_flight);
885                         return;
886                 }
887
888                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
889                 list_del_init(&ocw->ocw_entry);
890                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
891                         /* no more RPCs in flight to return grant, do sync IO */
892                         ocw->ocw_rc = -EDQUOT;
893                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
894                 } else {
895                         osc_consume_write_grant(cli,
896                                                 &ocw->ocw_oap->oap_brw_page);
897                 }
898
899                 cfs_waitq_signal(&ocw->ocw_waitq);
900         }
901
902         EXIT;
903 }
904
905 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
906 {
907         client_obd_list_lock(&cli->cl_loi_list_lock);
908         cli->cl_avail_grant = ocd->ocd_grant;
909         client_obd_list_unlock(&cli->cl_loi_list_lock);
910
911         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
912                cli->cl_avail_grant, cli->cl_lost_grant);
913         LASSERT(cli->cl_avail_grant >= 0);
914 }
915
916 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
917 {
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
920         if (body->oa.o_valid & OBD_MD_FLGRANT)
921                 cli->cl_avail_grant += body->oa.o_grant;
922         /* waiters are woken in brw_interpret */
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924 }
925
926 /* We assume that the reason this OSC got a short read is because it read
927  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
928  * via the LOV, and it _knows_ it's reading inside the file, it's just that
929  * this stripe never got written at or beyond this stripe offset yet. */
930 static void handle_short_read(int nob_read, obd_count page_count,
931                               struct brw_page **pga)
932 {
933         char *ptr;
934         int i = 0;
935
936         /* skip bytes read OK */
937         while (nob_read > 0) {
938                 LASSERT (page_count > 0);
939
940                 if (pga[i]->count > nob_read) {
941                         /* EOF inside this page */
942                         ptr = cfs_kmap(pga[i]->pg) +
943                                 (pga[i]->off & ~CFS_PAGE_MASK);
944                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
945                         cfs_kunmap(pga[i]->pg);
946                         page_count--;
947                         i++;
948                         break;
949                 }
950
951                 nob_read -= pga[i]->count;
952                 page_count--;
953                 i++;
954         }
955
956         /* zero remaining pages */
957         while (page_count-- > 0) {
958                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
959                 memset(ptr, 0, pga[i]->count);
960                 cfs_kunmap(pga[i]->pg);
961                 i++;
962         }
963 }
964
965 static int check_write_rcs(struct ptlrpc_request *req,
966                            int requested_nob, int niocount,
967                            obd_count page_count, struct brw_page **pga)
968 {
969         int    *remote_rcs, i;
970
971         /* return error if any niobuf was in error */
972         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
973                                         sizeof(*remote_rcs) * niocount, NULL);
974         if (remote_rcs == NULL) {
975                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
976                 return(-EPROTO);
977         }
978         if (lustre_msg_swabbed(req->rq_repmsg))
979                 for (i = 0; i < niocount; i++)
980                         __swab32s(&remote_rcs[i]);
981
982         for (i = 0; i < niocount; i++) {
983                 if (remote_rcs[i] < 0)
984                         return(remote_rcs[i]);
985
986                 if (remote_rcs[i] != 0) {
987                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
988                                 i, remote_rcs[i], req);
989                         return(-EPROTO);
990                 }
991         }
992
993         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
994                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
995                        req->rq_bulk->bd_nob_transferred, requested_nob);
996                 return(-EPROTO);
997         }
998
999         return (0);
1000 }
1001
1002 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1003 {
1004         if (p1->flag != p2->flag) {
1005                 unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
1006
1007                 /* warn if we try to combine flags that we don't know to be
1008                  * safe to combine */
1009                 if ((p1->flag & mask) != (p2->flag & mask))
1010                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1011                                "same brw?\n", p1->flag, p2->flag);
1012                 return 0;
1013         }
1014
1015         return (p1->off + p1->count == p2->off);
1016 }
1017
1018 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1019                                    struct brw_page **pga, int opc,
1020                                    cksum_type_t cksum_type)
1021 {
1022         __u32 cksum;
1023         int i = 0;
1024
1025         LASSERT (pg_count > 0);
1026         cksum = init_checksum(cksum_type);
1027         while (nob > 0 && pg_count > 0) {
1028                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1029                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1030                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1031
1032                 /* corrupt the data before we compute the checksum, to
1033                  * simulate an OST->client data error */
1034                 if (i == 0 && opc == OST_READ &&
1035                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1036                         memcpy(ptr + off, "bad1", min(4, nob));
1037                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1038                 cfs_kunmap(pga[i]->pg);
1039                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1040                                off, cksum);
1041
1042                 nob -= pga[i]->count;
1043                 pg_count--;
1044                 i++;
1045         }
1046         /* For sending we only compute the wrong checksum instead
1047          * of corrupting the data so it is still correct on a redo */
1048         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1049                 cksum++;
1050
1051         return cksum;
1052 }
1053
/*
 * Build (but do not send) a bulk read or write request for the pages in pga.
 *
 * cmd        - OBD_BRW_WRITE set selects OST_WRITE, otherwise OST_READ
 * cli        - client obd the request is issued on
 * oa         - object attributes; copied into the request body, and for
 *              writes updated here with the checksum flags/value
 * lsm        - not referenced in this function
 * page_count - number of entries in pga, must be > 0 (asserted)
 * pga        - pages to transfer, asserted to be in strictly increasing
 *              offset order
 * reqp       - receives the prepared request on success
 * ocapa      - capability to pack, may be NULL
 * reserve    - when non-zero and ocapa is set, a capa reference is taken
 *              and stored in the request's async args
 *
 * Returns 0 on success, or a negative errno, in which case the request (if
 * one was allocated) has been released.
 */
1054 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1055                                 struct lov_stripe_md *lsm, obd_count page_count,
1056                                 struct brw_page **pga,
1057                                 struct ptlrpc_request **reqp,
1058                                 struct obd_capa *ocapa, int reserve)
1059 {
1060         struct ptlrpc_request   *req;
1061         struct ptlrpc_bulk_desc *desc;
1062         struct ost_body         *body;
1063         struct obd_ioobj        *ioobj;
1064         struct niobuf_remote    *niobuf;
1065         int niocount, i, requested_nob, opc, rc;
1066         struct osc_brw_async_args *aa;
1067         struct req_capsule      *pill;
1068         struct brw_page *pg_prev;
1069
1070         ENTRY;
        /* fault-injection hooks used for testing the error paths */
1071         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1072                 RETURN(-ENOMEM); /* Recoverable */
1073         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1074                 RETURN(-EINVAL); /* Fatal */
1075
        /* writes are allocated from the import's request pool, reads are
         * allocated normally */
1076         if ((cmd & OBD_BRW_WRITE) != 0) {
1077                 opc = OST_WRITE;
1078                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1079                                                 cli->cl_import->imp_rq_pool,
1080                                                 &RQF_OST_BRW);
1081         } else {
1082                 opc = OST_READ;
1083                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1084         }
1085         if (req == NULL)
1086                 RETURN(-ENOMEM);
1087
        /* count the remote niobufs needed: a page that can be merged with
         * its predecessor shares that predecessor's niobuf */
1088         for (niocount = i = 1; i < page_count; i++) {
1089                 if (!can_merge_pages(pga[i - 1], pga[i]))
1090                         niocount++;
1091         }
1092
1093         pill = &req->rq_pill;
1094         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1095                              niocount * sizeof(*niobuf));
1096         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1097
1098         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1099         if (rc) {
1100                 ptlrpc_request_free(req);
1101                 RETURN(rc);
1102         }
1103         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1104         ptlrpc_at_set_req_timeout(req);
1105
1106         if (opc == OST_WRITE)
1107                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1108                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1109         else
1110                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1111                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1112
1113         if (desc == NULL)
1114                 GOTO(out, rc = -ENOMEM);
1115         /* NB request now owns desc and will free it when it gets freed */
1116
1117         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1118         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1119         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1120         LASSERT(body && ioobj && niobuf);
1121
1122         body->oa = *oa;
1123
1124         obdo_to_ioobj(oa, ioobj);
1125         ioobj->ioo_bufcnt = niocount;
1126         osc_pack_capa(req, body, ocapa);
1127         LASSERT (page_count > 0);
1128         pg_prev = pga[0];
        /* add every page to the bulk descriptor and fill in the niobufs;
         * niobuf is advanced once per iteration and stepped back when the
         * page is merged into the previous niobuf */
1129         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1130                 struct brw_page *pg = pga[i];
1131
1132                 LASSERT(pg->count > 0);
                /* a page fragment must not cross a page boundary */
1133                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1134                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1135                          pg->off, pg->count);
1136 #ifdef __linux__
1137                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1138                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1139                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1140                          i, page_count,
1141                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1142                          pg_prev->pg, page_private(pg_prev->pg),
1143                          pg_prev->pg->index, pg_prev->off);
1144 #else
1145                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1146                          "i %d p_c %u\n", i, page_count);
1147 #endif
                /* all pages in one request must agree on OBD_BRW_SRVLOCK */
1148                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1149                         (pg->flag & OBD_BRW_SRVLOCK));
1150
1151                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1152                                       pg->count);
1153                 requested_nob += pg->count;
1154
1155                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1156                         niobuf--;
1157                         niobuf->len += pg->count;
1158                 } else {
1159                         niobuf->offset = pg->off;
1160                         niobuf->len    = pg->count;
1161                         niobuf->flags  = pg->flag;
1162                 }
1163                 pg_prev = pg;
1164         }
1165
        /* after the loop niobuf must sit exactly niocount entries past the
         * start of the niobuf buffer in the request message */
1166         LASSERTF((void *)(niobuf - niocount) ==
1167                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1168                                niocount * sizeof(*niobuf)),
1169                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1170                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1171                 (void *)(niobuf - niocount));
1172
1173         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1174
1175         /* size[REQ_REC_OFF] still sizeof (*body) */
1176         if (opc == OST_WRITE) {
1177                 if (unlikely(cli->cl_checksum) &&
1178                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1179                         /* store cl_cksum_type in a local variable since
1180                          * it can be changed via lprocfs */
1181                         cksum_type_t cksum_type = cli->cl_cksum_type;
1182
1183                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1184                                 oa->o_flags = body->oa.o_flags = 0;
1185                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1186                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1187                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1188                                                              page_count, pga,
1189                                                              OST_WRITE,
1190                                                              cksum_type);
1191                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1192                                body->oa.o_cksum);
1193                         /* save this in 'oa', too, for later checking */
1194                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1195                         oa->o_flags |= cksum_type_pack(cksum_type);
1196                 } else {
1197                         /* clear out the checksum flag, in case this is a
1198                          * resend but cl_checksum is no longer set. b=11238 */
1199                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1200                 }
1201                 oa->o_cksum = body->oa.o_cksum;
1202                 /* 1 RC per niobuf */
1203                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1204                                      sizeof(__u32) * niocount);
1205         } else {
                /* for reads only ask the server to checksum; the client
                 * side value is computed when the reply is processed */
1206                 if (unlikely(cli->cl_checksum) &&
1207                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1208                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1209                                 body->oa.o_flags = 0;
1210                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1211                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1212                 }
1213                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1214                 /* 1 RC for the whole I/O */
1215         }
1216         ptlrpc_request_set_replen(req);
1217
        /* stash what the reply handling needs in the request's async args */
1218         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1219         aa = ptlrpc_req_async_args(req);
1220         aa->aa_oa = oa;
1221         aa->aa_requested_nob = requested_nob;
1222         aa->aa_nio_count = niocount;
1223         aa->aa_page_count = page_count;
1224         aa->aa_resends = 0;
1225         aa->aa_ppga = pga;
1226         aa->aa_cli = cli;
1227         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1228         if (ocapa && reserve)
1229                 aa->aa_ocapa = capa_get(ocapa);
1230
1231         *reqp = req;
1232         RETURN(0);
1233
1234  out:
1235         ptlrpc_req_finished(req);
1236         RETURN(rc);
1237 }
1238
1239 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1240                                 __u32 client_cksum, __u32 server_cksum, int nob,
1241                                 obd_count page_count, struct brw_page **pga,
1242                                 cksum_type_t client_cksum_type)
1243 {
1244         __u32 new_cksum;
1245         char *msg;
1246         cksum_type_t cksum_type;
1247
1248         if (server_cksum == client_cksum) {
1249                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1250                 return 0;
1251         }
1252
1253         if (oa->o_valid & OBD_MD_FLFLAGS)
1254                 cksum_type = cksum_type_unpack(oa->o_flags);
1255         else
1256                 cksum_type = OBD_CKSUM_CRC32;
1257
1258         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1259                                       cksum_type);
1260
1261         if (cksum_type != client_cksum_type)
1262                 msg = "the server did not use the checksum type specified in "
1263                       "the original request - likely a protocol problem";
1264         else if (new_cksum == server_cksum)
1265                 msg = "changed on the client after we checksummed it - "
1266                       "likely false positive due to mmap IO (bug 11742)";
1267         else if (new_cksum == client_cksum)
1268                 msg = "changed in transit before arrival at OST";
1269         else
1270                 msg = "changed in transit AND doesn't match the original - "
1271                       "likely false positive due to mmap IO (bug 11742)";
1272
1273         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1274                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1275                            "["LPU64"-"LPU64"]\n",
1276                            msg, libcfs_nid2str(peer->nid),
1277                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1278                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1279                                                         (__u64)0,
1280                            oa->o_id,
1281                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1282                            pga[0]->off,
1283                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1284         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1285                "client csum now %x\n", client_cksum, client_cksum_type,
1286                server_cksum, cksum_type, new_cksum);
1287         return 1;
1288 }
1289
1290 /* Note rc enters this function as number of bytes transferred */
1291 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1292 {
1293         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1294         const lnet_process_id_t *peer =
1295                         &req->rq_import->imp_connection->c_peer;
1296         struct client_obd *cli = aa->aa_cli;
1297         struct ost_body *body;
1298         __u32 client_cksum = 0;
1299         ENTRY;
1300
1301         if (rc < 0 && rc != -EDQUOT)
1302                 RETURN(rc);
1303
1304         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1306                                   lustre_swab_ost_body);
1307         if (body == NULL) {
1308                 CDEBUG(D_INFO, "Can't unpack body\n");
1309                 RETURN(-EPROTO);
1310         }
1311
1312         /* set/clear over quota flag for a uid/gid */
1313         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1314             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1315                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1316                              body->oa.o_gid, body->oa.o_valid,
1317                              body->oa.o_flags);
1318
1319         if (rc < 0)
1320                 RETURN(rc);
1321
1322         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1323                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1324
1325         osc_update_grant(cli, body);
1326
1327         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1328                 if (rc > 0) {
1329                         CERROR("Unexpected +ve rc %d\n", rc);
1330                         RETURN(-EPROTO);
1331                 }
1332                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1333
1334                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1335                         RETURN(-EAGAIN);
1336
1337                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1338                     check_write_checksum(&body->oa, peer, client_cksum,
1339                                          body->oa.o_cksum, aa->aa_requested_nob,
1340                                          aa->aa_page_count, aa->aa_ppga,
1341                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1342                         RETURN(-EAGAIN);
1343
1344                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1345                                      aa->aa_page_count, aa->aa_ppga);
1346                 GOTO(out, rc);
1347         }
1348
1349         /* The rest of this function executes only for OST_READs */
1350
1351         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1352         if (rc < 0)
1353                 GOTO(out, rc);
1354
1355         if (rc > aa->aa_requested_nob) {
1356                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1357                        aa->aa_requested_nob);
1358                 RETURN(-EPROTO);
1359         }
1360
1361         if (rc != req->rq_bulk->bd_nob_transferred) {
1362                 CERROR ("Unexpected rc %d (%d transferred)\n",
1363                         rc, req->rq_bulk->bd_nob_transferred);
1364                 return (-EPROTO);
1365         }
1366
1367         if (rc < aa->aa_requested_nob)
1368                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1369
1370         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1371                 static int cksum_counter;
1372                 __u32      server_cksum = body->oa.o_cksum;
1373                 char      *via;
1374                 char      *router;
1375                 cksum_type_t cksum_type;
1376
1377                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1378                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1379                 else
1380                         cksum_type = OBD_CKSUM_CRC32;
1381                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1382                                                  aa->aa_ppga, OST_READ,
1383                                                  cksum_type);
1384
1385                 if (peer->nid == req->rq_bulk->bd_sender) {
1386                         via = router = "";
1387                 } else {
1388                         via = " via ";
1389                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1390                 }
1391
1392                 if (server_cksum == ~0 && rc > 0) {
1393                         CERROR("Protocol error: server %s set the 'checksum' "
1394                                "bit, but didn't send a checksum.  Not fatal, "
1395                                "but please notify on http://bugzilla.lustre.org/\n",
1396                                libcfs_nid2str(peer->nid));
1397                 } else if (server_cksum != client_cksum) {
1398                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1399                                            "%s%s%s inum "LPU64"/"LPU64" object "
1400                                            LPU64"/"LPU64" extent "
1401                                            "["LPU64"-"LPU64"]\n",
1402                                            req->rq_import->imp_obd->obd_name,
1403                                            libcfs_nid2str(peer->nid),
1404                                            via, router,
1405                                            body->oa.o_valid & OBD_MD_FLFID ?
1406                                                 body->oa.o_fid : (__u64)0,
1407                                            body->oa.o_valid & OBD_MD_FLFID ?
1408                                                 body->oa.o_generation :(__u64)0,
1409                                            body->oa.o_id,
1410                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1411                                                 body->oa.o_gr : (__u64)0,
1412                                            aa->aa_ppga[0]->off,
1413                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1414                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1415                                                                         1);
1416                         CERROR("client %x, server %x, cksum_type %x\n",
1417                                client_cksum, server_cksum, cksum_type);
1418                         cksum_counter = 0;
1419                         aa->aa_oa->o_cksum = client_cksum;
1420                         rc = -EAGAIN;
1421                 } else {
1422                         cksum_counter++;
1423                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1424                         rc = 0;
1425                 }
1426         } else if (unlikely(client_cksum)) {
1427                 static int cksum_missed;
1428
1429                 cksum_missed++;
1430                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1431                         CERROR("Checksum %u requested from %s but not sent\n",
1432                                cksum_missed, libcfs_nid2str(peer->nid));
1433         } else {
1434                 rc = 0;
1435         }
1436 out:
1437         if (rc >= 0)
1438                 *aa->aa_oa = body->oa;
1439
1440         RETURN(rc);
1441 }
1442
1443 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1444                             struct lov_stripe_md *lsm,
1445                             obd_count page_count, struct brw_page **pga,
1446                             struct obd_capa *ocapa)
1447 {
1448         struct ptlrpc_request *req;
1449         int                    rc;
1450         cfs_waitq_t            waitq;
1451         int                    resends = 0;
1452         struct l_wait_info     lwi;
1453
1454         ENTRY;
1455
1456         cfs_waitq_init(&waitq);
1457
1458 restart_bulk:
1459         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1460                                   page_count, pga, &req, ocapa, 0);
1461         if (rc != 0)
1462                 return (rc);
1463
1464         rc = ptlrpc_queue_wait(req);
1465
1466         if (rc == -ETIMEDOUT && req->rq_resend) {
1467                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1468                 ptlrpc_req_finished(req);
1469                 goto restart_bulk;
1470         }
1471
1472         rc = osc_brw_fini_request(req, rc);
1473
1474         ptlrpc_req_finished(req);
1475         if (osc_recoverable_error(rc)) {
1476                 resends++;
1477                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1478                         CERROR("too many resend retries, returning error\n");
1479                         RETURN(-EIO);
1480                 }
1481
1482                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1483                 l_wait_event(waitq, 0, &lwi);
1484
1485                 goto restart_bulk;
1486         }
1487
1488         RETURN (rc);
1489 }
1490
1491 int osc_brw_redo_request(struct ptlrpc_request *request,
1492                          struct osc_brw_async_args *aa)
1493 {
1494         struct ptlrpc_request *new_req;
1495         struct ptlrpc_request_set *set = request->rq_set;
1496         struct osc_brw_async_args *new_aa;
1497         struct osc_async_page *oap;
1498         int rc = 0;
1499         ENTRY;
1500
1501         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1502                 CERROR("too many resend retries, returning error\n");
1503                 RETURN(-EIO);
1504         }
1505
1506         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1507
1508         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1509                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1510                                   aa->aa_cli, aa->aa_oa,
1511                                   NULL /* lsm unused by osc currently */,
1512                                   aa->aa_page_count, aa->aa_ppga,
1513                                   &new_req, aa->aa_ocapa, 0);
1514         if (rc)
1515                 RETURN(rc);
1516
1517         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1518
1519         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1520                 if (oap->oap_request != NULL) {
1521                         LASSERTF(request == oap->oap_request,
1522                                  "request %p != oap_request %p\n",
1523                                  request, oap->oap_request);
1524                         if (oap->oap_interrupted) {
1525                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1526                                 ptlrpc_req_finished(new_req);
1527                                 RETURN(-EINTR);
1528                         }
1529                 }
1530         }
1531         /* New request takes over pga and oaps from old request.
1532          * Note that copying a list_head doesn't work, need to move it... */
1533         aa->aa_resends++;
1534         new_req->rq_interpret_reply = request->rq_interpret_reply;
1535         new_req->rq_async_args = request->rq_async_args;
1536         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1537
1538         new_aa = ptlrpc_req_async_args(new_req);
1539
1540         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1541         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1542         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1543
1544         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1545                 if (oap->oap_request) {
1546                         ptlrpc_req_finished(oap->oap_request);
1547                         oap->oap_request = ptlrpc_request_addref(new_req);
1548                 }
1549         }
1550
1551         new_aa->aa_ocapa = aa->aa_ocapa;
1552         aa->aa_ocapa = NULL;
1553
1554         /* use ptlrpc_set_add_req is safe because interpret functions work
1555          * in check_set context. only one way exist with access to request
1556          * from different thread got -EINTR - this way protected with
1557          * cl_loi_list_lock */
1558         ptlrpc_set_add_req(set, new_req);
1559
1560         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1561
1562         DEBUG_REQ(D_INFO, new_req, "new request");
1563         RETURN(0);
1564 }
1565
1566 /*
1567  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1568  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1569  * fine for our small page arrays and doesn't require allocation.  its an
1570  * insertion sort that swaps elements that are strides apart, shrinking the
1571  * stride down until its '1' and the array is sorted.
1572  */
1573 static void sort_brw_pages(struct brw_page **array, int num)
1574 {
1575         int stride, i, j;
1576         struct brw_page *tmp;
1577
1578         if (num == 1)
1579                 return;
1580         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1581                 ;
1582
1583         do {
1584                 stride /= 3;
1585                 for (i = stride ; i < num ; i++) {
1586                         tmp = array[i];
1587                         j = i;
1588                         while (j >= stride && array[j - stride]->off > tmp->off) {
1589                                 array[j] = array[j - stride];
1590                                 j -= stride;
1591                         }
1592                         array[j] = tmp;
1593                 }
1594         } while (stride > 1);
1595 }
1596
1597 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1598 {
1599         int count = 1;
1600         int offset;
1601         int i = 0;
1602
1603         LASSERT (pages > 0);
1604         offset = pg[i]->off & ~CFS_PAGE_MASK;
1605
1606         for (;;) {
1607                 pages--;
1608                 if (pages == 0)         /* that's all */
1609                         return count;
1610
1611                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1612                         return count;   /* doesn't end on page boundary */
1613
1614                 i++;
1615                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1616                 if (offset != 0)        /* doesn't start on page boundary */
1617                         return count;
1618
1619                 count++;
1620         }
1621 }
1622
1623 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1624 {
1625         struct brw_page **ppga;
1626         int i;
1627
1628         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1629         if (ppga == NULL)
1630                 return NULL;
1631
1632         for (i = 0; i < count; i++)
1633                 ppga[i] = pga + i;
1634         return ppga;
1635 }
1636
1637 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1638 {
1639         LASSERT(ppga != NULL);
1640         OBD_FREE(ppga, sizeof(*ppga) * count);
1641 }
1642
1643 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1644                    obd_count page_count, struct brw_page *pga,
1645                    struct obd_trans_info *oti)
1646 {
1647         struct obdo *saved_oa = NULL;
1648         struct brw_page **ppga, **orig;
1649         struct obd_import *imp = class_exp2cliimp(exp);
1650         struct client_obd *cli = &imp->imp_obd->u.cli;
1651         int rc, page_count_orig;
1652         ENTRY;
1653
1654         if (cmd & OBD_BRW_CHECK) {
1655                 /* The caller just wants to know if there's a chance that this
1656                  * I/O can succeed */
1657
1658                 if (imp == NULL || imp->imp_invalid)
1659                         RETURN(-EIO);
1660                 RETURN(0);
1661         }
1662
1663         /* test_brw with a failed create can trip this, maybe others. */
1664         LASSERT(cli->cl_max_pages_per_rpc);
1665
1666         rc = 0;
1667
1668         orig = ppga = osc_build_ppga(pga, page_count);
1669         if (ppga == NULL)
1670                 RETURN(-ENOMEM);
1671         page_count_orig = page_count;
1672
1673         sort_brw_pages(ppga, page_count);
1674         while (page_count) {
1675                 obd_count pages_per_brw;
1676
1677                 if (page_count > cli->cl_max_pages_per_rpc)
1678                         pages_per_brw = cli->cl_max_pages_per_rpc;
1679                 else
1680                         pages_per_brw = page_count;
1681
1682                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1683
1684                 if (saved_oa != NULL) {
1685                         /* restore previously saved oa */
1686                         *oinfo->oi_oa = *saved_oa;
1687                 } else if (page_count > pages_per_brw) {
1688                         /* save a copy of oa (brw will clobber it) */
1689                         OBDO_ALLOC(saved_oa);
1690                         if (saved_oa == NULL)
1691                                 GOTO(out, rc = -ENOMEM);
1692                         *saved_oa = *oinfo->oi_oa;
1693                 }
1694
1695                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1696                                       pages_per_brw, ppga, oinfo->oi_capa);
1697
1698                 if (rc != 0)
1699                         break;
1700
1701                 page_count -= pages_per_brw;
1702                 ppga += pages_per_brw;
1703         }
1704
1705 out:
1706         osc_release_ppga(orig, page_count_orig);
1707
1708         if (saved_oa != NULL)
1709                 OBDO_FREE(saved_oa);
1710
1711         RETURN(rc);
1712 }
1713
1714 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1715  * the dirty accounting.  Writeback completes or truncate happens before
1716  * writing starts.  Must be called with the loi lock held. */
1717 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1718                            int sent)
1719 {
1720         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1721 }
1722
1723
1724 /* This maintains the lists of pending pages to read/write for a given object
1725  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1726  * to quickly find objects that are ready to send an RPC. */
1727 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1728                          int cmd)
1729 {
1730         int optimal;
1731         ENTRY;
1732
1733         if (lop->lop_num_pending == 0)
1734                 RETURN(0);
1735
1736         /* if we have an invalid import we want to drain the queued pages
1737          * by forcing them through rpcs that immediately fail and complete
1738          * the pages.  recovery relies on this to empty the queued pages
1739          * before canceling the locks and evicting down the llite pages */
1740         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1741                 RETURN(1);
1742
1743         /* stream rpcs in queue order as long as as there is an urgent page
1744          * queued.  this is our cheap solution for good batching in the case
1745          * where writepage marks some random page in the middle of the file
1746          * as urgent because of, say, memory pressure */
1747         if (!list_empty(&lop->lop_urgent)) {
1748                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1749                 RETURN(1);
1750         }
1751         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1752         optimal = cli->cl_max_pages_per_rpc;
1753         if (cmd & OBD_BRW_WRITE) {
1754                 /* trigger a write rpc stream as long as there are dirtiers
1755                  * waiting for space.  as they're waiting, they're not going to
1756                  * create more pages to coallesce with what's waiting.. */
1757                 if (!list_empty(&cli->cl_cache_waiters)) {
1758                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1759                         RETURN(1);
1760                 }
1761                 /* +16 to avoid triggering rpcs that would want to include pages
1762                  * that are being queued but which can't be made ready until
1763                  * the queuer finishes with the page. this is a wart for
1764                  * llite::commit_write() */
1765                 optimal += 16;
1766         }
1767         if (lop->lop_num_pending >= optimal)
1768                 RETURN(1);
1769
1770         RETURN(0);
1771 }
1772
/* Bring the list membership of @item in line with @should_be_on: append
 * it to @list if it is wanted but currently unlinked, unlink it if it is
 * linked but no longer wanted, and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1781
1782 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1783  * can find pages to build into rpcs quickly */
1784 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1785 {
1786         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1787                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1788                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1789
1790         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1791                 loi->loi_write_lop.lop_num_pending);
1792
1793         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1794                 loi->loi_read_lop.lop_num_pending);
1795 }
1796
1797 static void lop_update_pending(struct client_obd *cli,
1798                                struct loi_oap_pages *lop, int cmd, int delta)
1799 {
1800         lop->lop_num_pending += delta;
1801         if (cmd & OBD_BRW_WRITE)
1802                 cli->cl_pending_w_pages += delta;
1803         else
1804                 cli->cl_pending_r_pages += delta;
1805 }
1806
1807 /**
1808  * this is called when a sync waiter receives an interruption.  Its job is to
1809  * get the caller woken as soon as possible.  If its page hasn't been put in an
1810  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1811  * desiring interruption which will forcefully complete the rpc once the rpc
1812  * has timed out.
1813  */
1814 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1815 {
1816         struct loi_oap_pages *lop;
1817         struct lov_oinfo *loi;
1818         int rc = -EBUSY;
1819         ENTRY;
1820
1821         LASSERT(!oap->oap_interrupted);
1822         oap->oap_interrupted = 1;
1823
1824         /* ok, it's been put in an rpc. only one oap gets a request reference */
1825         if (oap->oap_request != NULL) {
1826                 ptlrpc_mark_interrupted(oap->oap_request);
1827                 ptlrpcd_wake(oap->oap_request);
1828                 ptlrpc_req_finished(oap->oap_request);
1829                 oap->oap_request = NULL;
1830         }
1831
1832         /*
1833          * page completion may be called only if ->cpo_prep() method was
1834          * executed by osc_io_submit(), that also adds page the to pending list
1835          */
1836         if (!list_empty(&oap->oap_pending_item)) {
1837                 list_del_init(&oap->oap_pending_item);
1838                 list_del_init(&oap->oap_urgent_item);
1839
1840                 loi = oap->oap_loi;
1841                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1842                         &loi->loi_write_lop : &loi->loi_read_lop;
1843                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1844                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1845                 rc = oap->oap_caller_ops->ap_completion(env,
1846                                           oap->oap_caller_data,
1847                                           oap->oap_cmd, NULL, -EINTR);
1848         }
1849
1850         RETURN(rc);
1851 }
1852
1853 /* this is trying to propogate async writeback errors back up to the
1854  * application.  As an async write fails we record the error code for later if
1855  * the app does an fsync.  As long as errors persist we force future rpcs to be
1856  * sync so that the app can get a sync error and break the cycle of queueing
1857  * pages for which writeback will fail. */
1858 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1859                            int rc)
1860 {
1861         if (rc) {
1862                 if (!ar->ar_rc)
1863                         ar->ar_rc = rc;
1864
1865                 ar->ar_force_sync = 1;
1866                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1867                 return;
1868
1869         }
1870
1871         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1872                 ar->ar_force_sync = 0;
1873 }
1874
1875 void osc_oap_to_pending(struct osc_async_page *oap)
1876 {
1877         struct loi_oap_pages *lop;
1878
1879         if (oap->oap_cmd & OBD_BRW_WRITE)
1880                 lop = &oap->oap_loi->loi_write_lop;
1881         else
1882                 lop = &oap->oap_loi->loi_read_lop;
1883
1884         if (oap->oap_async_flags & ASYNC_URGENT)
1885                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1886         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1887         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1888 }
1889
1890 /* this must be called holding the loi list lock to give coverage to exit_cache,
1891  * async_flag maintenance, and oap_request */
1892 static void osc_ap_completion(const struct lu_env *env,
1893                               struct client_obd *cli, struct obdo *oa,
1894                               struct osc_async_page *oap, int sent, int rc)
1895 {
1896         __u64 xid = 0;
1897
1898         ENTRY;
1899         if (oap->oap_request != NULL) {
1900                 xid = ptlrpc_req_xid(oap->oap_request);
1901                 ptlrpc_req_finished(oap->oap_request);
1902                 oap->oap_request = NULL;
1903         }
1904
1905         oap->oap_async_flags = 0;
1906         oap->oap_interrupted = 0;
1907
1908         if (oap->oap_cmd & OBD_BRW_WRITE) {
1909                 osc_process_ar(&cli->cl_ar, xid, rc);
1910                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1911         }
1912
1913         if (rc == 0 && oa != NULL) {
1914                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1915                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1916                 if (oa->o_valid & OBD_MD_FLMTIME)
1917                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1918                 if (oa->o_valid & OBD_MD_FLATIME)
1919                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1920                 if (oa->o_valid & OBD_MD_FLCTIME)
1921                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1922         }
1923
1924         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1925                                                 oap->oap_cmd, oa, rc);
1926
1927         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1928          * I/O on the page could start, but OSC calls it under lock
1929          * and thus we can add oap back to pending safely */
1930         if (rc)
1931                 /* upper layer wants to leave the page on pending queue */
1932                 osc_oap_to_pending(oap);
1933         else
1934                 osc_exit_cache(cli, oap, sent);
1935         EXIT;
1936 }
1937
1938 static int brw_interpret(const struct lu_env *env,
1939                          struct ptlrpc_request *req, void *data, int rc)
1940 {
1941         struct osc_brw_async_args *aa = data;
1942         struct client_obd *cli;
1943         int async;
1944         ENTRY;
1945
1946         rc = osc_brw_fini_request(req, rc);
1947         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1948         if (osc_recoverable_error(rc)) {
1949                 rc = osc_brw_redo_request(req, aa);
1950                 if (rc == 0)
1951                         RETURN(0);
1952         }
1953
1954         if (aa->aa_ocapa) {
1955                 capa_put(aa->aa_ocapa);
1956                 aa->aa_ocapa = NULL;
1957         }
1958
1959         cli = aa->aa_cli;
1960
1961         client_obd_list_lock(&cli->cl_loi_list_lock);
1962
1963         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1964          * is called so we know whether to go to sync BRWs or wait for more
1965          * RPCs to complete */
1966         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1967                 cli->cl_w_in_flight--;
1968         else
1969                 cli->cl_r_in_flight--;
1970
1971         async = list_empty(&aa->aa_oaps);
1972         if (!async) { /* from osc_send_oap_rpc() */
1973                 struct osc_async_page *oap, *tmp;
1974                 /* the caller may re-use the oap after the completion call so
1975                  * we need to clean it up a little */
1976                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1977                         list_del_init(&oap->oap_rpc_item);
1978                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1979                 }
1980                 OBDO_FREE(aa->aa_oa);
1981         } else { /* from async_internal() */
1982                 int i;
1983                 for (i = 0; i < aa->aa_page_count; i++)
1984                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1985         }
1986         osc_wake_cache_waiters(cli);
1987         osc_check_rpcs(env, cli);
1988         client_obd_list_unlock(&cli->cl_loi_list_lock);
1989         if (!async)
1990                 cl_req_completion(env, aa->aa_clerq, rc);
1991         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1992         RETURN(rc);
1993 }
1994
1995 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1996                                             struct client_obd *cli,
1997                                             struct list_head *rpc_list,
1998                                             int page_count, int cmd)
1999 {
2000         struct ptlrpc_request *req;
2001         struct brw_page **pga = NULL;
2002         struct osc_brw_async_args *aa;
2003         struct obdo *oa = NULL;
2004         const struct obd_async_page_ops *ops = NULL;
2005         void *caller_data = NULL;
2006         struct osc_async_page *oap;
2007         struct osc_async_page *tmp;
2008         struct ost_body *body;
2009         struct cl_req *clerq = NULL;
2010         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2011         struct ldlm_lock *lock = NULL;
2012         struct cl_req_attr crattr;
2013         int i, rc;
2014
2015         ENTRY;
2016         LASSERT(!list_empty(rpc_list));
2017
2018         memset(&crattr, 0, sizeof crattr);
2019         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2020         if (pga == NULL)
2021                 GOTO(out, req = ERR_PTR(-ENOMEM));
2022
2023         OBDO_ALLOC(oa);
2024         if (oa == NULL)
2025                 GOTO(out, req = ERR_PTR(-ENOMEM));
2026
2027         i = 0;
2028         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2029                 struct cl_page *page = osc_oap2cl_page(oap);
2030                 if (ops == NULL) {
2031                         ops = oap->oap_caller_ops;
2032                         caller_data = oap->oap_caller_data;
2033
2034                         clerq = cl_req_alloc(env, page, crt,
2035                                              1 /* only 1-object rpcs for
2036                                                 * now */);
2037                         if (IS_ERR(clerq))
2038                                 GOTO(out, req = (void *)clerq);
2039                         lock = oap->oap_ldlm_lock;
2040                 }
2041                 pga[i] = &oap->oap_brw_page;
2042                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2043                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2044                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2045                 i++;
2046                 cl_req_page_add(env, clerq, page);
2047         }
2048
2049         /* always get the data for the obdo for the rpc */
2050         LASSERT(ops != NULL);
2051         crattr.cra_oa = oa;
2052         crattr.cra_capa = NULL;
2053         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2054         if (lock) {
2055                 oa->o_handle = lock->l_remote_handle;
2056                 oa->o_valid |= OBD_MD_FLHANDLE;
2057         }
2058
2059         rc = cl_req_prep(env, clerq);
2060         if (rc != 0) {
2061                 CERROR("cl_req_prep failed: %d\n", rc);
2062                 GOTO(out, req = ERR_PTR(rc));
2063         }
2064
2065         sort_brw_pages(pga, page_count);
2066         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2067                                   pga, &req, crattr.cra_capa, 1);
2068         if (rc != 0) {
2069                 CERROR("prep_req failed: %d\n", rc);
2070                 GOTO(out, req = ERR_PTR(rc));
2071         }
2072
2073         /* Need to update the timestamps after the request is built in case
2074          * we race with setattr (locally or in queue at OST).  If OST gets
2075          * later setattr before earlier BRW (as determined by the request xid),
2076          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2077          * way to do this in a single call.  bug 10150 */
2078         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2079         cl_req_attr_set(env, clerq, &crattr,
2080                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2081
2082         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2083         aa = ptlrpc_req_async_args(req);
2084         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2085         list_splice(rpc_list, &aa->aa_oaps);
2086         CFS_INIT_LIST_HEAD(rpc_list);
2087         aa->aa_clerq = clerq;
2088 out:
2089         capa_put(crattr.cra_capa);
2090         if (IS_ERR(req)) {
2091                 if (oa)
2092                         OBDO_FREE(oa);
2093                 if (pga)
2094                         OBD_FREE(pga, sizeof(*pga) * page_count);
2095                 /* this should happen rarely and is pretty bad, it makes the
2096                  * pending list not follow the dirty order */
2097                 client_obd_list_lock(&cli->cl_loi_list_lock);
2098                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2099                         list_del_init(&oap->oap_rpc_item);
2100
2101                         /* queued sync pages can be torn down while the pages
2102                          * were between the pending list and the rpc */
2103                         if (oap->oap_interrupted) {
2104                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2105                                 osc_ap_completion(env, cli, NULL, oap, 0,
2106                                                   oap->oap_count);
2107                                 continue;
2108                         }
2109                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2110                 }
2111                 if (clerq && !IS_ERR(clerq))
2112                         cl_req_completion(env, clerq, PTR_ERR(req));
2113         }
2114         RETURN(req);
2115 }
2116
2117 /**
2118  * prepare pages for ASYNC io and put pages in send queue.
2119  *
2120  * \param cli -
2121  * \param loi -
2122  * \param cmd - OBD_BRW_* macros
2123  * \param lop - pending pages
2124  *
2125  * \return zero if pages were successfully added to the send queue.
2126  * \return non-zero if an error occurred.
2127  */
2128 static int
2129 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2130                  struct lov_oinfo *loi,
2131                  int cmd, struct loi_oap_pages *lop)
2132 {
2133         struct ptlrpc_request *req;
2134         obd_count page_count = 0;
2135         struct osc_async_page *oap = NULL, *tmp;
2136         struct osc_brw_async_args *aa;
2137         const struct obd_async_page_ops *ops;
2138         CFS_LIST_HEAD(rpc_list);
2139         unsigned int ending_offset;
2140         unsigned  starting_offset = 0;
2141         int srvlock = 0;
2142         struct cl_object *clob = NULL;
2143         ENTRY;
2144
2145         /* first we find the pages we're allowed to work with */
2146         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2147                                  oap_pending_item) {
2148                 ops = oap->oap_caller_ops;
2149
2150                 LASSERT(oap->oap_magic == OAP_MAGIC);
2151
2152                 if (clob == NULL) {
2153                         /* pin object in memory, so that completion call-backs
2154                          * can be safely called under client_obd_list lock. */
2155                         clob = osc_oap2cl_page(oap)->cp_obj;
2156                         cl_object_get(clob);
2157                 }
2158
2159                 if (page_count != 0 &&
2160                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2161                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2162                                " oap %p, page %p, srvlock %u\n",
2163                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2164                         break;
2165                 }
2166                 /* in llite being 'ready' equates to the page being locked
2167                  * until completion unlocks it.  commit_write submits a page
2168                  * as not ready because its unlock will happen unconditionally
2169                  * as the call returns.  if we race with commit_write giving
2170                  * us that page we dont' want to create a hole in the page
2171                  * stream, so we stop and leave the rpc to be fired by
2172                  * another dirtier or kupdated interval (the not ready page
2173                  * will still be on the dirty list).  we could call in
2174                  * at the end of ll_file_write to process the queue again. */
2175                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2176                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2177                                                     cmd);
2178                         if (rc < 0)
2179                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2180                                                 "instead of ready\n", oap,
2181                                                 oap->oap_page, rc);
2182                         switch (rc) {
2183                         case -EAGAIN:
2184                                 /* llite is telling us that the page is still
2185                                  * in commit_write and that we should try
2186                                  * and put it in an rpc again later.  we
2187                                  * break out of the loop so we don't create
2188                                  * a hole in the sequence of pages in the rpc
2189                                  * stream.*/
2190                                 oap = NULL;
2191                                 break;
2192                         case -EINTR:
2193                                 /* the io isn't needed.. tell the checks
2194                                  * below to complete the rpc with EINTR */
2195                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2196                                 oap->oap_count = -EINTR;
2197                                 break;
2198                         case 0:
2199                                 oap->oap_async_flags |= ASYNC_READY;
2200                                 break;
2201                         default:
2202                                 LASSERTF(0, "oap %p page %p returned %d "
2203                                             "from make_ready\n", oap,
2204                                             oap->oap_page, rc);
2205                                 break;
2206                         }
2207                 }
2208                 if (oap == NULL)
2209                         break;
2210                 /*
2211                  * Page submitted for IO has to be locked. Either by
2212                  * ->ap_make_ready() or by higher layers.
2213                  */
2214 #if defined(__KERNEL__) && defined(__linux__)
2215                 {
2216                         struct cl_page *page;
2217
2218                         page = osc_oap2cl_page(oap);
2219
2220                         if (page->cp_type == CPT_CACHEABLE &&
2221                             !(PageLocked(oap->oap_page) &&
2222                               (CheckWriteback(oap->oap_page, cmd)))) {
2223                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2224                                        oap->oap_page,
2225                                        (long)oap->oap_page->flags,
2226                                        oap->oap_async_flags);
2227                                 LBUG();
2228                         }
2229                 }
2230 #endif
2231                 /* If there is a gap at the start of this page, it can't merge
2232                  * with any previous page, so we'll hand the network a
2233                  * "fragmented" page array that it can't transfer in 1 RDMA */
2234                 if (page_count != 0 && oap->oap_page_off != 0)
2235                         break;
2236
2237                 /* take the page out of our book-keeping */
2238                 list_del_init(&oap->oap_pending_item);
2239                 lop_update_pending(cli, lop, cmd, -1);
2240                 list_del_init(&oap->oap_urgent_item);
2241
2242                 if (page_count == 0)
2243                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2244                                           (PTLRPC_MAX_BRW_SIZE - 1);
2245
2246                 /* ask the caller for the size of the io as the rpc leaves. */
2247                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2248                         oap->oap_count =
2249                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2250                                                       cmd);
2251                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2252                 }
2253                 if (oap->oap_count <= 0) {
2254                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2255                                oap->oap_count);
2256                         osc_ap_completion(env, cli, NULL,
2257                                           oap, 0, oap->oap_count);
2258                         continue;
2259                 }
2260
2261                 /* now put the page back in our accounting */
2262                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2263                 if (page_count == 0)
2264                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2265                 if (++page_count >= cli->cl_max_pages_per_rpc)
2266                         break;
2267
2268                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2269                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2270                  * have the same alignment as the initial writes that allocated
2271                  * extents on the server. */
2272                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2273                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2274                 if (ending_offset == 0)
2275                         break;
2276
2277                 /* If there is a gap at the end of this page, it can't merge
2278                  * with any subsequent pages, so we'll hand the network a
2279                  * "fragmented" page array that it can't transfer in 1 RDMA */
2280                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2281                         break;
2282         }
2283
2284         osc_wake_cache_waiters(cli);
2285
2286         loi_list_maint(cli, loi);
2287
2288         client_obd_list_unlock(&cli->cl_loi_list_lock);
2289
2290         if (clob != NULL)
2291                 cl_object_put(env, clob);
2292
2293         if (page_count == 0) {
2294                 client_obd_list_lock(&cli->cl_loi_list_lock);
2295                 RETURN(0);
2296         }
2297
2298         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2299         if (IS_ERR(req)) {
2300                 LASSERT(list_empty(&rpc_list));
2301                 loi_list_maint(cli, loi);
2302                 RETURN(PTR_ERR(req));
2303         }
2304
2305         aa = ptlrpc_req_async_args(req);
2306
2307         if (cmd == OBD_BRW_READ) {
2308                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2309                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2310                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2311                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2312         } else {
2313                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2314                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2315                                  cli->cl_w_in_flight);
2316                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2317                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2318         }
2319         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2320
2321         client_obd_list_lock(&cli->cl_loi_list_lock);
2322
2323         if (cmd == OBD_BRW_READ)
2324                 cli->cl_r_in_flight++;
2325         else
2326                 cli->cl_w_in_flight++;
2327
2328         /* queued sync pages can be torn down while the pages
2329          * were between the pending list and the rpc */
2330         tmp = NULL;
2331         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2332                 /* only one oap gets a request reference */
2333                 if (tmp == NULL)
2334                         tmp = oap;
2335                 if (oap->oap_interrupted && !req->rq_intr) {
2336                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2337                                oap, req);
2338                         ptlrpc_mark_interrupted(req);
2339                 }
2340         }
2341         if (tmp != NULL)
2342                 tmp->oap_request = ptlrpc_request_addref(req);
2343
2344         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2345                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2346
2347         req->rq_interpret_reply = brw_interpret;
2348         ptlrpcd_add_req(req, PSCOPE_BRW);
2349         RETURN(1);
2350 }
2351
/* Emit a D_INODE debug line summarising the queue state of an object:
 * whether its loi_cli_item is linked on a list, then the pending count and
 * urgent-list non-emptiness of its write and read queues, followed by the
 * caller's own message.
 *
 * STR must be a string literal (it is pasted onto the prefix) and at least
 * one argument has to follow it.  The definition ends on the "args)" line;
 * no trailing continuation, so the following source line is never spliced
 * into the macro. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2360
2361 /* This is called by osc_check_rpcs() to find which objects have pages that
2362  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2363 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2364 {
2365         ENTRY;
2366         /* first return all objects which we already know to have
2367          * pages ready to be stuffed into rpcs */
2368         if (!list_empty(&cli->cl_loi_ready_list))
2369                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2370                                   struct lov_oinfo, loi_cli_item));
2371
2372         /* then if we have cache waiters, return all objects with queued
2373          * writes.  This is especially important when many small files
2374          * have filled up the cache and not been fired into rpcs because
2375          * they don't pass the nr_pending/object threshhold */
2376         if (!list_empty(&cli->cl_cache_waiters) &&
2377             !list_empty(&cli->cl_loi_write_list))
2378                 RETURN(list_entry(cli->cl_loi_write_list.next,
2379                                   struct lov_oinfo, loi_write_item));
2380
2381         /* then return all queued objects when we have an invalid import
2382          * so that they get flushed */
2383         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2384                 if (!list_empty(&cli->cl_loi_write_list))
2385                         RETURN(list_entry(cli->cl_loi_write_list.next,
2386                                           struct lov_oinfo, loi_write_item));
2387                 if (!list_empty(&cli->cl_loi_read_list))
2388                         RETURN(list_entry(cli->cl_loi_read_list.next,
2389                                           struct lov_oinfo, loi_read_item));
2390         }
2391         RETURN(NULL);
2392 }
2393
2394 /* called with the loi list lock held */
2395 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2396 {
2397         struct lov_oinfo *loi;
2398         int rc = 0, race_counter = 0;
2399         ENTRY;
2400
2401         while ((loi = osc_next_loi(cli)) != NULL) {
2402                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2403
2404                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2405                         break;
2406
2407                 /* attempt some read/write balancing by alternating between
2408                  * reads and writes in an object.  The makes_rpc checks here
2409                  * would be redundant if we were getting read/write work items
2410                  * instead of objects.  we don't want send_oap_rpc to drain a
2411                  * partial read pending queue when we're given this object to
2412                  * do io on writes while there are cache waiters */
2413                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2414                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2415                                               &loi->loi_write_lop);
2416                         if (rc < 0)
2417                                 break;
2418                         if (rc > 0)
2419                                 race_counter = 0;
2420                         else
2421                                 race_counter++;
2422                 }
2423                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2424                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2425                                               &loi->loi_read_lop);
2426                         if (rc < 0)
2427                                 break;
2428                         if (rc > 0)
2429                                 race_counter = 0;
2430                         else
2431                                 race_counter++;
2432                 }
2433
2434                 /* attempt some inter-object balancing by issueing rpcs
2435                  * for each object in turn */
2436                 if (!list_empty(&loi->loi_cli_item))
2437                         list_del_init(&loi->loi_cli_item);
2438                 if (!list_empty(&loi->loi_write_item))
2439                         list_del_init(&loi->loi_write_item);
2440                 if (!list_empty(&loi->loi_read_item))
2441                         list_del_init(&loi->loi_read_item);
2442
2443                 loi_list_maint(cli, loi);
2444
2445                 /* send_oap_rpc fails with 0 when make_ready tells it to
2446                  * back off.  llite's make_ready does this when it tries
2447                  * to lock a page queued for write that is already locked.
2448                  * we want to try sending rpcs from many objects, but we
2449                  * don't want to spin failing with 0.  */
2450                 if (race_counter == 10)
2451                         break;
2452         }
2453         EXIT;
2454 }
2455
2456 /* we're trying to queue a page in the osc so we're subject to the
2457  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2458  * If the osc's queued pages are already at that limit, then we want to sleep
2459  * until there is space in the osc's queue for us.  We also may be waiting for
2460  * write credits from the OST if there are RPCs in flight that may return some
2461  * before we fall back to sync writes.
2462  *
2463  * We need this know our allocation was granted in the presence of signals */
2464 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2465 {
2466         int rc;
2467         ENTRY;
2468         client_obd_list_lock(&cli->cl_loi_list_lock);
2469         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2470         client_obd_list_unlock(&cli->cl_loi_list_lock);
2471         RETURN(rc);
2472 };
2473
2474 /**
2475  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2476  * is available.
2477  */
2478 int osc_enter_cache_try(const struct lu_env *env,
2479                         struct client_obd *cli, struct lov_oinfo *loi,
2480                         struct osc_async_page *oap, int transient)
2481 {
2482         int has_grant;
2483
2484         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2485         if (has_grant) {
2486                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2487                 if (transient) {
2488                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2489                         atomic_inc(&obd_dirty_transit_pages);
2490                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2491                 }
2492         }
2493         return has_grant;
2494 }
2495
2496 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2497  * grant or cache space. */
2498 static int osc_enter_cache(const struct lu_env *env,
2499                            struct client_obd *cli, struct lov_oinfo *loi,
2500                            struct osc_async_page *oap)
2501 {
2502         struct osc_cache_waiter ocw;
2503         struct l_wait_info lwi = { 0 };
2504
2505         ENTRY;
2506
2507         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2508                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2509                cli->cl_dirty_max, obd_max_dirty_pages,
2510                cli->cl_lost_grant, cli->cl_avail_grant);
2511
2512         /* force the caller to try sync io.  this can jump the list
2513          * of queued writes and create a discontiguous rpc stream */
2514         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2515             loi->loi_ar.ar_force_sync)
2516                 RETURN(-EDQUOT);
2517
2518         /* Hopefully normal case - cache space and write credits available */
2519         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2520             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2521             osc_enter_cache_try(env, cli, loi, oap, 0))
2522                 RETURN(0);
2523
2524         /* Make sure that there are write rpcs in flight to wait for.  This
2525          * is a little silly as this object may not have any pending but
2526          * other objects sure might. */
2527         if (cli->cl_w_in_flight) {
2528                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2529                 cfs_waitq_init(&ocw.ocw_waitq);
2530                 ocw.ocw_oap = oap;
2531                 ocw.ocw_rc = 0;
2532
2533                 loi_list_maint(cli, loi);
2534                 osc_check_rpcs(env, cli);
2535                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2536
2537                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2538                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2539
2540                 client_obd_list_lock(&cli->cl_loi_list_lock);
2541                 if (!list_empty(&ocw.ocw_entry)) {
2542                         list_del(&ocw.ocw_entry);
2543                         RETURN(-EINTR);
2544                 }
2545                 RETURN(ocw.ocw_rc);
2546         }
2547
2548         RETURN(-EDQUOT);
2549 }
2550
2551
2552 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2553                         struct lov_oinfo *loi, cfs_page_t *page,
2554                         obd_off offset, const struct obd_async_page_ops *ops,
2555                         void *data, void **res, int nocache,
2556                         struct lustre_handle *lockh)
2557 {
2558         struct osc_async_page *oap;
2559
2560         ENTRY;
2561
2562         if (!page)
2563                 return size_round(sizeof(*oap));
2564
2565         oap = *res;
2566         oap->oap_magic = OAP_MAGIC;
2567         oap->oap_cli = &exp->exp_obd->u.cli;
2568         oap->oap_loi = loi;
2569
2570         oap->oap_caller_ops = ops;
2571         oap->oap_caller_data = data;
2572
2573         oap->oap_page = page;
2574         oap->oap_obj_off = offset;
2575         if (!client_is_remote(exp) &&
2576             cfs_capable(CFS_CAP_SYS_RESOURCE))
2577                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2578
2579         LASSERT(!(offset & ~CFS_PAGE_MASK));
2580
2581         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2582         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2583         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2584         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2585
2586         spin_lock_init(&oap->oap_lock);
2587         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2588         RETURN(0);
2589 }
2590
2591 struct osc_async_page *oap_from_cookie(void *cookie)
2592 {
2593         struct osc_async_page *oap = cookie;
2594         if (oap->oap_magic != OAP_MAGIC)
2595                 return ERR_PTR(-EINVAL);
2596         return oap;
2597 };
2598
2599 int osc_queue_async_io(const struct lu_env *env,
2600                        struct obd_export *exp, struct lov_stripe_md *lsm,
2601                        struct lov_oinfo *loi, void *cookie,
2602                        int cmd, obd_off off, int count,
2603                        obd_flag brw_flags, enum async_flags async_flags)
2604 {
2605         struct client_obd *cli = &exp->exp_obd->u.cli;
2606         struct osc_async_page *oap;
2607         int rc = 0;
2608         ENTRY;
2609
2610         oap = oap_from_cookie(cookie);
2611         if (IS_ERR(oap))
2612                 RETURN(PTR_ERR(oap));
2613
2614         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2615                 RETURN(-EIO);
2616
2617         if (!list_empty(&oap->oap_pending_item) ||
2618             !list_empty(&oap->oap_urgent_item) ||
2619             !list_empty(&oap->oap_rpc_item))
2620                 RETURN(-EBUSY);
2621
2622         /* check if the file's owner/group is over quota */
2623         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2624                 struct cl_object *obj;
2625                 struct cl_attr    attr; /* XXX put attr into thread info */
2626
2627                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2628
2629                 cl_object_attr_lock(obj);
2630                 rc = cl_object_attr_get(env, obj, &attr);
2631                 cl_object_attr_unlock(obj);
2632
2633                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2634                                             attr.cat_gid) == NO_QUOTA)
2635                         rc = -EDQUOT;
2636                 if (rc)
2637                         RETURN(rc);
2638         }
2639
2640         if (loi == NULL)
2641                 loi = lsm->lsm_oinfo[0];
2642
2643         client_obd_list_lock(&cli->cl_loi_list_lock);
2644
2645         LASSERT(off + count <= CFS_PAGE_SIZE);
2646         oap->oap_cmd = cmd;
2647         oap->oap_page_off = off;
2648         oap->oap_count = count;
2649         oap->oap_brw_flags = brw_flags;
2650         oap->oap_async_flags = async_flags;
2651
2652         if (cmd & OBD_BRW_WRITE) {
2653                 rc = osc_enter_cache(env, cli, loi, oap);
2654                 if (rc) {
2655                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2656                         RETURN(rc);
2657                 }
2658         }
2659
2660         osc_oap_to_pending(oap);
2661         loi_list_maint(cli, loi);
2662
2663         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2664                   cmd);
2665
2666         osc_check_rpcs(env, cli);
2667         client_obd_list_unlock(&cli->cl_loi_list_lock);
2668
2669         RETURN(0);
2670 }
2671
/* aka (~was & now & flag), but this is more clear :)
 *
 * True when "flag" is clear in "was" and set in "now".  Every argument is
 * parenthesised so compound expressions (e.g. "a | b") keep their meaning;
 * note that "flag" is evaluated twice. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2674
2675 int osc_set_async_flags_base(struct client_obd *cli,
2676                              struct lov_oinfo *loi, struct osc_async_page *oap,
2677                              obd_flag async_flags)
2678 {
2679         struct loi_oap_pages *lop;
2680         ENTRY;
2681
2682         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2683                 RETURN(-EIO);
2684
2685         if (oap->oap_cmd & OBD_BRW_WRITE) {
2686                 lop = &loi->loi_write_lop;
2687         } else {
2688                 lop = &loi->loi_read_lop;
2689         }
2690
2691         if (list_empty(&oap->oap_pending_item))
2692                 RETURN(-EINVAL);
2693
2694         if ((oap->oap_async_flags & async_flags) == async_flags)
2695                 RETURN(0);
2696
2697         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2698                 oap->oap_async_flags |= ASYNC_READY;
2699
2700         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2701                 if (list_empty(&oap->oap_rpc_item)) {
2702                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2703                         loi_list_maint(cli, loi);
2704                 }
2705         }
2706
2707         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2708                         oap->oap_async_flags);
2709         RETURN(0);
2710 }
2711
2712 int osc_teardown_async_page(struct obd_export *exp,
2713                             struct lov_stripe_md *lsm,
2714                             struct lov_oinfo *loi, void *cookie)
2715 {
2716         struct client_obd *cli = &exp->exp_obd->u.cli;
2717         struct loi_oap_pages *lop;
2718         struct osc_async_page *oap;
2719         int rc = 0;
2720         ENTRY;
2721
2722         oap = oap_from_cookie(cookie);
2723         if (IS_ERR(oap))
2724                 RETURN(PTR_ERR(oap));
2725
2726         if (loi == NULL)
2727                 loi = lsm->lsm_oinfo[0];
2728
2729         if (oap->oap_cmd & OBD_BRW_WRITE) {
2730                 lop = &loi->loi_write_lop;
2731         } else {
2732                 lop = &loi->loi_read_lop;
2733         }
2734
2735         client_obd_list_lock(&cli->cl_loi_list_lock);
2736
2737         if (!list_empty(&oap->oap_rpc_item))
2738                 GOTO(out, rc = -EBUSY);
2739
2740         osc_exit_cache(cli, oap, 0);
2741         osc_wake_cache_waiters(cli);
2742
2743         if (!list_empty(&oap->oap_urgent_item)) {
2744                 list_del_init(&oap->oap_urgent_item);
2745                 oap->oap_async_flags &= ~ASYNC_URGENT;
2746         }
2747         if (!list_empty(&oap->oap_pending_item)) {
2748                 list_del_init(&oap->oap_pending_item);
2749                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2750         }
2751         loi_list_maint(cli, loi);
2752         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2753 out:
2754         client_obd_list_unlock(&cli->cl_loi_list_lock);
2755         RETURN(rc);
2756 }
2757
2758 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2759                                          struct ldlm_enqueue_info *einfo,
2760                                          int flags)
2761 {
2762         void *data = einfo->ei_cbdata;
2763
2764         LASSERT(lock != NULL);
2765         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2766         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2767         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2768         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2769
2770         lock_res_and_lock(lock);
2771         spin_lock(&osc_ast_guard);
2772         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2773         lock->l_ast_data = data;
2774         spin_unlock(&osc_ast_guard);
2775         unlock_res_and_lock(lock);
2776 }
2777
2778 static void osc_set_data_with_check(struct lustre_handle *lockh,
2779                                     struct ldlm_enqueue_info *einfo,
2780                                     int flags)
2781 {
2782         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2783
2784         if (lock != NULL) {
2785                 osc_set_lock_data_with_check(lock, einfo, flags);
2786                 LDLM_LOCK_PUT(lock);
2787         } else
2788                 CERROR("lockh %p, data %p - client evicted?\n",
2789                        lockh, einfo->ei_cbdata);
2790 }
2791
2792 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2793                              ldlm_iterator_t replace, void *data)
2794 {
2795         struct ldlm_res_id res_id;
2796         struct obd_device *obd = class_exp2obd(exp);
2797
2798         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2799         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2800         return 0;
2801 }
2802
2803 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2804                             obd_enqueue_update_f upcall, void *cookie,
2805                             int *flags, int rc)
2806 {
2807         int intent = *flags & LDLM_FL_HAS_INTENT;
2808         ENTRY;
2809
2810         if (intent) {
2811                 /* The request was created before ldlm_cli_enqueue call. */
2812                 if (rc == ELDLM_LOCK_ABORTED) {
2813                         struct ldlm_reply *rep;
2814                         rep = req_capsule_server_get(&req->rq_pill,
2815                                                      &RMF_DLM_REP);
2816
2817                         LASSERT(rep != NULL);
2818                         if (rep->lock_policy_res1)
2819                                 rc = rep->lock_policy_res1;
2820                 }
2821         }
2822
2823         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2824                 *flags |= LDLM_FL_LVB_READY;
2825                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2826                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2827         }
2828
2829         /* Call the update callback. */
2830         rc = (*upcall)(cookie, rc);
2831         RETURN(rc);
2832 }
2833
2834 static int osc_enqueue_interpret(const struct lu_env *env,
2835                                  struct ptlrpc_request *req,
2836                                  struct osc_enqueue_args *aa, int rc)
2837 {
2838         struct ldlm_lock *lock;
2839         struct lustre_handle handle;
2840         __u32 mode;
2841
2842         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2843          * might be freed anytime after lock upcall has been called. */
2844         lustre_handle_copy(&handle, aa->oa_lockh);
2845         mode = aa->oa_ei->ei_mode;
2846
2847         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2848          * be valid. */
2849         lock = ldlm_handle2lock(&handle);
2850
2851         /* Take an additional reference so that a blocking AST that
2852          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2853          * to arrive after an upcall has been executed by
2854          * osc_enqueue_fini(). */
2855         ldlm_lock_addref(&handle, mode);
2856
2857         /* Complete obtaining the lock procedure. */
2858         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2859                                    mode, aa->oa_flags, aa->oa_lvb,
2860                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2861                                    &handle, rc);
2862         /* Complete osc stuff. */
2863         rc = osc_enqueue_fini(req, aa->oa_lvb,
2864                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2865         /* Release the lock for async request. */
2866         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2867                 /*
2868                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2869                  * not already released by
2870                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2871                  */
2872                 ldlm_lock_decref(&handle, mode);
2873
2874         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2875                  aa->oa_lockh, req, aa);
2876         ldlm_lock_decref(&handle, mode);
2877         LDLM_LOCK_PUT(lock);
2878         return rc;
2879 }
2880
2881 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2882                         struct lov_oinfo *loi, int flags,
2883                         struct ost_lvb *lvb, __u32 mode, int rc)
2884 {
2885         if (rc == ELDLM_OK) {
2886                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2887                 __u64 tmp;
2888
2889                 LASSERT(lock != NULL);
2890                 loi->loi_lvb = *lvb;
2891                 tmp = loi->loi_lvb.lvb_size;
2892                 /* Extend KMS up to the end of this lock and no further
2893                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2894                 if (tmp > lock->l_policy_data.l_extent.end)
2895                         tmp = lock->l_policy_data.l_extent.end + 1;
2896                 if (tmp >= loi->loi_kms) {
2897                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2898                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2899                         loi_kms_set(loi, tmp);
2900                 } else {
2901                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2902                                    LPU64"; leaving kms="LPU64", end="LPU64,
2903                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2904                                    lock->l_policy_data.l_extent.end);
2905                 }
2906                 ldlm_lock_allow_match(lock);
2907                 LDLM_LOCK_PUT(lock);
2908         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2909                 loi->loi_lvb = *lvb;
2910                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2911                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2912                 rc = ELDLM_OK;
2913         }
2914 }
2915 EXPORT_SYMBOL(osc_update_enqueue);
2916
/* Request-set pointer with the fixed value 1: a marker rather than a pointer
 * to a real set.  NOTE(review): presumably callers pass it to ask for the
 * request to be handed to ptlrpcd - confirm against its users. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2918
2919 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2920  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2921  * other synchronous requests, however keeping some locks and trying to obtain
2922  * others may take a considerable amount of time in a case of ost failure; and
2923  * when other sync requests do not get released lock from a client, the client
2924  * is excluded from the cluster -- such scenarios make life difficult, so
2925  * release locks just after they are obtained. */
2926 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2927                      int *flags, ldlm_policy_data_t *policy,
2928                      struct ost_lvb *lvb, int kms_valid,
2929                      obd_enqueue_update_f upcall, void *cookie,
2930                      struct ldlm_enqueue_info *einfo,
2931                      struct lustre_handle *lockh,
2932                      struct ptlrpc_request_set *rqset, int async)
2933 {
2934         struct obd_device *obd = exp->exp_obd;
2935         struct ptlrpc_request *req = NULL;
2936         int intent = *flags & LDLM_FL_HAS_INTENT;
2937         ldlm_mode_t mode;
2938         int rc;
2939         ENTRY;
2940
2941         /* Filesystem lock extents are extended to page boundaries so that
2942          * dealing with the page cache is a little smoother.  */
2943         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2944         policy->l_extent.end |= ~CFS_PAGE_MASK;
2945
2946         /*
2947          * kms is not valid when either object is completely fresh (so that no
2948          * locks are cached), or object was evicted. In the latter case cached
2949          * lock cannot be used, because it would prime inode state with
2950          * potentially stale LVB.
2951          */
2952         if (!kms_valid)
2953                 goto no_match;
2954
2955         /* Next, search for already existing extent locks that will cover us */
2956         /* If we're trying to read, we also search for an existing PW lock.  The
2957          * VFS and page cache already protect us locally, so lots of readers/
2958          * writers can share a single PW lock.
2959          *
2960          * There are problems with conversion deadlocks, so instead of
2961          * converting a read lock to a write lock, we'll just enqueue a new
2962          * one.
2963          *
2964          * At some point we should cancel the read lock instead of making them
2965          * send us a blocking callback, but there are problems with canceling
2966          * locks out from other users right now, too. */
2967         mode = einfo->ei_mode;
2968         if (einfo->ei_mode == LCK_PR)
2969                 mode |= LCK_PW;
2970         mode = ldlm_lock_match(obd->obd_namespace,
2971                                *flags | LDLM_FL_LVB_READY, res_id,
2972                                einfo->ei_type, policy, mode, lockh, 0);
2973         if (mode) {
2974                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2975
2976                 if (matched->l_ast_data == NULL ||
2977                     matched->l_ast_data == einfo->ei_cbdata) {
2978                         /* addref the lock only if not async requests and PW
2979                          * lock is matched whereas we asked for PR. */
2980                         if (!rqset && einfo->ei_mode != mode)
2981                                 ldlm_lock_addref(lockh, LCK_PR);
2982                         osc_set_lock_data_with_check(matched, einfo, *flags);
2983                         if (intent) {
2984                                 /* I would like to be able to ASSERT here that
2985                                  * rss <= kms, but I can't, for reasons which
2986                                  * are explained in lov_enqueue() */
2987                         }
2988
2989                         /* We already have a lock, and it's referenced */
2990                         (*upcall)(cookie, ELDLM_OK);
2991
2992                         /* For async requests, decref the lock. */
2993                         if (einfo->ei_mode != mode)
2994                                 ldlm_lock_decref(lockh, LCK_PW);
2995                         else if (rqset)
2996                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2997                         LDLM_LOCK_PUT(matched);
2998                         RETURN(ELDLM_OK);
2999                 } else
3000                         ldlm_lock_decref(lockh, mode);
3001                 LDLM_LOCK_PUT(matched);
3002         }
3003
3004  no_match:
3005         if (intent) {
3006                 CFS_LIST_HEAD(cancels);
3007                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3008                                            &RQF_LDLM_ENQUEUE_LVB);
3009                 if (req == NULL)
3010                         RETURN(-ENOMEM);
3011
3012                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3013                 if (rc)
3014                         RETURN(rc);
3015
3016                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3017                                      sizeof *lvb);
3018                 ptlrpc_request_set_replen(req);
3019         }
3020
3021         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3022         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3023
3024         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3025                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3026         if (rqset) {
3027                 if (!rc) {
3028                         struct osc_enqueue_args *aa;
3029                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3030                         aa = ptlrpc_req_async_args(req);
3031                         aa->oa_ei = einfo;
3032                         aa->oa_exp = exp;
3033                         aa->oa_flags  = flags;
3034                         aa->oa_upcall = upcall;
3035                         aa->oa_cookie = cookie;
3036                         aa->oa_lvb    = lvb;
3037                         aa->oa_lockh  = lockh;
3038
3039                         req->rq_interpret_reply =
3040                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3041                         if (rqset == PTLRPCD_SET)
3042                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3043                         else
3044                                 ptlrpc_set_add_req(rqset, req);
3045                 } else if (intent) {
3046                         ptlrpc_req_finished(req);
3047                 }
3048                 RETURN(rc);
3049         }
3050
3051         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3052         if (intent)
3053                 ptlrpc_req_finished(req);
3054
3055         RETURN(rc);
3056 }
3057
3058 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3059                        struct ldlm_enqueue_info *einfo,
3060                        struct ptlrpc_request_set *rqset)
3061 {
3062         struct ldlm_res_id res_id;
3063         int rc;
3064         ENTRY;
3065
3066         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3067                            oinfo->oi_md->lsm_object_gr, &res_id);
3068
3069         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3070                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3071                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3072                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3073                               rqset, rqset != NULL);
3074         RETURN(rc);
3075 }
3076
3077 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3078                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3079                    int *flags, void *data, struct lustre_handle *lockh,
3080                    int unref)
3081 {
3082         struct obd_device *obd = exp->exp_obd;
3083         int lflags = *flags;
3084         ldlm_mode_t rc;
3085         ENTRY;
3086
3087         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3088                 RETURN(-EIO);
3089
3090         /* Filesystem lock extents are extended to page boundaries so that
3091          * dealing with the page cache is a little smoother */
3092         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3093         policy->l_extent.end |= ~CFS_PAGE_MASK;
3094
3095         /* Next, search for already existing extent locks that will cover us */
3096         /* If we're trying to read, we also search for an existing PW lock.  The
3097          * VFS and page cache already protect us locally, so lots of readers/
3098          * writers can share a single PW lock. */
3099         rc = mode;
3100         if (mode == LCK_PR)
3101                 rc |= LCK_PW;
3102         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3103                              res_id, type, policy, rc, lockh, unref);
3104         if (rc) {
3105                 if (data != NULL)
3106                         osc_set_data_with_check(lockh, data, lflags);
3107                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3108                         ldlm_lock_addref(lockh, LCK_PR);
3109                         ldlm_lock_decref(lockh, LCK_PW);
3110                 }
3111                 RETURN(rc);
3112         }
3113         RETURN(rc);
3114 }
3115
3116 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3117 {
3118         ENTRY;
3119
3120         if (unlikely(mode == LCK_GROUP))
3121                 ldlm_lock_decref_and_cancel(lockh, mode);
3122         else
3123                 ldlm_lock_decref(lockh, mode);
3124
3125         RETURN(0);
3126 }
3127
3128 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3129                       __u32 mode, struct lustre_handle *lockh)
3130 {
3131         ENTRY;
3132         RETURN(osc_cancel_base(lockh, mode));
3133 }
3134
3135 static int osc_cancel_unused(struct obd_export *exp,
3136                              struct lov_stripe_md *lsm, int flags,
3137                              void *opaque)
3138 {
3139         struct obd_device *obd = class_exp2obd(exp);
3140         struct ldlm_res_id res_id, *resp = NULL;
3141
3142         if (lsm != NULL) {
3143                 resp = osc_build_res_name(lsm->lsm_object_id,
3144                                           lsm->lsm_object_gr, &res_id);
3145         }
3146
3147         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3148 }
3149
3150 static int osc_statfs_interpret(const struct lu_env *env,
3151                                 struct ptlrpc_request *req,
3152                                 struct osc_async_args *aa, int rc)
3153 {
3154         struct obd_statfs *msfs;
3155         ENTRY;
3156
3157         if (rc != 0)
3158                 GOTO(out, rc);
3159
3160         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3161         if (msfs == NULL) {
3162                 GOTO(out, rc = -EPROTO);
3163         }
3164
3165         *aa->aa_oi->oi_osfs = *msfs;
3166 out:
3167         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3168         RETURN(rc);
3169 }
3170
3171 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3172                             __u64 max_age, struct ptlrpc_request_set *rqset)
3173 {
3174         struct ptlrpc_request *req;
3175         struct osc_async_args *aa;
3176         int                    rc;
3177         ENTRY;
3178
3179         /* We could possibly pass max_age in the request (as an absolute
3180          * timestamp or a "seconds.usec ago") so the target can avoid doing
3181          * extra calls into the filesystem if that isn't necessary (e.g.
3182          * during mount that would help a bit).  Having relative timestamps
3183          * is not so great if request processing is slow, while absolute
3184          * timestamps are not ideal because they need time synchronization. */
3185         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3186         if (req == NULL)
3187                 RETURN(-ENOMEM);
3188
3189         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3190         if (rc) {
3191                 ptlrpc_request_free(req);
3192                 RETURN(rc);
3193         }
3194         ptlrpc_request_set_replen(req);
3195         req->rq_request_portal = OST_CREATE_PORTAL;
3196         ptlrpc_at_set_req_timeout(req);
3197
3198         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3199                 /* procfs requests not want stat in wait for avoid deadlock */
3200                 req->rq_no_resend = 1;
3201                 req->rq_no_delay = 1;
3202         }
3203
3204         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3205         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3206         aa = ptlrpc_req_async_args(req);
3207         aa->aa_oi = oinfo;
3208
3209         ptlrpc_set_add_req(rqset, req);
3210         RETURN(0);
3211 }
3212
3213 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3214                       __u64 max_age, __u32 flags)
3215 {
3216         struct obd_statfs     *msfs;
3217         struct ptlrpc_request *req;
3218         struct obd_import     *imp = NULL;
3219         int rc;
3220         ENTRY;
3221
3222         /*Since the request might also come from lprocfs, so we need
3223          *sync this with client_disconnect_export Bug15684*/
3224         down_read(&obd->u.cli.cl_sem);
3225         if (obd->u.cli.cl_import)
3226                 imp = class_import_get(obd->u.cli.cl_import);
3227         up_read(&obd->u.cli.cl_sem);
3228         if (!imp)
3229                 RETURN(-ENODEV);
3230
3231         /* We could possibly pass max_age in the request (as an absolute
3232          * timestamp or a "seconds.usec ago") so the target can avoid doing
3233          * extra calls into the filesystem if that isn't necessary (e.g.
3234          * during mount that would help a bit).  Having relative timestamps
3235          * is not so great if request processing is slow, while absolute
3236          * timestamps are not ideal because they need time synchronization. */
3237         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3238
3239         class_import_put(imp);
3240
3241         if (req == NULL)
3242                 RETURN(-ENOMEM);
3243
3244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3245         if (rc) {
3246                 ptlrpc_request_free(req);
3247                 RETURN(rc);
3248         }
3249         ptlrpc_request_set_replen(req);
3250         req->rq_request_portal = OST_CREATE_PORTAL;
3251         ptlrpc_at_set_req_timeout(req);
3252
3253         if (flags & OBD_STATFS_NODELAY) {
3254                 /* procfs requests not want stat in wait for avoid deadlock */
3255                 req->rq_no_resend = 1;
3256                 req->rq_no_delay = 1;
3257         }
3258
3259         rc = ptlrpc_queue_wait(req);
3260         if (rc)
3261                 GOTO(out, rc);
3262
3263         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3264         if (msfs == NULL) {
3265                 GOTO(out, rc = -EPROTO);
3266         }
3267
3268         *osfs = *msfs;
3269
3270         EXIT;
3271  out:
3272         ptlrpc_req_finished(req);
3273         return rc;
3274 }
3275
3276 /* Retrieve object striping information.
3277  *
3278  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3279  * the maximum number of OST indices which will fit in the user buffer.
3280  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3281  */
3282 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3283 {
3284         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3285         struct lov_user_md_v3 lum, *lumk;
3286         struct lov_user_ost_data_v1 *lmm_objects;
3287         int rc = 0, lum_size;
3288         ENTRY;
3289
3290         if (!lsm)
3291                 RETURN(-ENODATA);
3292
3293         /* we only need the header part from user space to get lmm_magic and
3294          * lmm_stripe_count, (the header part is common to v1 and v3) */
3295         lum_size = sizeof(struct lov_user_md_v1);
3296         if (copy_from_user(&lum, lump, lum_size))
3297                 RETURN(-EFAULT);
3298
3299         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3300             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3301                 RETURN(-EINVAL);
3302
3303         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3304         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3305         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3306         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3307
3308         /* we can use lov_mds_md_size() to compute lum_size
3309          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3310         if (lum.lmm_stripe_count > 0) {
3311                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3312                 OBD_ALLOC(lumk, lum_size);
3313                 if (!lumk)
3314                         RETURN(-ENOMEM);
3315
3316                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3317                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3318                 else
3319                         lmm_objects = &(lumk->lmm_objects[0]);
3320                 lmm_objects->l_object_id = lsm->lsm_object_id;
3321         } else {
3322                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3323                 lumk = &lum;
3324         }
3325
3326         lumk->lmm_object_id = lsm->lsm_object_id;
3327         lumk->lmm_object_gr = lsm->lsm_object_gr;
3328         lumk->lmm_stripe_count = 1;
3329
3330         if (copy_to_user(lump, lumk, lum_size))
3331                 rc = -EFAULT;
3332
3333         if (lumk != &lum)
3334                 OBD_FREE(lumk, lum_size);
3335
3336         RETURN(rc);
3337 }
3338
3339
3340 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3341                          void *karg, void *uarg)
3342 {
3343         struct obd_device *obd = exp->exp_obd;
3344         struct obd_ioctl_data *data = karg;
3345         int err = 0;
3346         ENTRY;
3347
3348         if (!try_module_get(THIS_MODULE)) {
3349                 CERROR("Can't get module. Is it alive?");
3350                 return -EINVAL;
3351         }
3352         switch (cmd) {
3353         case OBD_IOC_LOV_GET_CONFIG: {
3354                 char *buf;
3355                 struct lov_desc *desc;
3356                 struct obd_uuid uuid;
3357
3358                 buf = NULL;
3359                 len = 0;
3360                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3361                         GOTO(out, err = -EINVAL);
3362
3363                 data = (struct obd_ioctl_data *)buf;
3364
3365                 if (sizeof(*desc) > data->ioc_inllen1) {
3366                         obd_ioctl_freedata(buf, len);
3367                         GOTO(out, err = -EINVAL);
3368                 }
3369
3370                 if (data->ioc_inllen2 < sizeof(uuid)) {
3371                         obd_ioctl_freedata(buf, len);
3372                         GOTO(out, err = -EINVAL);
3373                 }
3374
3375                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3376                 desc->ld_tgt_count = 1;
3377                 desc->ld_active_tgt_count = 1;
3378                 desc->ld_default_stripe_count = 1;
3379                 desc->ld_default_stripe_size = 0;
3380                 desc->ld_default_stripe_offset = 0;
3381                 desc->ld_pattern = 0;
3382                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3383
3384                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3385
3386                 err = copy_to_user((void *)uarg, buf, len);
3387                 if (err)
3388                         err = -EFAULT;
3389                 obd_ioctl_freedata(buf, len);
3390                 GOTO(out, err);
3391         }
3392         case LL_IOC_LOV_SETSTRIPE:
3393                 err = obd_alloc_memmd(exp, karg);
3394                 if (err > 0)
3395                         err = 0;
3396                 GOTO(out, err);
3397         case LL_IOC_LOV_GETSTRIPE:
3398                 err = osc_getstripe(karg, uarg);
3399                 GOTO(out, err);
3400         case OBD_IOC_CLIENT_RECOVER:
3401                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3402                                             data->ioc_inlbuf1);
3403                 if (err > 0)
3404                         err = 0;
3405                 GOTO(out, err);
3406         case IOC_OSC_SET_ACTIVE:
3407                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3408                                                data->ioc_offset);
3409                 GOTO(out, err);
3410         case OBD_IOC_POLL_QUOTACHECK:
3411                 err = lquota_poll_check(quota_interface, exp,
3412                                         (struct if_quotacheck *)karg);
3413                 GOTO(out, err);
3414         case OBD_IOC_PING_TARGET:
3415                 err = ptlrpc_obd_ping(obd);
3416                 GOTO(out, err);
3417         default:
3418                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3419                        cmd, cfs_curproc_comm());
3420                 GOTO(out, err = -ENOTTY);
3421         }
3422 out:
3423         module_put(THIS_MODULE);
3424         return err;
3425 }
3426
3427 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3428                         void *key, __u32 *vallen, void *val,
3429                         struct lov_stripe_md *lsm)
3430 {
3431         ENTRY;
3432         if (!vallen || !val)
3433                 RETURN(-EFAULT);
3434
3435         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3436                 __u32 *stripe = val;
3437                 *vallen = sizeof(*stripe);
3438                 *stripe = 0;
3439                 RETURN(0);
3440         } else if (KEY_IS(KEY_LAST_ID)) {
3441                 struct ptlrpc_request *req;
3442                 obd_id                *reply;
3443                 char                  *tmp;
3444                 int                    rc;
3445
3446                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3447                                            &RQF_OST_GET_INFO_LAST_ID);
3448                 if (req == NULL)
3449                         RETURN(-ENOMEM);
3450
3451                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3452                                      RCL_CLIENT, keylen);
3453                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3454                 if (rc) {
3455                         ptlrpc_request_free(req);
3456                         RETURN(rc);
3457                 }
3458
3459                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3460                 memcpy(tmp, key, keylen);
3461
3462                 ptlrpc_request_set_replen(req);
3463                 rc = ptlrpc_queue_wait(req);
3464                 if (rc)
3465                         GOTO(out, rc);
3466
3467                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3468                 if (reply == NULL)
3469                         GOTO(out, rc = -EPROTO);
3470
3471                 *((obd_id *)val) = *reply;
3472         out:
3473                 ptlrpc_req_finished(req);
3474                 RETURN(rc);
3475         } else if (KEY_IS(KEY_FIEMAP)) {
3476                 struct ptlrpc_request *req;
3477                 struct ll_user_fiemap *reply;
3478                 char *tmp;
3479                 int rc;
3480
3481                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3482                                            &RQF_OST_GET_INFO_FIEMAP);
3483                 if (req == NULL)
3484                         RETURN(-ENOMEM);
3485
3486                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3487                                      RCL_CLIENT, keylen);
3488                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3489                                      RCL_CLIENT, *vallen);
3490                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3491                                      RCL_SERVER, *vallen);
3492
3493                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3494                 if (rc) {
3495                         ptlrpc_request_free(req);
3496                         RETURN(rc);
3497                 }
3498
3499                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3500                 memcpy(tmp, key, keylen);
3501                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3502                 memcpy(tmp, val, *vallen);
3503
3504                 ptlrpc_request_set_replen(req);
3505                 rc = ptlrpc_queue_wait(req);
3506                 if (rc)
3507                         GOTO(out1, rc);
3508
3509                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3510                 if (reply == NULL)
3511                         GOTO(out1, rc = -EPROTO);
3512
3513                 memcpy(val, reply, *vallen);
3514         out1:
3515                 ptlrpc_req_finished(req);
3516
3517                 RETURN(rc);
3518         }
3519
3520         RETURN(-EINVAL);
3521 }
3522
3523 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3524                                           struct ptlrpc_request *req,
3525                                           void *aa, int rc)
3526 {
3527         struct llog_ctxt *ctxt;
3528         struct obd_import *imp = req->rq_import;
3529         ENTRY;
3530
3531         if (rc != 0)
3532                 RETURN(rc);
3533
3534         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3535         if (ctxt) {
3536                 if (rc == 0)
3537                         rc = llog_initiator_connect(ctxt);
3538                 else
3539                         CERROR("cannot establish connection for "
3540                                "ctxt %p: %d\n", ctxt, rc);
3541         }
3542
3543         llog_ctxt_put(ctxt);
3544         spin_lock(&imp->imp_lock);
3545         imp->imp_server_timeout = 1;
3546         imp->imp_pingable = 1;
3547         spin_unlock(&imp->imp_lock);
3548         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3549
3550         RETURN(rc);
3551 }
3552
3553 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3554                               void *key, obd_count vallen, void *val,
3555                               struct ptlrpc_request_set *set)
3556 {
3557         struct ptlrpc_request *req;
3558         struct obd_device     *obd = exp->exp_obd;
3559         struct obd_import     *imp = class_exp2cliimp(exp);
3560         char                  *tmp;
3561         int                    rc;
3562         ENTRY;
3563
3564         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3565
3566         if (KEY_IS(KEY_NEXT_ID)) {
3567                 if (vallen != sizeof(obd_id))
3568                         RETURN(-ERANGE);
3569                 if (val == NULL)
3570                         RETURN(-EINVAL);
3571                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3572                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3573                        exp->exp_obd->obd_name,
3574                        obd->u.cli.cl_oscc.oscc_next_id);
3575
3576                 RETURN(0);
3577         }
3578
3579         if (KEY_IS(KEY_UNLINKED)) {
3580                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3581                 spin_lock(&oscc->oscc_lock);
3582                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3583                 spin_unlock(&oscc->oscc_lock);
3584                 RETURN(0);
3585         }
3586
3587         if (KEY_IS(KEY_INIT_RECOV)) {
3588                 if (vallen != sizeof(int))
3589                         RETURN(-EINVAL);
3590                 spin_lock(&imp->imp_lock);
3591                 imp->imp_initial_recov = *(int *)val;
3592                 spin_unlock(&imp->imp_lock);
3593                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3594                        exp->exp_obd->obd_name,
3595                        imp->imp_initial_recov);
3596                 RETURN(0);
3597         }
3598
3599         if (KEY_IS(KEY_CHECKSUM)) {
3600                 if (vallen != sizeof(int))
3601                         RETURN(-EINVAL);
3602                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3603                 RETURN(0);
3604         }
3605
3606         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3607                 sptlrpc_conf_client_adapt(obd);
3608                 RETURN(0);
3609         }
3610
3611         if (KEY_IS(KEY_FLUSH_CTX)) {
3612                 sptlrpc_import_flush_my_ctx(imp);
3613                 RETURN(0);
3614         }
3615
3616         if (!set)
3617                 RETURN(-EINVAL);
3618
3619         /* We pass all other commands directly to OST. Since nobody calls osc
3620            methods directly and everybody is supposed to go through LOV, we
3621            assume lov checked invalid values for us.
3622            The only recognised values so far are evict_by_nid and mds_conn.
3623            Even if something bad goes through, we'd get a -EINVAL from OST
3624            anyway. */
3625
3626
3627         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3628         if (req == NULL)
3629                 RETURN(-ENOMEM);
3630
3631         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3632                              RCL_CLIENT, keylen);
3633         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3634                              RCL_CLIENT, vallen);
3635         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3636         if (rc) {
3637                 ptlrpc_request_free(req);
3638                 RETURN(rc);
3639         }
3640
3641         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3642         memcpy(tmp, key, keylen);
3643         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3644         memcpy(tmp, val, vallen);
3645
3646         if (KEY_IS(KEY_MDS_CONN)) {
3647                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3648
3649                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3650                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3651                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3652                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3653         }
3654
3655         ptlrpc_request_set_replen(req);
3656         ptlrpc_set_add_req(set, req);
3657         ptlrpc_check_set(NULL, set);
3658
3659         RETURN(0);
3660 }
3661
3662
3663 static struct llog_operations osc_size_repl_logops = {
3664         lop_cancel: llog_obd_repl_cancel
3665 };
3666
3667 static struct llog_operations osc_mds_ost_orig_logops;
3668 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3669                          struct obd_device *tgt, int count,
3670                          struct llog_catid *catid, struct obd_uuid *uuid)
3671 {
3672         int rc;
3673         ENTRY;
3674
3675         LASSERT(olg == &obd->obd_olg);
3676         spin_lock(&obd->obd_dev_lock);
3677         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3678                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3679                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3680                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3681                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3682                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3683         }
3684         spin_unlock(&obd->obd_dev_lock);
3685
3686         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3687                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3688         if (rc) {
3689                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3690                 GOTO(out, rc);
3691         }
3692
3693         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3694                         NULL, &osc_size_repl_logops);
3695         if (rc) {
3696                 struct llog_ctxt *ctxt =
3697                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3698                 if (ctxt)
3699                         llog_cleanup(ctxt);
3700                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3701         }
3702         GOTO(out, rc);
3703 out:
3704         if (rc) {
3705                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3706                        obd->obd_name, tgt->obd_name, count, catid, rc);
3707                 CERROR("logid "LPX64":0x%x\n",
3708                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3709         }
3710         return rc;
3711 }
3712
3713 static int osc_llog_finish(struct obd_device *obd, int count)
3714 {
3715         struct llog_ctxt *ctxt;
3716         int rc = 0, rc2 = 0;
3717         ENTRY;
3718
3719         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3720         if (ctxt)
3721                 rc = llog_cleanup(ctxt);
3722
3723         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3724         if (ctxt)
3725                 rc2 = llog_cleanup(ctxt);
3726         if (!rc)
3727                 rc = rc2;
3728
3729         RETURN(rc);
3730 }
3731
3732 static int osc_reconnect(const struct lu_env *env,
3733                          struct obd_export *exp, struct obd_device *obd,
3734                          struct obd_uuid *cluuid,
3735                          struct obd_connect_data *data,
3736                          void *localdata)
3737 {
3738         struct client_obd *cli = &obd->u.cli;
3739
3740         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3741                 long lost_grant;
3742
3743                 client_obd_list_lock(&cli->cl_loi_list_lock);
3744                 data->ocd_grant = cli->cl_avail_grant ?:
3745                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3746                 lost_grant = cli->cl_lost_grant;
3747                 cli->cl_lost_grant = 0;
3748                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3749
3750                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3751                        "cl_lost_grant: %ld\n", data->ocd_grant,
3752                        cli->cl_avail_grant, lost_grant);
3753                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3754                        " ocd_grant: %d\n", data->ocd_connect_flags,
3755                        data->ocd_version, data->ocd_grant);
3756         }
3757
3758         RETURN(0);
3759 }
3760
3761 static int osc_disconnect(struct obd_export *exp)
3762 {
3763         struct obd_device *obd = class_exp2obd(exp);
3764         struct llog_ctxt  *ctxt;
3765         int rc;
3766
3767         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3768         if (ctxt) {
3769                 if (obd->u.cli.cl_conn_count == 1) {
3770                         /* Flush any remaining cancel messages out to the
3771                          * target */
3772                         llog_sync(ctxt, exp);
3773                 }
3774                 llog_ctxt_put(ctxt);
3775         } else {
3776                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3777                        obd);
3778         }
3779
3780         rc = client_disconnect_export(exp);
3781         return rc;
3782 }
3783
3784 static int osc_import_event(struct obd_device *obd,
3785                             struct obd_import *imp,
3786                             enum obd_import_event event)
3787 {
3788         struct client_obd *cli;
3789         int rc = 0;
3790
3791         ENTRY;
3792         LASSERT(imp->imp_obd == obd);
3793
3794         switch (event) {
3795         case IMP_EVENT_DISCON: {
3796                 /* Only do this on the MDS OSC's */
3797                 if (imp->imp_server_timeout) {
3798                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3799
3800                         spin_lock(&oscc->oscc_lock);
3801                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3802                         spin_unlock(&oscc->oscc_lock);
3803                 }
3804                 cli = &obd->u.cli;
3805                 client_obd_list_lock(&cli->cl_loi_list_lock);
3806                 cli->cl_avail_grant = 0;
3807                 cli->cl_lost_grant = 0;
3808                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3809                 break;
3810         }
3811         case IMP_EVENT_INACTIVE: {
3812                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3813                 break;
3814         }
3815         case IMP_EVENT_INVALIDATE: {
3816                 struct ldlm_namespace *ns = obd->obd_namespace;
3817                 struct lu_env         *env;
3818                 int                    refcheck;
3819
3820                 env = cl_env_get(&refcheck);
3821                 if (!IS_ERR(env)) {
3822                         /* Reset grants */
3823                         cli = &obd->u.cli;
3824                         client_obd_list_lock(&cli->cl_loi_list_lock);
3825                         /* all pages go to failing rpcs due to the invalid
3826                          * import */
3827                         osc_check_rpcs(env, cli);
3828                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3829
3830                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3831                         cl_env_put(env, &refcheck);
3832                 } else
3833                         rc = PTR_ERR(env);
3834                 break;
3835         }
3836         case IMP_EVENT_ACTIVE: {
3837                 /* Only do this on the MDS OSC's */
3838                 if (imp->imp_server_timeout) {
3839                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3840
3841                         spin_lock(&oscc->oscc_lock);
3842                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3843                         spin_unlock(&oscc->oscc_lock);
3844                 }
3845                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3846                 break;
3847         }
3848         case IMP_EVENT_OCD: {
3849                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3850
3851                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3852                         osc_init_grant(&obd->u.cli, ocd);
3853
3854                 /* See bug 7198 */
3855                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3856                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3857
3858                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3859                 break;
3860         }
3861         default:
3862                 CERROR("Unknown import event %d\n", event);
3863                 LBUG();
3864         }
3865         RETURN(rc);
3866 }
3867
3868 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3869 {
3870         int rc;
3871         ENTRY;
3872
3873         ENTRY;
3874         rc = ptlrpcd_addref();
3875         if (rc)
3876                 RETURN(rc);
3877
3878         rc = client_obd_setup(obd, lcfg);
3879         if (rc) {
3880                 ptlrpcd_decref();
3881         } else {
3882                 struct lprocfs_static_vars lvars = { 0 };
3883                 struct client_obd *cli = &obd->u.cli;
3884
3885                 lprocfs_osc_init_vars(&lvars);
3886                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3887                         lproc_osc_attach_seqstat(obd);
3888                         sptlrpc_lprocfs_cliobd_attach(obd);
3889                         ptlrpc_lprocfs_register_obd(obd);
3890                 }
3891
3892                 oscc_init(obd);
3893                 /* We need to allocate a few requests more, because
3894                    brw_interpret tries to create new requests before freeing
3895                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3896                    reserved, but I afraid that might be too much wasted RAM
3897                    in fact, so 2 is just my guess and still should work. */
3898                 cli->cl_import->imp_rq_pool =
3899                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3900                                             OST_MAXREQSIZE,
3901                                             ptlrpc_add_rqs_to_pool);
3902         }
3903
3904         RETURN(rc);
3905 }
3906
3907 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3908 {
3909         int rc = 0;
3910         ENTRY;
3911
3912         switch (stage) {
3913         case OBD_CLEANUP_EARLY: {
3914                 struct obd_import *imp;
3915                 imp = obd->u.cli.cl_import;
3916                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3917                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3918                 ptlrpc_deactivate_import(imp);
3919                 spin_lock(&imp->imp_lock);
3920                 imp->imp_pingable = 0;
3921                 spin_unlock(&imp->imp_lock);
3922                 break;
3923         }
3924         case OBD_CLEANUP_EXPORTS: {
3925                 /* If we set up but never connected, the
3926                    client import will not have been cleaned. */
3927                 if (obd->u.cli.cl_import) {
3928                         struct obd_import *imp;
3929                         down_write(&obd->u.cli.cl_sem);
3930                         imp = obd->u.cli.cl_import;
3931                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
3932                                obd->obd_name);
3933                         ptlrpc_invalidate_import(imp);
3934                         if (imp->imp_rq_pool) {
3935                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
3936                                 imp->imp_rq_pool = NULL;
3937                         }
3938                         class_destroy_import(imp);
3939                         up_write(&obd->u.cli.cl_sem);
3940                         obd->u.cli.cl_import = NULL;
3941                 }
3942                 rc = obd_llog_finish(obd, 0);
3943                 if (rc != 0)
3944                         CERROR("failed to cleanup llogging subsystems\n");
3945                 break;
3946                 }
3947         }
3948         RETURN(rc);
3949 }
3950
3951 int osc_cleanup(struct obd_device *obd)
3952 {
3953         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3954         int rc;
3955
3956         ENTRY;
3957         ptlrpc_lprocfs_unregister_obd(obd);
3958         lprocfs_obd_cleanup(obd);
3959
3960         spin_lock(&oscc->oscc_lock);
3961         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3962         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3963         spin_unlock(&oscc->oscc_lock);
3964
3965         /* free memory of osc quota cache */
3966         lquota_cleanup(quota_interface, obd);
3967
3968         rc = client_obd_cleanup(obd);
3969
3970         ptlrpcd_decref();
3971         RETURN(rc);
3972 }
3973
3974 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3975 {
3976         struct lprocfs_static_vars lvars = { 0 };
3977         int rc = 0;
3978
3979         lprocfs_osc_init_vars(&lvars);
3980
3981         switch (lcfg->lcfg_command) {
3982         default:
3983                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3984                                               lcfg, obd);
3985                 if (rc > 0)
3986                         rc = 0;
3987                 break;
3988         }
3989
3990         return(rc);
3991 }
3992
3993 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3994 {
3995         return osc_process_config_base(obd, buf);
3996 }
3997
3998 struct obd_ops osc_obd_ops = {
3999         .o_owner                = THIS_MODULE,
4000         .o_setup                = osc_setup,
4001         .o_precleanup           = osc_precleanup,
4002         .o_cleanup              = osc_cleanup,
4003         .o_add_conn             = client_import_add_conn,
4004         .o_del_conn             = client_import_del_conn,
4005         .o_connect              = client_connect_import,
4006         .o_reconnect            = osc_reconnect,
4007         .o_disconnect           = osc_disconnect,
4008         .o_statfs               = osc_statfs,
4009         .o_statfs_async         = osc_statfs_async,
4010         .o_packmd               = osc_packmd,
4011         .o_unpackmd             = osc_unpackmd,
4012         .o_precreate            = osc_precreate,
4013         .o_create               = osc_create,
4014         .o_destroy              = osc_destroy,
4015         .o_getattr              = osc_getattr,
4016         .o_getattr_async        = osc_getattr_async,
4017         .o_setattr              = osc_setattr,
4018         .o_setattr_async        = osc_setattr_async,
4019         .o_brw                  = osc_brw,
4020         .o_punch                = osc_punch,
4021         .o_sync                 = osc_sync,
4022         .o_enqueue              = osc_enqueue,
4023         .o_change_cbdata        = osc_change_cbdata,
4024         .o_cancel               = osc_cancel,
4025         .o_cancel_unused        = osc_cancel_unused,
4026         .o_iocontrol            = osc_iocontrol,
4027         .o_get_info             = osc_get_info,
4028         .o_set_info_async       = osc_set_info_async,
4029         .o_import_event         = osc_import_event,
4030         .o_llog_init            = osc_llog_init,
4031         .o_llog_finish          = osc_llog_finish,
4032         .o_process_config       = osc_process_config,
4033 };
4034
4035 extern struct lu_kmem_descr  osc_caches[];
4036 extern spinlock_t            osc_ast_guard;
4037 extern struct lock_class_key osc_ast_guard_class;
4038
4039 int __init osc_init(void)
4040 {
4041         struct lprocfs_static_vars lvars = { 0 };
4042         int rc;
4043         ENTRY;
4044
4045         /* print an address of _any_ initialized kernel symbol from this
4046          * module, to allow debugging with gdb that doesn't support data
4047          * symbols from modules.*/
4048         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4049
4050         rc = lu_kmem_init(osc_caches);
4051
4052         lprocfs_osc_init_vars(&lvars);
4053
4054         request_module("lquota");
4055         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4056         lquota_init(quota_interface);
4057         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4058
4059         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4060                                  LUSTRE_OSC_NAME, &osc_device_type);
4061         if (rc) {
4062                 if (quota_interface)
4063                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4064                 lu_kmem_fini(osc_caches);
4065                 RETURN(rc);
4066         }
4067
4068         spin_lock_init(&osc_ast_guard);
4069         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4070
4071         RETURN(rc);
4072 }
4073
4074 #ifdef __KERNEL__
4075 static void /*__exit*/ osc_exit(void)
4076 {
4077         lu_device_type_fini(&osc_device_type);
4078
4079         lquota_exit(quota_interface);
4080         if (quota_interface)
4081                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4082
4083         class_unregister_type(LUSTRE_OSC_NAME);
4084         lu_kmem_fini(osc_caches);
4085 }
4086
4087 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4088 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4089 MODULE_LICENSE("GPL");
4090
4091 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4092 #endif