Whamcloud - gitweb
b=14132
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
/* Quota hooks; filled in at module init/cleanup from osc_quota_interface. */
static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

/* Forward declarations for helpers defined later in this file. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
/* Create an object on the OST described by @oa, synchronously.
 * If *ea is NULL a single-stripe lsm is allocated here and, on success,
 * returned through *ea (ownership passes to the caller); on failure any
 * locally allocated lsm is freed.  On success the object id/group from
 * the server's reply are recorded in the lsm, and the llog cookie (if
 * the server sent one) is stored through @oti. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* No lsm supplied: allocate one locally; freed under "out:"
                 * if the create ultimately fails. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        /* Server assigns the object id; copy the whole obdo back. */
        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* *ea is only set on success, so on failure this frees exactly the
         * lsm allocated above (and nothing if the caller supplied one). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         body->oa = *oa;
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         *oa = body->oa;
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
/* Completion handler for throttled destroy RPCs: release the in-flight
 * slot and wake any sender blocked in osc_destroy() waiting for one. */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
668
/* Try to reserve a destroy-RPC slot without a lock.  Returns 1 when the
 * caller may send (counter stayed within cl_max_rpcs_in_flight), 0 when
 * it must wait on cl_destroy_waitq.  Uses inc-then-maybe-dec so that
 * concurrent callers race safely on the atomic counter alone. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: undo our increment.  If the value after the
         * decrement is again below the limit, another sender finished in
         * between, so wake a waiter rather than lose the signal. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
686
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel outstanding PW locks on the object first so their dirty
         * data is discarded rather than flushed to a dying object; the
         * cancels piggyback on the destroy via early lock cancellation. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Attach the llog cookie before the obdo is copied into the body. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
761
/* Advertise this client's dirty-page and grant state to the OST by
 * filling the o_dirty/o_undirty/o_grant/o_dropped fields of @oa.
 * All values are sampled under cl_loi_list_lock; lost grant is reported
 * once and then reset.  (@writing_bytes is unused here.) */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* The first three branches are sanity/overflow guards: when the
         * accounting looks inconsistent, claim no room for more dirty
         * data (o_undirty = 0) rather than ask for more grant. */
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
799
800 /* caller must hold loi_list_lock */
801 static void osc_consume_write_grant(struct client_obd *cli,
802                                     struct brw_page *pga)
803 {
804         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
805         atomic_inc(&obd_dirty_pages);
806         cli->cl_dirty += CFS_PAGE_SIZE;
807         cli->cl_avail_grant -= CFS_PAGE_SIZE;
808         pga->flag |= OBD_BRW_FROM_GRANT;
809         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
810                CFS_PAGE_SIZE, pga, pga->pg);
811         LASSERT(cli->cl_avail_grant >= 0);
812 }
813
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4k if the server never reported a block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Page was never charged against grant — nothing to release. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        /* NOCACHE pages were also counted as "transit"; undo that too. */
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* Page never went out: the whole page worth of grant is
                 * lost and will be reported to the OST as o_dropped. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
857
/* Total BRW RPCs (reads + writes) this client currently has in flight. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
862
/* caller must hold loi_list_lock */
/* Walk cl_cache_waiters and wake waiters that can now make progress:
 * either grant a page of write credit, or hand back -EDQUOT so the
 * waiter falls through to synchronous IO.  Stops early (leaving the
 * rest queued) when neither dirty room nor grant can become available. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                /* This waiter will definitely be woken below: either with
                 * grant consumed on its behalf, or with -EDQUOT. */
                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
904
/* Install the grant the server offered in its connect data (presumably
 * called at connect/reconnect time -- ocd is the connect data; TODO
 * confirm against callers).  cl_loi_list_lock serializes cl_avail_grant
 * updates with the I/O completion paths. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
915
/* Add any grant returned in a BRW reply body to the client's available
 * grant.  Waiters blocked on grant are not woken here. */
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
        if (body->oa.o_valid & OBD_MD_FLGRANT)
                cli->cl_avail_grant += body->oa.o_grant;
        /* waiters are woken in brw_interpret */
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}
925
926 /* We assume that the reason this OSC got a short read is because it read
927  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
928  * via the LOV, and it _knows_ it's reading inside the file, it's just that
929  * this stripe never got written at or beyond this stripe offset yet. */
930 static void handle_short_read(int nob_read, obd_count page_count,
931                               struct brw_page **pga)
932 {
933         char *ptr;
934         int i = 0;
935
936         /* skip bytes read OK */
937         while (nob_read > 0) {
938                 LASSERT (page_count > 0);
939
940                 if (pga[i]->count > nob_read) {
941                         /* EOF inside this page */
942                         ptr = cfs_kmap(pga[i]->pg) +
943                                 (pga[i]->off & ~CFS_PAGE_MASK);
944                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
945                         cfs_kunmap(pga[i]->pg);
946                         page_count--;
947                         i++;
948                         break;
949                 }
950
951                 nob_read -= pga[i]->count;
952                 page_count--;
953                 i++;
954         }
955
956         /* zero remaining pages */
957         while (page_count-- > 0) {
958                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
959                 memset(ptr, 0, pga[i]->count);
960                 cfs_kunmap(pga[i]->pg);
961                 i++;
962         }
963 }
964
965 static int check_write_rcs(struct ptlrpc_request *req,
966                            int requested_nob, int niocount,
967                            obd_count page_count, struct brw_page **pga)
968 {
969         int    *remote_rcs, i;
970
971         /* return error if any niobuf was in error */
972         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
973                                         sizeof(*remote_rcs) * niocount, NULL);
974         if (remote_rcs == NULL) {
975                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
976                 return(-EPROTO);
977         }
978         if (lustre_msg_swabbed(req->rq_repmsg))
979                 for (i = 0; i < niocount; i++)
980                         __swab32s(&remote_rcs[i]);
981
982         for (i = 0; i < niocount; i++) {
983                 if (remote_rcs[i] < 0)
984                         return(remote_rcs[i]);
985
986                 if (remote_rcs[i] != 0) {
987                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
988                                 i, remote_rcs[i], req);
989                         return(-EPROTO);
990                 }
991         }
992
993         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
994                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
995                        req->rq_bulk->bd_nob_transferred, requested_nob);
996                 return(-EPROTO);
997         }
998
999         return (0);
1000 }
1001
1002 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1003 {
1004         if (p1->flag != p2->flag) {
1005                 unsigned mask = ~(OBD_BRW_FROM_GRANT|OBD_BRW_NOCACHE);
1006
1007                 /* warn if we try to combine flags that we don't know to be
1008                  * safe to combine */
1009                 if ((p1->flag & mask) != (p2->flag & mask))
1010                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1011                                "same brw?\n", p1->flag, p2->flag);
1012                 return 0;
1013         }
1014
1015         return (p1->off + p1->count == p2->off);
1016 }
1017
1018 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1019                                    struct brw_page **pga, int opc,
1020                                    cksum_type_t cksum_type)
1021 {
1022         __u32 cksum;
1023         int i = 0;
1024
1025         LASSERT (pg_count > 0);
1026         cksum = init_checksum(cksum_type);
1027         while (nob > 0 && pg_count > 0) {
1028                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1029                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1030                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1031
1032                 /* corrupt the data before we compute the checksum, to
1033                  * simulate an OST->client data error */
1034                 if (i == 0 && opc == OST_READ &&
1035                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1036                         memcpy(ptr + off, "bad1", min(4, nob));
1037                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1038                 cfs_kunmap(pga[i]->pg);
1039                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1040                                off, cksum);
1041
1042                 nob -= pga[i]->count;
1043                 pg_count--;
1044                 i++;
1045         }
1046         /* For sending we only compute the wrong checksum instead
1047          * of corrupting the data so it is still correct on a redo */
1048         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1049                 cksum++;
1050
1051         return cksum;
1052 }
1053
/* Build (but do not send) a BRW read or write request covering the
 * @page_count pages in @pga.  On success the request is returned via
 * @reqp with its bulk descriptor, checksum fields, and async args set
 * up; the caller owns the request.  If @reserve is non-zero an extra
 * reference on @ocapa is stored in the async args.  Returns 0 or a
 * negative errno.
 * NOTE(review): @lsm is not referenced in this function (callers pass
 * it for interface parity; see the "lsm unused" comment at the redo
 * call site). */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes allocate from the import's request pool so they can make
         * progress under memory pressure; reads allocate normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        /* capsule sizes must be set before the request is packed */
        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* bulk direction: the server GETs write data from us, PUTs read
         * data into us */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* add each page to the bulk descriptor and fill the niobuf array,
         * merging contiguous same-flag pages into a single niobuf (must
         * produce exactly niocount entries, as counted above) */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                /* pages may not cross a page boundary and must arrive in
                 * strictly increasing offset order */
                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on OBD_BRW_SRVLOCK */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we filled exactly the niobuf buffer we sized earlier */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash everything the interpret/redo paths will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1238
/* Compare the server-reported write checksum against the one the client
 * computed before sending.  Returns 0 when they match, 1 on mismatch
 * (after logging a diagnosis).  On mismatch the data is re-checksummed
 * from the still-mapped pages to distinguish "changed on the client"
 * (e.g. concurrent mmap writes) from "changed in transit". */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the checksum type the server actually used */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1289
1290 /* Note rc enters this function as number of bytes transferred */
1291 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1292 {
1293         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1294         const lnet_process_id_t *peer =
1295                         &req->rq_import->imp_connection->c_peer;
1296         struct client_obd *cli = aa->aa_cli;
1297         struct ost_body *body;
1298         __u32 client_cksum = 0;
1299         ENTRY;
1300
1301         if (rc < 0 && rc != -EDQUOT)
1302                 RETURN(rc);
1303
1304         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1305         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1306                                   lustre_swab_ost_body);
1307         if (body == NULL) {
1308                 CDEBUG(D_INFO, "Can't unpack body\n");
1309                 RETURN(-EPROTO);
1310         }
1311
1312         /* set/clear over quota flag for a uid/gid */
1313         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1314             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1315                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1316                              body->oa.o_gid, body->oa.o_valid,
1317                              body->oa.o_flags);
1318
1319         if (rc < 0)
1320                 RETURN(rc);
1321
1322         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1323                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1324
1325         osc_update_grant(cli, body);
1326
1327         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1328                 if (rc > 0) {
1329                         CERROR("Unexpected +ve rc %d\n", rc);
1330                         RETURN(-EPROTO);
1331                 }
1332                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1333
1334                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1335                         RETURN(-EAGAIN);
1336
1337                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1338                     check_write_checksum(&body->oa, peer, client_cksum,
1339                                          body->oa.o_cksum, aa->aa_requested_nob,
1340                                          aa->aa_page_count, aa->aa_ppga,
1341                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1342                         RETURN(-EAGAIN);
1343
1344                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1345                                      aa->aa_page_count, aa->aa_ppga);
1346                 GOTO(out, rc);
1347         }
1348
1349         /* The rest of this function executes only for OST_READs */
1350
1351         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1352         if (rc < 0)
1353                 GOTO(out, rc);
1354
1355         if (rc > aa->aa_requested_nob) {
1356                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1357                        aa->aa_requested_nob);
1358                 RETURN(-EPROTO);
1359         }
1360
1361         if (rc != req->rq_bulk->bd_nob_transferred) {
1362                 CERROR ("Unexpected rc %d (%d transferred)\n",
1363                         rc, req->rq_bulk->bd_nob_transferred);
1364                 return (-EPROTO);
1365         }
1366
1367         if (rc < aa->aa_requested_nob)
1368                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1369
1370         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1371                 static int cksum_counter;
1372                 __u32      server_cksum = body->oa.o_cksum;
1373                 char      *via;
1374                 char      *router;
1375                 cksum_type_t cksum_type;
1376
1377                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1378                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1379                 else
1380                         cksum_type = OBD_CKSUM_CRC32;
1381                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1382                                                  aa->aa_ppga, OST_READ,
1383                                                  cksum_type);
1384
1385                 if (peer->nid == req->rq_bulk->bd_sender) {
1386                         via = router = "";
1387                 } else {
1388                         via = " via ";
1389                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1390                 }
1391
1392                 if (server_cksum == ~0 && rc > 0) {
1393                         CERROR("Protocol error: server %s set the 'checksum' "
1394                                "bit, but didn't send a checksum.  Not fatal, "
1395                                "but please notify on http://bugzilla.lustre.org/\n",
1396                                libcfs_nid2str(peer->nid));
1397                 } else if (server_cksum != client_cksum) {
1398                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1399                                            "%s%s%s inum "LPU64"/"LPU64" object "
1400                                            LPU64"/"LPU64" extent "
1401                                            "["LPU64"-"LPU64"]\n",
1402                                            req->rq_import->imp_obd->obd_name,
1403                                            libcfs_nid2str(peer->nid),
1404                                            via, router,
1405                                            body->oa.o_valid & OBD_MD_FLFID ?
1406                                                 body->oa.o_fid : (__u64)0,
1407                                            body->oa.o_valid & OBD_MD_FLFID ?
1408                                                 body->oa.o_generation :(__u64)0,
1409                                            body->oa.o_id,
1410                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1411                                                 body->oa.o_gr : (__u64)0,
1412                                            aa->aa_ppga[0]->off,
1413                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1414                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1415                                                                         1);
1416                         CERROR("client %x, server %x, cksum_type %x\n",
1417                                client_cksum, server_cksum, cksum_type);
1418                         cksum_counter = 0;
1419                         aa->aa_oa->o_cksum = client_cksum;
1420                         rc = -EAGAIN;
1421                 } else {
1422                         cksum_counter++;
1423                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1424                         rc = 0;
1425                 }
1426         } else if (unlikely(client_cksum)) {
1427                 static int cksum_missed;
1428
1429                 cksum_missed++;
1430                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1431                         CERROR("Checksum %u requested from %s but not sent\n",
1432                                cksum_missed, libcfs_nid2str(peer->nid));
1433         } else {
1434                 rc = 0;
1435         }
1436 out:
1437         if (rc >= 0)
1438                 *aa->aa_oa = body->oa;
1439
1440         RETURN(rc);
1441 }
1442
1443 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1444                             struct lov_stripe_md *lsm,
1445                             obd_count page_count, struct brw_page **pga,
1446                             struct obd_capa *ocapa)
1447 {
1448         struct ptlrpc_request *req;
1449         int                    rc;
1450         cfs_waitq_t            waitq;
1451         int                    resends = 0;
1452         struct l_wait_info     lwi;
1453
1454         ENTRY;
1455
1456         cfs_waitq_init(&waitq);
1457
1458 restart_bulk:
1459         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1460                                   page_count, pga, &req, ocapa, 0);
1461         if (rc != 0)
1462                 return (rc);
1463
1464         rc = ptlrpc_queue_wait(req);
1465
1466         if (rc == -ETIMEDOUT && req->rq_resend) {
1467                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1468                 ptlrpc_req_finished(req);
1469                 goto restart_bulk;
1470         }
1471
1472         rc = osc_brw_fini_request(req, rc);
1473
1474         ptlrpc_req_finished(req);
1475         if (osc_recoverable_error(rc)) {
1476                 resends++;
1477                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1478                         CERROR("too many resend retries, returning error\n");
1479                         RETURN(-EIO);
1480                 }
1481
1482                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1483                 l_wait_event(waitq, 0, &lwi);
1484
1485                 goto restart_bulk;
1486         }
1487
1488         RETURN (rc);
1489 }
1490
/* Rebuild and resubmit a BRW request that failed with a recoverable
 * error.  The new request inherits the page array, async pages, capa
 * reference, and interpret callback from @request and is added to the
 * same request set.  Returns 0 on success, -EIO once the resend limit
 * is exceeded, -EINTR if any async page was interrupted, or an errno
 * from request preparation. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* bail out (freeing the new request) if any async page of the old
         * request was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* re-point each async page's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* the capa reference moves to the new request's async args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1565
1566 /*
1567  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1568  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1569  * fine for our small page arrays and doesn't require allocation.  its an
1570  * insertion sort that swaps elements that are strides apart, shrinking the
1571  * stride down until its '1' and the array is sorted.
1572  */
1573 static void sort_brw_pages(struct brw_page **array, int num)
1574 {
1575         int stride, i, j;
1576         struct brw_page *tmp;
1577
1578         if (num == 1)
1579                 return;
1580         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1581                 ;
1582
1583         do {
1584                 stride /= 3;
1585                 for (i = stride ; i < num ; i++) {
1586                         tmp = array[i];
1587                         j = i;
1588                         while (j >= stride && array[j - stride]->off > tmp->off) {
1589                                 array[j] = array[j - stride];
1590                                 j -= stride;
1591                         }
1592                         array[j] = tmp;
1593                 }
1594         } while (stride > 1);
1595 }
1596
1597 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1598 {
1599         int count = 1;
1600         int offset;
1601         int i = 0;
1602
1603         LASSERT (pages > 0);
1604         offset = pg[i]->off & ~CFS_PAGE_MASK;
1605
1606         for (;;) {
1607                 pages--;
1608                 if (pages == 0)         /* that's all */
1609                         return count;
1610
1611                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1612                         return count;   /* doesn't end on page boundary */
1613
1614                 i++;
1615                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1616                 if (offset != 0)        /* doesn't start on page boundary */
1617                         return count;
1618
1619                 count++;
1620         }
1621 }
1622
1623 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1624 {
1625         struct brw_page **ppga;
1626         int i;
1627
1628         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1629         if (ppga == NULL)
1630                 return NULL;
1631
1632         for (i = 0; i < count; i++)
1633                 ppga[i] = pga + i;
1634         return ppga;
1635 }
1636
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * element count originally passed to osc_build_ppga(), since OBD_FREE
 * needs the allocation size. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1642
1643 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1644                    obd_count page_count, struct brw_page *pga,
1645                    struct obd_trans_info *oti)
1646 {
1647         struct obdo *saved_oa = NULL;
1648         struct brw_page **ppga, **orig;
1649         struct obd_import *imp = class_exp2cliimp(exp);
1650         struct client_obd *cli = &imp->imp_obd->u.cli;
1651         int rc, page_count_orig;
1652         ENTRY;
1653
1654         if (cmd & OBD_BRW_CHECK) {
1655                 /* The caller just wants to know if there's a chance that this
1656                  * I/O can succeed */
1657
1658                 if (imp == NULL || imp->imp_invalid)
1659                         RETURN(-EIO);
1660                 RETURN(0);
1661         }
1662
1663         /* test_brw with a failed create can trip this, maybe others. */
1664         LASSERT(cli->cl_max_pages_per_rpc);
1665
1666         rc = 0;
1667
1668         orig = ppga = osc_build_ppga(pga, page_count);
1669         if (ppga == NULL)
1670                 RETURN(-ENOMEM);
1671         page_count_orig = page_count;
1672
1673         sort_brw_pages(ppga, page_count);
1674         while (page_count) {
1675                 obd_count pages_per_brw;
1676
1677                 if (page_count > cli->cl_max_pages_per_rpc)
1678                         pages_per_brw = cli->cl_max_pages_per_rpc;
1679                 else
1680                         pages_per_brw = page_count;
1681
1682                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1683
1684                 if (saved_oa != NULL) {
1685                         /* restore previously saved oa */
1686                         *oinfo->oi_oa = *saved_oa;
1687                 } else if (page_count > pages_per_brw) {
1688                         /* save a copy of oa (brw will clobber it) */
1689                         OBDO_ALLOC(saved_oa);
1690                         if (saved_oa == NULL)
1691                                 GOTO(out, rc = -ENOMEM);
1692                         *saved_oa = *oinfo->oi_oa;
1693                 }
1694
1695                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1696                                       pages_per_brw, ppga, oinfo->oi_capa);
1697
1698                 if (rc != 0)
1699                         break;
1700
1701                 page_count -= pages_per_brw;
1702                 ppga += pages_per_brw;
1703         }
1704
1705 out:
1706         osc_release_ppga(orig, page_count_orig);
1707
1708         if (saved_oa != NULL)
1709                 OBDO_FREE(saved_oa);
1710
1711         RETURN(rc);
1712 }
1713
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* return the page's write grant to the client accounting;
         * @sent is forwarded unchanged to the grant code */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1722
1723
1724 /* This maintains the lists of pending pages to read/write for a given object
1725  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1726  * to quickly find objects that are ready to send an RPC. */
1727 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1728                          int cmd)
1729 {
1730         int optimal;
1731         ENTRY;
1732
1733         if (lop->lop_num_pending == 0)
1734                 RETURN(0);
1735
1736         /* if we have an invalid import we want to drain the queued pages
1737          * by forcing them through rpcs that immediately fail and complete
1738          * the pages.  recovery relies on this to empty the queued pages
1739          * before canceling the locks and evicting down the llite pages */
1740         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1741                 RETURN(1);
1742
1743         /* stream rpcs in queue order as long as as there is an urgent page
1744          * queued.  this is our cheap solution for good batching in the case
1745          * where writepage marks some random page in the middle of the file
1746          * as urgent because of, say, memory pressure */
1747         if (!list_empty(&lop->lop_urgent)) {
1748                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1749                 RETURN(1);
1750         }
1751         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1752         optimal = cli->cl_max_pages_per_rpc;
1753         if (cmd & OBD_BRW_WRITE) {
1754                 /* trigger a write rpc stream as long as there are dirtiers
1755                  * waiting for space.  as they're waiting, they're not going to
1756                  * create more pages to coallesce with what's waiting.. */
1757                 if (!list_empty(&cli->cl_cache_waiters)) {
1758                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1759                         RETURN(1);
1760                 }
1761                 /* +16 to avoid triggering rpcs that would want to include pages
1762                  * that are being queued but which can't be made ready until
1763                  * the queuer finishes with the page. this is a wart for
1764                  * llite::commit_write() */
1765                 optimal += 16;
1766         }
1767         if (lop->lop_num_pending >= optimal)
1768                 RETURN(1);
1769
1770         RETURN(0);
1771 }
1772
/* Make @item's membership of @list agree with @should_be_on: link it at
 * the tail when it should be on a list but isn't, unlink it when it
 * shouldn't be but is.  No-op when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on = !list_empty(item);

        if (on == !!should_be_on)
                return;

        if (should_be_on)
                list_add_tail(item, list);
        else
                list_del_init(item);
}
1781
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: this object has enough pending (or urgent) pages in
         * either direction to be worth building an rpc for */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* per-direction lists: membership simply tracks "has pending pages" */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1796
/* Adjust @lop's pending-page count and the matching per-client
 * read/write counter by @delta (positive when queueing pages, negative
 * when removing them). */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1806
1807 /**
1808  * this is called when a sync waiter receives an interruption.  Its job is to
1809  * get the caller woken as soon as possible.  If its page hasn't been put in an
1810  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1811  * desiring interruption which will forcefully complete the rpc once the rpc
1812  * has timed out.
1813  */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;        /* default: page already in-flight, caller
                                 * must wait for the rpc to complete */
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                /* flag the rpc interrupted and kick ptlrpcd so it notices
                 * promptly, then drop this oap's reference on the request */
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue, fix up the accounting and
                 * complete the page immediately with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
1852
1853 /* this is trying to propogate async writeback errors back up to the
1854  * application.  As an async write fails we record the error code for later if
1855  * the app does an fsync.  As long as errors persist we force future rpcs to be
1856  * sync so that the app can get a sync error and break the cycle of queueing
1857  * pages for which writeback will fail. */
1858 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1859                            int rc)
1860 {
1861         if (rc) {
1862                 if (!ar->ar_rc)
1863                         ar->ar_rc = rc;
1864
1865                 ar->ar_force_sync = 1;
1866                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1867                 return;
1868
1869         }
1870
1871         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1872                 ar->ar_force_sync = 0;
1873 }
1874
1875 void osc_oap_to_pending(struct osc_async_page *oap)
1876 {
1877         struct loi_oap_pages *lop;
1878
1879         if (oap->oap_cmd & OBD_BRW_WRITE)
1880                 lop = &oap->oap_loi->loi_write_lop;
1881         else
1882                 lop = &oap->oap_loi->loi_read_lop;
1883
1884         if (oap->oap_async_flags & ASYNC_URGENT)
1885                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1886         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1887         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1888 }
1889
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* capture the xid for the async-rc bookkeeping below before
                 * dropping our reference on the request */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record/clear the async write error state on both the
                 * client and the object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, refresh the cached lvb attributes the server returned */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
1937
/* Interpret callback for a bulk brw rpc: finish (or transparently resend)
 * the request, update the in-flight accounting, complete every attached
 * oap and release the page array. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* recoverable errors are retried by building a replacement rpc;
         * on success the new request has taken over the oaps and we are done */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* an empty oap list distinguishes the async_internal() path from the
         * osc_send_oap_rpc() path */
        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        /* an rpc slot just freed up; see if more queued pages can go out */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
1994
/* Build one bulk brw rpc covering the oaps queued on @rpc_list.  On
 * success the oaps are spliced into the request's async args (the request
 * owns them from then on) and the request is returned.  On failure the
 * queued pages are completed with the error and an ERR_PTR is returned.
 * Called with cl_loi_list_lock dropped; NOTE(review): on the error path
 * this function re-takes that lock and returns with it HELD — the caller
 * (osc_send_oap_rpc) returns immediately on error and must hold the lock
 * at that point, as its other return paths show. */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the pages into pga and into the cl_req; the first oap
         * supplies the callback ops and dlm lock used for the whole rpc */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                /* let the server match the io against the client's lock */
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
                /* intentionally no unlock: the caller expects the lock held
                 * when we return an error (see osc_send_oap_rpc) */
        }
        RETURN(req);
}
2116
2117 /**
2118  * prepare pages for ASYNC io and put pages in send queue.
2119  *
2120  * \param cli -
2121  * \param loi -
2122  * \param cmd - OBD_BRW_* macroses
2123  * \param lop - pending pages
2124  *
2125  * \return zero if pages successfully add to send queue.
2126  * \return not zere if error occurring.
2127  */
2128 static int
2129 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2130                  struct lov_oinfo *loi,
2131                  int cmd, struct loi_oap_pages *lop)
2132 {
2133         struct ptlrpc_request *req;
2134         obd_count page_count = 0;
2135         struct osc_async_page *oap = NULL, *tmp;
2136         struct osc_brw_async_args *aa;
2137         const struct obd_async_page_ops *ops;
2138         CFS_LIST_HEAD(rpc_list);
2139         unsigned int ending_offset;
2140         unsigned  starting_offset = 0;
2141         int srvlock = 0;
2142         struct cl_object *clob = NULL;
2143         ENTRY;
2144
2145         /* first we find the pages we're allowed to work with */
2146         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2147                                  oap_pending_item) {
2148                 ops = oap->oap_caller_ops;
2149
2150                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2151                          "magic 0x%x\n", oap, oap->oap_magic);
2152
2153                 if (clob == NULL) {
2154                         /* pin object in memory, so that completion call-backs
2155                          * can be safely called under client_obd_list lock. */
2156                         clob = osc_oap2cl_page(oap)->cp_obj;
2157                         cl_object_get(clob);
2158                 }
2159
2160                 if (page_count != 0 &&
2161                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2162                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2163                                " oap %p, page %p, srvlock %u\n",
2164                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2165                         break;
2166                 }
2167                 /* in llite being 'ready' equates to the page being locked
2168                  * until completion unlocks it.  commit_write submits a page
2169                  * as not ready because its unlock will happen unconditionally
2170                  * as the call returns.  if we race with commit_write giving
2171                  * us that page we dont' want to create a hole in the page
2172                  * stream, so we stop and leave the rpc to be fired by
2173                  * another dirtier or kupdated interval (the not ready page
2174                  * will still be on the dirty list).  we could call in
2175                  * at the end of ll_file_write to process the queue again. */
2176                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2177                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2178                                                     cmd);
2179                         if (rc < 0)
2180                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2181                                                 "instead of ready\n", oap,
2182                                                 oap->oap_page, rc);
2183                         switch (rc) {
2184                         case -EAGAIN:
2185                                 /* llite is telling us that the page is still
2186                                  * in commit_write and that we should try
2187                                  * and put it in an rpc again later.  we
2188                                  * break out of the loop so we don't create
2189                                  * a hole in the sequence of pages in the rpc
2190                                  * stream.*/
2191                                 oap = NULL;
2192                                 break;
2193                         case -EINTR:
2194                                 /* the io isn't needed.. tell the checks
2195                                  * below to complete the rpc with EINTR */
2196                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2197                                 oap->oap_count = -EINTR;
2198                                 break;
2199                         case 0:
2200                                 oap->oap_async_flags |= ASYNC_READY;
2201                                 break;
2202                         default:
2203                                 LASSERTF(0, "oap %p page %p returned %d "
2204                                             "from make_ready\n", oap,
2205                                             oap->oap_page, rc);
2206                                 break;
2207                         }
2208                 }
2209                 if (oap == NULL)
2210                         break;
2211                 /*
2212                  * Page submitted for IO has to be locked. Either by
2213                  * ->ap_make_ready() or by higher layers.
2214                  */
2215 #if defined(__KERNEL__) && defined(__linux__)
2216                 {
2217                         struct cl_page *page;
2218
2219                         page = osc_oap2cl_page(oap);
2220
2221                         if (page->cp_type == CPT_CACHEABLE &&
2222                             !(PageLocked(oap->oap_page) &&
2223                               (CheckWriteback(oap->oap_page, cmd)))) {
2224                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2225                                        oap->oap_page,
2226                                        (long)oap->oap_page->flags,
2227                                        oap->oap_async_flags);
2228                                 LBUG();
2229                         }
2230                 }
2231 #endif
2232                 /* If there is a gap at the start of this page, it can't merge
2233                  * with any previous page, so we'll hand the network a
2234                  * "fragmented" page array that it can't transfer in 1 RDMA */
2235                 if (page_count != 0 && oap->oap_page_off != 0)
2236                         break;
2237
2238                 /* take the page out of our book-keeping */
2239                 list_del_init(&oap->oap_pending_item);
2240                 lop_update_pending(cli, lop, cmd, -1);
2241                 list_del_init(&oap->oap_urgent_item);
2242
2243                 if (page_count == 0)
2244                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2245                                           (PTLRPC_MAX_BRW_SIZE - 1);
2246
2247                 /* ask the caller for the size of the io as the rpc leaves. */
2248                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2249                         oap->oap_count =
2250                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2251                                                       cmd);
2252                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2253                 }
2254                 if (oap->oap_count <= 0) {
2255                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2256                                oap->oap_count);
2257                         osc_ap_completion(env, cli, NULL,
2258                                           oap, 0, oap->oap_count);
2259                         continue;
2260                 }
2261
2262                 /* now put the page back in our accounting */
2263                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2264                 if (page_count == 0)
2265                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2266                 if (++page_count >= cli->cl_max_pages_per_rpc)
2267                         break;
2268
2269                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2270                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2271                  * have the same alignment as the initial writes that allocated
2272                  * extents on the server. */
2273                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2274                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2275                 if (ending_offset == 0)
2276                         break;
2277
2278                 /* If there is a gap at the end of this page, it can't merge
2279                  * with any subsequent pages, so we'll hand the network a
2280                  * "fragmented" page array that it can't transfer in 1 RDMA */
2281                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2282                         break;
2283         }
2284
2285         osc_wake_cache_waiters(cli);
2286
2287         loi_list_maint(cli, loi);
2288
2289         client_obd_list_unlock(&cli->cl_loi_list_lock);
2290
2291         if (clob != NULL)
2292                 cl_object_put(env, clob);
2293
2294         if (page_count == 0) {
2295                 client_obd_list_lock(&cli->cl_loi_list_lock);
2296                 RETURN(0);
2297         }
2298
2299         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2300         if (IS_ERR(req)) {
2301                 LASSERT(list_empty(&rpc_list));
2302                 loi_list_maint(cli, loi);
2303                 RETURN(PTR_ERR(req));
2304         }
2305
2306         aa = ptlrpc_req_async_args(req);
2307
2308         if (cmd == OBD_BRW_READ) {
2309                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2310                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2311                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2312                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2313         } else {
2314                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2315                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2316                                  cli->cl_w_in_flight);
2317                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2318                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2319         }
2320         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2321
2322         client_obd_list_lock(&cli->cl_loi_list_lock);
2323
2324         if (cmd == OBD_BRW_READ)
2325                 cli->cl_r_in_flight++;
2326         else
2327                 cli->cl_w_in_flight++;
2328
2329         /* queued sync pages can be torn down while the pages
2330          * were between the pending list and the rpc */
2331         tmp = NULL;
2332         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2333                 /* only one oap gets a request reference */
2334                 if (tmp == NULL)
2335                         tmp = oap;
2336                 if (oap->oap_interrupted && !req->rq_intr) {
2337                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2338                                oap, req);
2339                         ptlrpc_mark_interrupted(req);
2340                 }
2341         }
2342         if (tmp != NULL)
2343                 tmp->oap_request = ptlrpc_request_addref(req);
2344
2345         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2346                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2347
2348         req->rq_interpret_reply = brw_interpret;
2349         ptlrpcd_add_req(req, PSCOPE_BRW);
2350         RETURN(1);
2351 }
2352
/* Dump an object's rpc-readiness state: whether the loi is on the client's
 * ready list, plus pending-page counts and urgent-list status for both the
 * write and read page queues.  (The stray trailing backslash that used to
 * extend this macro onto the following blank line has been removed.) */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2361
2362 /* This is called by osc_check_rpcs() to find which objects have pages that
2363  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2364 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2365 {
2366         ENTRY;
2367         /* first return all objects which we already know to have
2368          * pages ready to be stuffed into rpcs */
2369         if (!list_empty(&cli->cl_loi_ready_list))
2370                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2371                                   struct lov_oinfo, loi_cli_item));
2372
2373         /* then if we have cache waiters, return all objects with queued
2374          * writes.  This is especially important when many small files
2375          * have filled up the cache and not been fired into rpcs because
2376          * they don't pass the nr_pending/object threshhold */
2377         if (!list_empty(&cli->cl_cache_waiters) &&
2378             !list_empty(&cli->cl_loi_write_list))
2379                 RETURN(list_entry(cli->cl_loi_write_list.next,
2380                                   struct lov_oinfo, loi_write_item));
2381
2382         /* then return all queued objects when we have an invalid import
2383          * so that they get flushed */
2384         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2385                 if (!list_empty(&cli->cl_loi_write_list))
2386                         RETURN(list_entry(cli->cl_loi_write_list.next,
2387                                           struct lov_oinfo, loi_write_item));
2388                 if (!list_empty(&cli->cl_loi_read_list))
2389                         RETURN(list_entry(cli->cl_loi_read_list.next,
2390                                           struct lov_oinfo, loi_read_item));
2391         }
2392         RETURN(NULL);
2393 }
2394
/* Walk objects with pending pages and fire off brw RPCs while we are below
 * the client's rpcs-in-flight limit.  Called with cli->cl_loi_list_lock
 * held.  NOTE(review): osc_send_oap_rpc() appears to drop and retake that
 * lock while building a request — confirm against its implementation. */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the client has reached its in-flight rpc cap */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2456
2457 /* we're trying to queue a page in the osc so we're subject to the
2458  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2459  * If the osc's queued pages are already at that limit, then we want to sleep
2460  * until there is space in the osc's queue for us.  We also may be waiting for
2461  * write credits from the OST if there are RPCs in flight that may return some
2462  * before we fall back to sync writes.
2463  *
2464  * We need this know our allocation was granted in the presence of signals */
2465 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2466 {
2467         int rc;
2468         ENTRY;
2469         client_obd_list_lock(&cli->cl_loi_list_lock);
2470         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2471         client_obd_list_unlock(&cli->cl_loi_list_lock);
2472         RETURN(rc);
2473 };
2474
2475 /**
2476  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2477  * is available.
2478  */
2479 int osc_enter_cache_try(const struct lu_env *env,
2480                         struct client_obd *cli, struct lov_oinfo *loi,
2481                         struct osc_async_page *oap, int transient)
2482 {
2483         int has_grant;
2484
2485         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2486         if (has_grant) {
2487                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2488                 if (transient) {
2489                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2490                         atomic_inc(&obd_dirty_transit_pages);
2491                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2492                 }
2493         }
2494         return has_grant;
2495 }
2496
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 once grant has been consumed for the page, -EDQUOT when the
 * caller must fall back to sync io, -EINTR if the wait was interrupted,
 * or the rc stored by whoever granted/failed the waiter. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* register ourselves as a cache waiter, kick rpc generation,
                 * then sleep with the list lock dropped */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                /* relock before inspecting our waiter entry; still being on
                 * the list means we were woken without being granted */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* ocw_rc was filled in by whoever granted (or failed) us */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2551
2552
2553 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2554                         struct lov_oinfo *loi, cfs_page_t *page,
2555                         obd_off offset, const struct obd_async_page_ops *ops,
2556                         void *data, void **res, int nocache,
2557                         struct lustre_handle *lockh)
2558 {
2559         struct osc_async_page *oap;
2560
2561         ENTRY;
2562
2563         if (!page)
2564                 return size_round(sizeof(*oap));
2565
2566         oap = *res;
2567         oap->oap_magic = OAP_MAGIC;
2568         oap->oap_cli = &exp->exp_obd->u.cli;
2569         oap->oap_loi = loi;
2570
2571         oap->oap_caller_ops = ops;
2572         oap->oap_caller_data = data;
2573
2574         oap->oap_page = page;
2575         oap->oap_obj_off = offset;
2576         if (!client_is_remote(exp) &&
2577             cfs_capable(CFS_CAP_SYS_RESOURCE))
2578                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2579
2580         LASSERT(!(offset & ~CFS_PAGE_MASK));
2581
2582         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2583         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2584         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2585         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2586
2587         spin_lock_init(&oap->oap_lock);
2588         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2589         RETURN(0);
2590 }
2591
2592 struct osc_async_page *oap_from_cookie(void *cookie)
2593 {
2594         struct osc_async_page *oap = cookie;
2595         if (oap->oap_magic != OAP_MAGIC)
2596                 return ERR_PTR(-EINVAL);
2597         return oap;
2598 };
2599
/* Queue one async page (identified by its prep cookie) for read or write on
 * the given object.  Writes must first obtain cache grant via
 * osc_enter_cache(), which can sleep.  Returns 0 on success, or a negative
 * errno: -EIO on invalid import, -EBUSY if the page is already queued,
 * -EDQUOT when over quota or unable to get cache space. */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* a page on any pending/urgent/rpc list is already in use */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
                                            attr.cat_gid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        /* default to the first stripe's oinfo when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* the io must fit within a single page */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while waiting */
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        /* try to turn the newly queued work into rpcs right away */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2672
/* aka (~was & now & flag), but this is more clear :)
 * True when 'flag' is being newly set in the transition was -> now.
 * Arguments are parenthesized to avoid operator-precedence surprises. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2675
2676 int osc_set_async_flags_base(struct client_obd *cli,
2677                              struct lov_oinfo *loi, struct osc_async_page *oap,
2678                              obd_flag async_flags)
2679 {
2680         struct loi_oap_pages *lop;
2681         ENTRY;
2682
2683         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2684                 RETURN(-EIO);
2685
2686         if (oap->oap_cmd & OBD_BRW_WRITE) {
2687                 lop = &loi->loi_write_lop;
2688         } else {
2689                 lop = &loi->loi_read_lop;
2690         }
2691
2692         if (list_empty(&oap->oap_pending_item))
2693                 RETURN(-EINVAL);
2694
2695         if ((oap->oap_async_flags & async_flags) == async_flags)
2696                 RETURN(0);
2697
2698         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2699                 oap->oap_async_flags |= ASYNC_READY;
2700
2701         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2702                 if (list_empty(&oap->oap_rpc_item)) {
2703                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2704                         loi_list_maint(cli, loi);
2705                 }
2706         }
2707
2708         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2709                         oap->oap_async_flags);
2710         RETURN(0);
2711 }
2712
/* Remove an async page (by cookie) from the osc's queues before it is sent:
 * release its cache grant, wake cache waiters, and unlink it from the
 * urgent and pending lists.  Returns -EBUSY if the page has already been
 * handed to an in-flight rpc. */
int osc_teardown_async_page(struct obd_export *exp,
                            struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /* default to the first stripe's oinfo when none was given */
        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the page-accounting bucket for this page's command */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* pages already part of an rpc cannot be torn down here */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2758
/* Store 'einfo->ei_cbdata' as the lock's l_ast_data after asserting that the
 * lock's callbacks and resource type match the enqueue info, and that any
 * existing l_ast_data agrees with the new value.  'flags' is currently
 * unused.  The lock ordering here (resource/lock locks taken before
 * osc_ast_guard) must not be changed — NOTE(review): presumably it matches
 * the order used by the readers of l_ast_data elsewhere; confirm. */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
                                         int flags)
{
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);
        /* only a NULL or identical value may already be present */
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
}
2778
2779 static void osc_set_data_with_check(struct lustre_handle *lockh,
2780                                     struct ldlm_enqueue_info *einfo,
2781                                     int flags)
2782 {
2783         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2784
2785         if (lock != NULL) {
2786                 osc_set_lock_data_with_check(lock, einfo, flags);
2787                 LDLM_LOCK_PUT(lock);
2788         } else
2789                 CERROR("lockh %p, data %p - client evicted?\n",
2790                        lockh, einfo->ei_cbdata);
2791 }
2792
/* Apply the 'replace' iterator with 'data' to every cached ldlm lock on the
 * stripe object's resource — presumably used to update per-lock callback
 * data when the higher-level object changes; confirm against callers.
 * Always returns 0; the iterator's own result is ignored (best-effort). */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        /* build the resource name from the stripe object id/group */
        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}
2803
2804 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2805                             obd_enqueue_update_f upcall, void *cookie,
2806                             int *flags, int rc)
2807 {
2808         int intent = *flags & LDLM_FL_HAS_INTENT;
2809         ENTRY;
2810
2811         if (intent) {
2812                 /* The request was created before ldlm_cli_enqueue call. */
2813                 if (rc == ELDLM_LOCK_ABORTED) {
2814                         struct ldlm_reply *rep;
2815                         rep = req_capsule_server_get(&req->rq_pill,
2816                                                      &RMF_DLM_REP);
2817
2818                         LASSERT(rep != NULL);
2819                         if (rep->lock_policy_res1)
2820                                 rc = rep->lock_policy_res1;
2821                 }
2822         }
2823
2824         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2825                 *flags |= LDLM_FL_LVB_READY;
2826                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2827                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2828         }
2829
2830         /* Call the update callback. */
2831         rc = (*upcall)(cookie, rc);
2832         RETURN(rc);
2833 }
2834
/* Interpret callback for an async lock enqueue: complete the ldlm enqueue,
 * run the caller's upcall via osc_enqueue_fini(), then drop the references
 * this path holds.  The reference juggling below is deliberate — do not
 * reorder the addref/decref/PUT sequence. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
                                   &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* drop the extra reference taken above, and the ldlm_handle2lock
         * reference from the top of this function */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
2881
2882 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2883                         struct lov_oinfo *loi, int flags,
2884                         struct ost_lvb *lvb, __u32 mode, int rc)
2885 {
2886         if (rc == ELDLM_OK) {
2887                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2888                 __u64 tmp;
2889
2890                 LASSERT(lock != NULL);
2891                 loi->loi_lvb = *lvb;
2892                 tmp = loi->loi_lvb.lvb_size;
2893                 /* Extend KMS up to the end of this lock and no further
2894                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2895                 if (tmp > lock->l_policy_data.l_extent.end)
2896                         tmp = lock->l_policy_data.l_extent.end + 1;
2897                 if (tmp >= loi->loi_kms) {
2898                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2899                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2900                         loi_kms_set(loi, tmp);
2901                 } else {
2902                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2903                                    LPU64"; leaving kms="LPU64", end="LPU64,
2904                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2905                                    lock->l_policy_data.l_extent.end);
2906                 }
2907                 ldlm_lock_allow_match(lock);
2908                 LDLM_LOCK_PUT(lock);
2909         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2910                 loi->loi_lvb = *lvb;
2911                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2912                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2913                 rc = ELDLM_OK;
2914         }
2915 }
2916 EXPORT_SYMBOL(osc_update_enqueue);
2917
/* Sentinel request-set pointer: a non-NULL but obviously invalid value.
 * NOTE(review): presumably callers pass this to mean "let ptlrpcd handle
 * the request" and only ever compare against it, never dereference it —
 * confirm against users of PTLRPCD_SET. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2919
2920 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2921  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2922  * other synchronous requests, however keeping some locks and trying to obtain
2923  * others may take a considerable amount of time in a case of ost failure; and
2924  * when other sync requests do not get released lock from a client, the client
2925  * is excluded from the cluster -- such scenarious make the life difficult, so
2926  * release locks just after they are obtained. */
2927 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2928                      int *flags, ldlm_policy_data_t *policy,
2929                      struct ost_lvb *lvb, int kms_valid,
2930                      obd_enqueue_update_f upcall, void *cookie,
2931                      struct ldlm_enqueue_info *einfo,
2932                      struct lustre_handle *lockh,
2933                      struct ptlrpc_request_set *rqset, int async)
2934 {
2935         struct obd_device *obd = exp->exp_obd;
2936         struct ptlrpc_request *req = NULL;
2937         int intent = *flags & LDLM_FL_HAS_INTENT;
2938         ldlm_mode_t mode;
2939         int rc;
2940         ENTRY;
2941
2942         /* Filesystem lock extents are extended to page boundaries so that
2943          * dealing with the page cache is a little smoother.  */
2944         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2945         policy->l_extent.end |= ~CFS_PAGE_MASK;
2946
2947         /*
2948          * kms is not valid when either object is completely fresh (so that no
2949          * locks are cached), or object was evicted. In the latter case cached
2950          * lock cannot be used, because it would prime inode state with
2951          * potentially stale LVB.
2952          */
2953         if (!kms_valid)
2954                 goto no_match;
2955
2956         /* Next, search for already existing extent locks that will cover us */
2957         /* If we're trying to read, we also search for an existing PW lock.  The
2958          * VFS and page cache already protect us locally, so lots of readers/
2959          * writers can share a single PW lock.
2960          *
2961          * There are problems with conversion deadlocks, so instead of
2962          * converting a read lock to a write lock, we'll just enqueue a new
2963          * one.
2964          *
2965          * At some point we should cancel the read lock instead of making them
2966          * send us a blocking callback, but there are problems with canceling
2967          * locks out from other users right now, too. */
2968         mode = einfo->ei_mode;
2969         if (einfo->ei_mode == LCK_PR)
2970                 mode |= LCK_PW;
2971         mode = ldlm_lock_match(obd->obd_namespace,
2972                                *flags | LDLM_FL_LVB_READY, res_id,
2973                                einfo->ei_type, policy, mode, lockh, 0);
2974         if (mode) {
2975                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2976
2977                 if (matched->l_ast_data == NULL ||
2978                     matched->l_ast_data == einfo->ei_cbdata) {
2979                         /* addref the lock only if not async requests and PW
2980                          * lock is matched whereas we asked for PR. */
2981                         if (!rqset && einfo->ei_mode != mode)
2982                                 ldlm_lock_addref(lockh, LCK_PR);
2983                         osc_set_lock_data_with_check(matched, einfo, *flags);
2984                         if (intent) {
2985                                 /* I would like to be able to ASSERT here that
2986                                  * rss <= kms, but I can't, for reasons which
2987                                  * are explained in lov_enqueue() */
2988                         }
2989
2990                         /* We already have a lock, and it's referenced */
2991                         (*upcall)(cookie, ELDLM_OK);
2992
2993                         /* For async requests, decref the lock. */
2994                         if (einfo->ei_mode != mode)
2995                                 ldlm_lock_decref(lockh, LCK_PW);
2996                         else if (rqset)
2997                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2998                         LDLM_LOCK_PUT(matched);
2999                         RETURN(ELDLM_OK);
3000                 } else
3001                         ldlm_lock_decref(lockh, mode);
3002                 LDLM_LOCK_PUT(matched);
3003         }
3004
3005  no_match:
3006         if (intent) {
3007                 CFS_LIST_HEAD(cancels);
3008                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3009                                            &RQF_LDLM_ENQUEUE_LVB);
3010                 if (req == NULL)
3011                         RETURN(-ENOMEM);
3012
3013                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3014                 if (rc)
3015                         RETURN(rc);
3016
3017                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3018                                      sizeof *lvb);
3019                 ptlrpc_request_set_replen(req);
3020         }
3021
3022         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3023         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3024
3025         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3026                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3027         if (rqset) {
3028                 if (!rc) {
3029                         struct osc_enqueue_args *aa;
3030                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3031                         aa = ptlrpc_req_async_args(req);
3032                         aa->oa_ei = einfo;
3033                         aa->oa_exp = exp;
3034                         aa->oa_flags  = flags;
3035                         aa->oa_upcall = upcall;
3036                         aa->oa_cookie = cookie;
3037                         aa->oa_lvb    = lvb;
3038                         aa->oa_lockh  = lockh;
3039
3040                         req->rq_interpret_reply =
3041                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3042                         if (rqset == PTLRPCD_SET)
3043                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3044                         else
3045                                 ptlrpc_set_add_req(rqset, req);
3046                 } else if (intent) {
3047                         ptlrpc_req_finished(req);
3048                 }
3049                 RETURN(rc);
3050         }
3051
3052         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3053         if (intent)
3054                 ptlrpc_req_finished(req);
3055
3056         RETURN(rc);
3057 }
3058
3059 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3060                        struct ldlm_enqueue_info *einfo,
3061                        struct ptlrpc_request_set *rqset)
3062 {
3063         struct ldlm_res_id res_id;
3064         int rc;
3065         ENTRY;
3066
3067         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3068                            oinfo->oi_md->lsm_object_gr, &res_id);
3069
3070         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3071                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3072                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3073                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3074                               rqset, rqset != NULL);
3075         RETURN(rc);
3076 }
3077
3078 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3079                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3080                    int *flags, void *data, struct lustre_handle *lockh,
3081                    int unref)
3082 {
3083         struct obd_device *obd = exp->exp_obd;
3084         int lflags = *flags;
3085         ldlm_mode_t rc;
3086         ENTRY;
3087
3088         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3089                 RETURN(-EIO);
3090
3091         /* Filesystem lock extents are extended to page boundaries so that
3092          * dealing with the page cache is a little smoother */
3093         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3094         policy->l_extent.end |= ~CFS_PAGE_MASK;
3095
3096         /* Next, search for already existing extent locks that will cover us */
3097         /* If we're trying to read, we also search for an existing PW lock.  The
3098          * VFS and page cache already protect us locally, so lots of readers/
3099          * writers can share a single PW lock. */
3100         rc = mode;
3101         if (mode == LCK_PR)
3102                 rc |= LCK_PW;
3103         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3104                              res_id, type, policy, rc, lockh, unref);
3105         if (rc) {
3106                 if (data != NULL)
3107                         osc_set_data_with_check(lockh, data, lflags);
3108                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3109                         ldlm_lock_addref(lockh, LCK_PR);
3110                         ldlm_lock_decref(lockh, LCK_PW);
3111                 }
3112                 RETURN(rc);
3113         }
3114         RETURN(rc);
3115 }
3116
3117 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3118 {
3119         ENTRY;
3120
3121         if (unlikely(mode == LCK_GROUP))
3122                 ldlm_lock_decref_and_cancel(lockh, mode);
3123         else
3124                 ldlm_lock_decref(lockh, mode);
3125
3126         RETURN(0);
3127 }
3128
3129 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3130                       __u32 mode, struct lustre_handle *lockh)
3131 {
3132         ENTRY;
3133         RETURN(osc_cancel_base(lockh, mode));
3134 }
3135
3136 static int osc_cancel_unused(struct obd_export *exp,
3137                              struct lov_stripe_md *lsm, int flags,
3138                              void *opaque)
3139 {
3140         struct obd_device *obd = class_exp2obd(exp);
3141         struct ldlm_res_id res_id, *resp = NULL;
3142
3143         if (lsm != NULL) {
3144                 resp = osc_build_res_name(lsm->lsm_object_id,
3145                                           lsm->lsm_object_gr, &res_id);
3146         }
3147
3148         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3149 }
3150
3151 static int osc_statfs_interpret(const struct lu_env *env,
3152                                 struct ptlrpc_request *req,
3153                                 struct osc_async_args *aa, int rc)
3154 {
3155         struct obd_statfs *msfs;
3156         ENTRY;
3157
3158         if (rc != 0)
3159                 GOTO(out, rc);
3160
3161         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3162         if (msfs == NULL) {
3163                 GOTO(out, rc = -EPROTO);
3164         }
3165
3166         *aa->aa_oi->oi_osfs = *msfs;
3167 out:
3168         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3169         RETURN(rc);
3170 }
3171
3172 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3173                             __u64 max_age, struct ptlrpc_request_set *rqset)
3174 {
3175         struct ptlrpc_request *req;
3176         struct osc_async_args *aa;
3177         int                    rc;
3178         ENTRY;
3179
3180         /* We could possibly pass max_age in the request (as an absolute
3181          * timestamp or a "seconds.usec ago") so the target can avoid doing
3182          * extra calls into the filesystem if that isn't necessary (e.g.
3183          * during mount that would help a bit).  Having relative timestamps
3184          * is not so great if request processing is slow, while absolute
3185          * timestamps are not ideal because they need time synchronization. */
3186         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3187         if (req == NULL)
3188                 RETURN(-ENOMEM);
3189
3190         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3191         if (rc) {
3192                 ptlrpc_request_free(req);
3193                 RETURN(rc);
3194         }
3195         ptlrpc_request_set_replen(req);
3196         req->rq_request_portal = OST_CREATE_PORTAL;
3197         ptlrpc_at_set_req_timeout(req);
3198
3199         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3200                 /* procfs requests not want stat in wait for avoid deadlock */
3201                 req->rq_no_resend = 1;
3202                 req->rq_no_delay = 1;
3203         }
3204
3205         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3206         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3207         aa = ptlrpc_req_async_args(req);
3208         aa->aa_oi = oinfo;
3209
3210         ptlrpc_set_add_req(rqset, req);
3211         RETURN(0);
3212 }
3213
/* Synchronous statfs: send an OST_STATFS RPC and wait for the reply.
 *
 * @obd:     OSC device whose import is queried
 * @osfs:    [out] filled with the target's statfs reply on success
 * @max_age: unused here (see the comment below about passing it)
 * @flags:   OBD_STATFS_* flags; NODELAY makes the RPC fail fast
 *
 * Returns 0 on success (with *osfs valid), negative errno otherwise. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* Since the request might also come from lprocfs, we must
         * synchronize with client_disconnect_export(), which can zero
         * cl_import underneath us (bug 15684).  Take our own reference
         * on the import under cl_sem so it stays valid. */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* Drop the reference taken above; the request (if any) was
         * allocated against imp already. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* NOTE(review): routed to OST_CREATE_PORTAL rather than the I/O
         * portal, presumably so statfs is not queued behind bulk I/O --
         * confirm against the portal assignments on the OST side. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * otherwise they could deadlock; fail them fast instead. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3276
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core (userspace-format) struct whose
 * lmm_stripe_count indicates the maximum number of OST indices that will
 * fit in the user buffer.  lmm_magic must be LOV_USER_MAGIC_V1 or
 * LOV_USER_MAGIC_V3 (an OSC only ever fills in a single stripe slot).
 */
/* Copy this OSC's (single-stripe) striping information out to the user
 * buffer @lump.  @lsm supplies the object id/group; the user buffer's
 * header tells us which LOV format (v1/v3) and how many stripe slots
 * the caller has room for.  Returns 0, -ENODATA, -EINVAL, -EFAULT or
 * -ENOMEM. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* the objects array starts at a different offset in v1 vs v3 */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* no room for stripe data: reuse the header we read in */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        /* NOTE(review): in the allocated path only the fields below (and
         * the first object id) are filled in; lmm_magic, lmm_pattern etc.
         * are presumably left zeroed by OBD_ALLOC -- confirm that the
         * userspace consumers tolerate this. */
        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3339
3340
/* obd_iocontrol() handler for the OSC.
 *
 * @cmd:  ioctl number
 * @exp:  export the ioctl arrived on
 * @len:  length of @karg (reused as a buffer length for GET_CONFIG)
 * @karg: kernel-space argument (usually a struct obd_ioctl_data)
 * @uarg: user-space argument pointer
 *
 * Holds a module reference for the duration of the call so the module
 * cannot be unloaded mid-ioctl.  Returns 0 or a negative errno. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Present this single OSC as a one-target "LOV" config. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* caller's inline buffers must be big enough for the
                 * descriptor and the uuid we are about to write back */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user() returns the number of bytes NOT copied;
                 * any non-zero result maps to -EFAULT */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns size on success; callers of
                 * this ioctl expect plain 0 */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3427
/* obd_get_info() handler for the OSC.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - map a lock to a stripe index; an OSC manages
 *                        exactly one stripe, so this is always 0.
 *   KEY_LAST_ID        - query the OST for the last allocated object id
 *                        via a synchronous OST_GET_INFO RPC.
 *   KEY_FIEMAP         - forward a fiemap request to the OST and copy
 *                        the reply back into @val (in/out, *vallen bytes).
 * Any other key returns -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* a single OSC always has stripe index 0 */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* the key string itself is the request payload */
                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* the fiemap value buffer travels in both directions:
                 * the client sends the query, the server fills it in */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3523
3524 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3525                                           struct ptlrpc_request *req,
3526                                           void *aa, int rc)
3527 {
3528         struct llog_ctxt *ctxt;
3529         struct obd_import *imp = req->rq_import;
3530         ENTRY;
3531
3532         if (rc != 0)
3533                 RETURN(rc);
3534
3535         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3536         if (ctxt) {
3537                 if (rc == 0)
3538                         rc = llog_initiator_connect(ctxt);
3539                 else
3540                         CERROR("cannot establish connection for "
3541                                "ctxt %p: %d\n", ctxt, rc);
3542         }
3543
3544         llog_ctxt_put(ctxt);
3545         spin_lock(&imp->imp_lock);
3546         imp->imp_server_timeout = 1;
3547         imp->imp_pingable = 1;
3548         spin_unlock(&imp->imp_lock);
3549         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3550
3551         RETURN(rc);
3552 }
3553
/* obd_set_info_async() handler for the OSC.
 *
 * Several keys are handled purely locally (NEXT_ID, UNLINKED,
 * INIT_RECOV, CHECKSUM, SPTLRPC_CONF, FLUSH_CTX); everything else is
 * packed into an OST_SET_INFO RPC and added to @set.  For KEY_MDS_CONN
 * the reply is post-processed by osc_setinfo_mds_conn_interpret().
 * Returns 0 on success or a negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                /* record the next object id the object creator may use */
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* space was freed; clear the creator's no-space flag */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                /* enable/disable bulk data checksumming for this client */
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* all remaining keys are forwarded to the OST as an RPC and
         * therefore require a request set */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                /* this OSC belongs to an MDS: remember the object group
                 * and finish the llog/ping setup when the reply arrives */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(NULL, set);

        RETURN(0);
}
3662
3663
/* Replica-side llog operations for the size-changes log: the OSC only
 * needs to handle record cancellation for this context. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Origin-side llog operations; filled in lazily from llog_lvfs_ops by
 * osc_llog_init() on first use. */
static struct llog_operations osc_mds_ost_orig_logops;
3669 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3670                          struct obd_device *tgt, int count,
3671                          struct llog_catid *catid, struct obd_uuid *uuid)
3672 {
3673         int rc;
3674         ENTRY;
3675
3676         LASSERT(olg == &obd->obd_olg);
3677         spin_lock(&obd->obd_dev_lock);
3678         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3679                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3680                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3681                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3682                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3683                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3684         }
3685         spin_unlock(&obd->obd_dev_lock);
3686
3687         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3688                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3689         if (rc) {
3690                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3691                 GOTO(out, rc);
3692         }
3693
3694         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3695                         NULL, &osc_size_repl_logops);
3696         if (rc) {
3697                 struct llog_ctxt *ctxt =
3698                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3699                 if (ctxt)
3700                         llog_cleanup(ctxt);
3701                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3702         }
3703         GOTO(out, rc);
3704 out:
3705         if (rc) {
3706                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3707                        obd->obd_name, tgt->obd_name, count, catid, rc);
3708                 CERROR("logid "LPX64":0x%x\n",
3709                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3710         }
3711         return rc;
3712 }
3713
3714 static int osc_llog_finish(struct obd_device *obd, int count)
3715 {
3716         struct llog_ctxt *ctxt;
3717         int rc = 0, rc2 = 0;
3718         ENTRY;
3719
3720         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3721         if (ctxt)
3722                 rc = llog_cleanup(ctxt);
3723
3724         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3725         if (ctxt)
3726                 rc2 = llog_cleanup(ctxt);
3727         if (!rc)
3728                 rc = rc2;
3729
3730         RETURN(rc);
3731 }
3732
3733 static int osc_reconnect(const struct lu_env *env,
3734                          struct obd_export *exp, struct obd_device *obd,
3735                          struct obd_uuid *cluuid,
3736                          struct obd_connect_data *data,
3737                          void *localdata)
3738 {
3739         struct client_obd *cli = &obd->u.cli;
3740
3741         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3742                 long lost_grant;
3743
3744                 client_obd_list_lock(&cli->cl_loi_list_lock);
3745                 data->ocd_grant = cli->cl_avail_grant ?:
3746                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3747                 lost_grant = cli->cl_lost_grant;
3748                 cli->cl_lost_grant = 0;
3749                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3750
3751                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3752                        "cl_lost_grant: %ld\n", data->ocd_grant,
3753                        cli->cl_avail_grant, lost_grant);
3754                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3755                        " ocd_grant: %d\n", data->ocd_connect_flags,
3756                        data->ocd_version, data->ocd_grant);
3757         }
3758
3759         RETURN(0);
3760 }
3761
3762 static int osc_disconnect(struct obd_export *exp)
3763 {
3764         struct obd_device *obd = class_exp2obd(exp);
3765         struct llog_ctxt  *ctxt;
3766         int rc;
3767
3768         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3769         if (ctxt) {
3770                 if (obd->u.cli.cl_conn_count == 1) {
3771                         /* Flush any remaining cancel messages out to the
3772                          * target */
3773                         llog_sync(ctxt, exp);
3774                 }
3775                 llog_ctxt_put(ctxt);
3776         } else {
3777                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3778                        obd);
3779         }
3780
3781         rc = client_disconnect_export(exp);
3782         return rc;
3783 }
3784
/*
 * React to state changes of the OST import.
 *
 * Called by the ptlrpc import state machine.  Per event:
 *  - DISCON:     mark the object creator as recovering (MDS-side OSCs only)
 *                and zero the grant counters under cl_loi_list_lock.
 *  - INACTIVE:   notify the observer (e.g. the LOV above us).
 *  - INVALIDATE: push queued pages into (failing) RPCs, flush local DLM
 *                locks, then notify happens via the caller's flow.
 *  - ACTIVE:     clear the creator's no-space flag (MDS-side OSCs only)
 *                and notify the observer.
 *  - OCD:        re-initialize grants from the negotiated connect data and
 *                switch to the OST request portal if the server supports it.
 * Any other event is a bug (LBUG).  Returns 0 or the observer/env error.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* Grant state is meaningless across a disconnect; it will
                 * be re-established on reconnect/OCD. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* Drop client-side DLM locks without talking to the
                         * (now invalid) server. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
3868
3869 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3870 {
3871         int rc;
3872         ENTRY;
3873
3874         ENTRY;
3875         rc = ptlrpcd_addref();
3876         if (rc)
3877                 RETURN(rc);
3878
3879         rc = client_obd_setup(obd, lcfg);
3880         if (rc) {
3881                 ptlrpcd_decref();
3882         } else {
3883                 struct lprocfs_static_vars lvars = { 0 };
3884                 struct client_obd *cli = &obd->u.cli;
3885
3886                 lprocfs_osc_init_vars(&lvars);
3887                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3888                         lproc_osc_attach_seqstat(obd);
3889                         sptlrpc_lprocfs_cliobd_attach(obd);
3890                         ptlrpc_lprocfs_register_obd(obd);
3891                 }
3892
3893                 oscc_init(obd);
3894                 /* We need to allocate a few requests more, because
3895                    brw_interpret tries to create new requests before freeing
3896                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
3897                    reserved, but I afraid that might be too much wasted RAM
3898                    in fact, so 2 is just my guess and still should work. */
3899                 cli->cl_import->imp_rq_pool =
3900                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3901                                             OST_MAXREQSIZE,
3902                                             ptlrpc_add_rqs_to_pool);
3903         }
3904
3905         RETURN(rc);
3906 }
3907
/*
 * Staged pre-cleanup of the OSC device.
 *
 * OBD_CLEANUP_EARLY:   deactivate the import and stop pinging so no new
 *                      traffic starts while upper layers shut down.
 * OBD_CLEANUP_EXPORTS: invalidate and destroy the client import (and its
 *                      request pool) under cl_sem, then finish the llog
 *                      subsystem.
 * Other stages fall through and return 0 (no default case; intentional
 * no-op for the remaining enum values).
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever the
                         * import still exists, connected or not — the
                         * "never connected" wording looks stale; confirm
                         * before relying on it in logs. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
3951
3952 int osc_cleanup(struct obd_device *obd)
3953 {
3954         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3955         int rc;
3956
3957         ENTRY;
3958         ptlrpc_lprocfs_unregister_obd(obd);
3959         lprocfs_obd_cleanup(obd);
3960
3961         spin_lock(&oscc->oscc_lock);
3962         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
3963         oscc->oscc_flags |= OSCC_FLAG_EXITING;
3964         spin_unlock(&oscc->oscc_lock);
3965
3966         /* free memory of osc quota cache */
3967         lquota_cleanup(quota_interface, obd);
3968
3969         rc = client_obd_cleanup(obd);
3970
3971         ptlrpcd_decref();
3972         RETURN(rc);
3973 }
3974
3975 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3976 {
3977         struct lprocfs_static_vars lvars = { 0 };
3978         int rc = 0;
3979
3980         lprocfs_osc_init_vars(&lvars);
3981
3982         switch (lcfg->lcfg_command) {
3983         default:
3984                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3985                                               lcfg, obd);
3986                 if (rc > 0)
3987                         rc = 0;
3988                 break;
3989         }
3990
3991         return(rc);
3992 }
3993
/* obd_ops adapter: forward the raw config buffer to the typed handler.
 * @len is unused; @buf is assumed to be a struct lustre_cfg *. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
3998
/*
 * Method table registered for the OSC obd type.
 * Connection management is mostly delegated to the generic client_*
 * helpers; object, attribute, I/O, lock, and llog operations are the
 * OSC-specific implementations defined in this file.
 */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection handling (generic client helpers + OSC hooks) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs / striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* I/O */
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* misc */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4035
4036 extern struct lu_kmem_descr  osc_caches[];
4037 extern spinlock_t            osc_ast_guard;
4038 extern struct lock_class_key osc_ast_guard_class;
4039
4040 int __init osc_init(void)
4041 {
4042         struct lprocfs_static_vars lvars = { 0 };
4043         int rc;
4044         ENTRY;
4045
4046         /* print an address of _any_ initialized kernel symbol from this
4047          * module, to allow debugging with gdb that doesn't support data
4048          * symbols from modules.*/
4049         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4050
4051         rc = lu_kmem_init(osc_caches);
4052
4053         lprocfs_osc_init_vars(&lvars);
4054
4055         request_module("lquota");
4056         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4057         lquota_init(quota_interface);
4058         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4059
4060         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4061                                  LUSTRE_OSC_NAME, &osc_device_type);
4062         if (rc) {
4063                 if (quota_interface)
4064                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4065                 lu_kmem_fini(osc_caches);
4066                 RETURN(rc);
4067         }
4068
4069         spin_lock_init(&osc_ast_guard);
4070         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4071
4072         RETURN(rc);
4073 }
4074
4075 #ifdef __KERNEL__
/*
 * Module exit: undo osc_init() — release the device type, the quota
 * interface reference, the registered obd type, and the slab caches.
 * NOTE(review): lu_device_type_fini() runs before class_unregister_type();
 * presumably safe because no new devices can appear at module exit —
 * confirm against the class_unregister_type() contract before reordering.
 */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
4087
4088 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4089 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4090 MODULE_LICENSE("GPL");
4091
4092 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4093 #endif