lustre/osc/osc_request.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright © 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

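/* Copy the caller's capability (if any) into the request capsule and flag
 * its presence in the ost_body; a NULL @capa makes this a no-op. */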
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

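/* Fill the OST request body from @oinfo: copy the obdo and pack the
 * capability, if one was supplied. */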
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

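/* Reply callback for async getattr: unpack the returned ost_body into the
 * caller's obdo and pass the result up through oi_cb_up. */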
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

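/* Non-blocking getattr: build an OST_GETATTR request and queue it on @set;
 * osc_getattr_interpret() delivers the reply via oinfo->oi_cb_up. */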
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

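/* Synchronous getattr: send OST_GETATTR and wait for the reply before
 * copying the returned attributes into oinfo->oi_oa. */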
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

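/* Synchronous setattr: send OST_SETATTR and wait; on success the obdo is
 * updated from the reply. */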
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *aa->pa_oa = body->oa;
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

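/* Truncate helper: encode the punched extent's [start, end] in the obdo's
 * o_size/o_blocks fields (osc_sync() below overloads them the same way)
 * and hand the request off to osc_punch_base(). */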
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel local locks matching @mode in the resource derived from
 * @oa. Found locks are added to the @cancels list; returns the number of
 * locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

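/* Throttle destroy RPCs: optimistically bump cl_destroy_in_flight and keep
 * the slot only while still within cl_max_rpcs_in_flight; on failure, undo
 * the increment and wake any waiter we may have raced with. */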
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

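/* Advertise this client's dirty/grant accounting in @oa so the OST can
 * adjust the grant it extends to us; o_undirty caps how much more cache
 * this client expects to dirty. */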
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        int time = GRANT_SHRINK_INTERVAL;
        cli->cl_next_shrink_grant = cfs_time_shift(time);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

static int osc_shrink_grant(struct client_obd *cli)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);
        osc_shrink_grant_local(cli, &body->oa);
        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        if (body)
                OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
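/* Grant is shrunk (returned to the OST) only when the import is fully
 * connected, the client holds more than GRANT_SHRINK_LIMIT, and the next
 * shrink time has (nearly) arrived. */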
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

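/* Two brw pages may share one remote niobuf only when their flags match
 * and they are contiguous in the file (p1 ends exactly where p2 begins). */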
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

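/* Checksum the bulk pages in order. The OBD_FAIL_OSC_CHECKSUM_* fail locks
 * deliberately corrupt the data (on read) or the checksum (on write) so
 * the detection paths can be exercised in testing. */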
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

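/* Build a bulk read/write request: count the niobufs needed (contiguous
 * pages merge into one), prepare the bulk descriptor, pack the body, ioobj
 * and niobufs, and for writes optionally checksum the outgoing data. */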
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

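/* A write checksum mismatch was detected; re-checksum the local buffer to
 * guess where things went wrong (client-side change, e.g. mmap IO, vs.
 * corruption in transit). Returns nonzero so the caller can resend. */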
1371 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1372                                 __u32 client_cksum, __u32 server_cksum, int nob,
1373                                 obd_count page_count, struct brw_page **pga,
1374                                 cksum_type_t client_cksum_type)
1375 {
1376         __u32 new_cksum;
1377         char *msg;
1378         cksum_type_t cksum_type;
1379
1380         if (server_cksum == client_cksum) {
1381                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1382                 return 0;
1383         }
1384
1385         if (oa->o_valid & OBD_MD_FLFLAGS)
1386                 cksum_type = cksum_type_unpack(oa->o_flags);
1387         else
1388                 cksum_type = OBD_CKSUM_CRC32;
1389
1390         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1391                                       cksum_type);
1392
1393         if (cksum_type != client_cksum_type)
1394                 msg = "the server did not use the checksum type specified in "
1395                       "the original request - likely a protocol problem";
1396         else if (new_cksum == server_cksum)
1397                 msg = "changed on the client after we checksummed it - "
1398                       "likely false positive due to mmap IO (bug 11742)";
1399         else if (new_cksum == client_cksum)
1400                 msg = "changed in transit before arrival at OST";
1401         else
1402                 msg = "changed in transit AND doesn't match the original - "
1403                       "likely false positive due to mmap IO (bug 11742)";
1404
1405         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1406                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1407                            "["LPU64"-"LPU64"]\n",
1408                            msg, libcfs_nid2str(peer->nid),
1409                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1410                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1411                                                         (__u64)0,
1412                            oa->o_id,
1413                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1414                            pga[0]->off,
1415                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1416         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1417                "client csum now %x\n", client_cksum, client_cksum_type,
1418                server_cksum, cksum_type, new_cksum);
1419         return 1;
1420 }
1421
1422 /* Note rc enters this function as number of bytes transferred */
1423 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1424 {
1425         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1426         const lnet_process_id_t *peer =
1427                         &req->rq_import->imp_connection->c_peer;
1428         struct client_obd *cli = aa->aa_cli;
1429         struct ost_body *body;
1430         __u32 client_cksum = 0;
1431         ENTRY;
1432
1433         if (rc < 0 && rc != -EDQUOT)
1434                 RETURN(rc);
1435
1436         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1437         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1438                                   lustre_swab_ost_body);
1439         if (body == NULL) {
1440                 CDEBUG(D_INFO, "Can't unpack body\n");
1441                 RETURN(-EPROTO);
1442         }
1443
1444         /* set/clear over quota flag for a uid/gid */
1445         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1446             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1447                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1448
1449                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1450                              body->oa.o_flags);
1451         }
1452
1453         if (rc < 0)
1454                 RETURN(rc);
1455
1456         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1457                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1458
1459         osc_update_grant(cli, body);
1460
1461         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1462                 if (rc > 0) {
1463                         CERROR("Unexpected +ve rc %d\n", rc);
1464                         RETURN(-EPROTO);
1465                 }
1466                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1467
1468                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1469                         RETURN(-EAGAIN);
1470
1471                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1472                     check_write_checksum(&body->oa, peer, client_cksum,
1473                                          body->oa.o_cksum, aa->aa_requested_nob,
1474                                          aa->aa_page_count, aa->aa_ppga,
1475                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1476                         RETURN(-EAGAIN);
1477
1478                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1479                                      aa->aa_page_count, aa->aa_ppga);
1480                 GOTO(out, rc);
1481         }
1482
1483         /* The rest of this function executes only for OST_READs */
1484
1485         /* if unwrap_bulk failed, return -EAGAIN to retry */
1486         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1487         if (rc < 0)
1488                 GOTO(out, rc = -EAGAIN);
1489
1490         if (rc > aa->aa_requested_nob) {
1491                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1492                        aa->aa_requested_nob);
1493                 RETURN(-EPROTO);
1494         }
1495
1496         if (rc != req->rq_bulk->bd_nob_transferred) {
1497                 CERROR("Unexpected rc %d (%d transferred)\n",
1498                        rc, req->rq_bulk->bd_nob_transferred);
1499                 RETURN(-EPROTO);
1500         }
1501
1502         if (rc < aa->aa_requested_nob)
1503                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1504
1505         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1506                 static int cksum_counter;
1507                 __u32      server_cksum = body->oa.o_cksum;
1508                 char      *via;
1509                 char      *router;
1510                 cksum_type_t cksum_type;
1511
1512                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1513                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1514                 else
1515                         cksum_type = OBD_CKSUM_CRC32;
1516                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1517                                                  aa->aa_ppga, OST_READ,
1518                                                  cksum_type);
1519
1520                 if (peer->nid == req->rq_bulk->bd_sender) {
1521                         via = router = "";
1522                 } else {
1523                         via = " via ";
1524                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1525                 }
1526
1527                 if (server_cksum == ~0 && rc > 0) {
1528                         CERROR("Protocol error: server %s set the 'checksum' "
1529                                "bit, but didn't send a checksum.  Not fatal, "
1530                                "but please notify on http://bugzilla.lustre.org/\n",
1531                                libcfs_nid2str(peer->nid));
1532                 } else if (server_cksum != client_cksum) {
1533                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1534                                            "%s%s%s inum "LPU64"/"LPU64" object "
1535                                            LPU64"/"LPU64" extent "
1536                                            "["LPU64"-"LPU64"]\n",
1537                                            req->rq_import->imp_obd->obd_name,
1538                                            libcfs_nid2str(peer->nid),
1539                                            via, router,
1540                                            body->oa.o_valid & OBD_MD_FLFID ?
1541                                                 body->oa.o_fid : (__u64)0,
1542                                            body->oa.o_valid & OBD_MD_FLFID ?
1543                                                 body->oa.o_generation :(__u64)0,
1544                                            body->oa.o_id,
1545                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1546                                                 body->oa.o_gr : (__u64)0,
1547                                            aa->aa_ppga[0]->off,
1548                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1549                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1550                                                                         1);
1551                         CERROR("client %x, server %x, cksum_type %x\n",
1552                                client_cksum, server_cksum, cksum_type);
1553                         cksum_counter = 0;
1554                         aa->aa_oa->o_cksum = client_cksum;
1555                         rc = -EAGAIN;
1556                 } else {
1557                         cksum_counter++;
1558                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1559                         rc = 0;
1560                 }
1561         } else if (unlikely(client_cksum)) {
1562                 static int cksum_missed;
1563
1564                 cksum_missed++;
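                     /* for x > 0, (x & -x) == x only when x is a power of
                      * two, so the error below is logged on the 1st, 2nd,
                      * 4th, 8th, ... missed checksum */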
1565                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1566                         CERROR("Checksum %u requested from %s but not sent\n",
1567                                cksum_missed, libcfs_nid2str(peer->nid));
1568         } else {
1569                 rc = 0;
1570         }
1571 out:
1572         if (rc >= 0)
1573                 *aa->aa_oa = body->oa;
1574
1575         RETURN(rc);
1576 }
1577
1578 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1579                             struct lov_stripe_md *lsm,
1580                             obd_count page_count, struct brw_page **pga,
1581                             struct obd_capa *ocapa)
1582 {
1583         struct ptlrpc_request *req;
1584         int                    rc;
1585         cfs_waitq_t            waitq;
1586         int                    resends = 0;
1587         struct l_wait_info     lwi;
1588
1589         ENTRY;
1590
1591         cfs_waitq_init(&waitq);
1592
1593 restart_bulk:
1594         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1595                                   page_count, pga, &req, ocapa, 0);
1596         if (rc != 0)
1597                 return (rc);
1598
1599         rc = ptlrpc_queue_wait(req);
1600
1601         if (rc == -ETIMEDOUT && req->rq_resend) {
1602                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1603                 ptlrpc_req_finished(req);
1604                 goto restart_bulk;
1605         }
1606
1607         rc = osc_brw_fini_request(req, rc);
1608
1609         ptlrpc_req_finished(req);
1610         if (osc_recoverable_error(rc)) {
1611                 resends++;
1612                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1613                         CERROR("too many resend retries, returning error\n");
1614                         RETURN(-EIO);
1615                 }
1616
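                     /* back off linearly: sleep 'resends' seconds before
                      * the next retry */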
1617                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1618                 l_wait_event(waitq, 0, &lwi);
1619
1620                 goto restart_bulk;
1621         }
1622
1623         RETURN (rc);
1624 }
1625
1626 int osc_brw_redo_request(struct ptlrpc_request *request,
1627                          struct osc_brw_async_args *aa)
1628 {
1629         struct ptlrpc_request *new_req;
1630         struct ptlrpc_request_set *set = request->rq_set;
1631         struct osc_brw_async_args *new_aa;
1632         struct osc_async_page *oap;
1633         int rc = 0;
1634         ENTRY;
1635
1636         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1637                 CERROR("too many resend retries, returning error\n");
1638                 RETURN(-EIO);
1639         }
1640
1641         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1642
1643         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1644                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1645                                   aa->aa_cli, aa->aa_oa,
1646                                   NULL /* lsm unused by osc currently */,
1647                                   aa->aa_page_count, aa->aa_ppga,
1648                                   &new_req, aa->aa_ocapa, 0);
1649         if (rc)
1650                 RETURN(rc);
1651
1652         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1653
1654         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1655                 if (oap->oap_request != NULL) {
1656                         LASSERTF(request == oap->oap_request,
1657                                  "request %p != oap_request %p\n",
1658                                  request, oap->oap_request);
1659                         if (oap->oap_interrupted) {
1660                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1661                                 ptlrpc_req_finished(new_req);
1662                                 RETURN(-EINTR);
1663                         }
1664                 }
1665         }
1666         /* New request takes over pga and oaps from old request.
1667          * Note that copying a list_head doesn't work, need to move it... */
1668         aa->aa_resends++;
1669         new_req->rq_interpret_reply = request->rq_interpret_reply;
1670         new_req->rq_async_args = request->rq_async_args;
1671         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1672
1673         new_aa = ptlrpc_req_async_args(new_req);
1674
1675         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1676         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1677         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1678
1679         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1680                 if (oap->oap_request) {
1681                         ptlrpc_req_finished(oap->oap_request);
1682                         oap->oap_request = ptlrpc_request_addref(new_req);
1683                 }
1684         }
1685
1686         new_aa->aa_ocapa = aa->aa_ocapa;
1687         aa->aa_ocapa = NULL;
1688
1689         /* using ptlrpc_set_add_req() here is safe because interpret
1690          * functions run in check_set context.  The only path by which
1691          * another thread can reach this request is the -EINTR one, and
1692          * that path is protected by cl_loi_list_lock */
1693         ptlrpc_set_add_req(set, new_req);
1694
1695         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1696
1697         DEBUG_REQ(D_INFO, new_req, "new request");
1698         RETURN(0);
1699 }
1700
1701 /*
1702  * We want disk allocation on the target to happen in offset order, so we'll
1703  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1704  * fine for our small page arrays and doesn't require allocation.  It's an
1705  * insertion sort that swaps elements that are strides apart, shrinking the
1706  * stride down until it's 1 and the array is sorted.
1707  */
1708 static void sort_brw_pages(struct brw_page **array, int num)
1709 {
1710         int stride, i, j;
1711         struct brw_page *tmp;
1712
1713         if (num == 1)
1714                 return;
1715         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1716                 ;
1717
1718         do {
1719                 stride /= 3;
1720                 for (i = stride ; i < num ; i++) {
1721                         tmp = array[i];
1722                         j = i;
1723                         while (j >= stride && array[j - stride]->off > tmp->off) {
1724                                 array[j] = array[j - stride];
1725                                 j -= stride;
1726                         }
1727                         array[j] = tmp;
1728                 }
1729         } while (stride > 1);
1730 }
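
     /*
      * Illustrative only (hence the #if 0): the same 3x+1 shellsort as
      * sort_brw_pages() above, but on a plain int array so the gap
      * sequence is easy to follow.  The helper name is hypothetical and
      * the block is not part of the build.
      */
     #if 0
     static void shellsort_ints(int *a, int n)
     {
             int stride, i, j, tmp;

             if (n <= 1)
                     return;
             /* grow the stride through 1, 4, 13, 40, ... until it passes n */
             for (stride = 1; stride < n; stride = stride * 3 + 1)
                     ;
             do {
                     stride /= 3;
                     /* gapped insertion sort for the current stride */
                     for (i = stride; i < n; i++) {
                             tmp = a[i];
                             for (j = i; j >= stride && a[j - stride] > tmp;
                                  j -= stride)
                                     a[j] = a[j - stride];
                             a[j] = tmp;
                     }
             } while (stride > 1);
     }
     #endif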
1731
1732 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1733 {
1734         int count = 1;
1735         int offset;
1736         int i = 0;
1737
1738         LASSERT (pages > 0);
1739         offset = pg[i]->off & ~CFS_PAGE_MASK;
1740
1741         for (;;) {
1742                 pages--;
1743                 if (pages == 0)         /* that's all */
1744                         return count;
1745
1746                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1747                         return count;   /* doesn't end on page boundary */
1748
1749                 i++;
1750                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1751                 if (offset != 0)        /* doesn't start on page boundary */
1752                         return count;
1753
1754                 count++;
1755         }
1756 }
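
     /*
      * Example, with 4K pages: a pga covering [0,4096) [4096,8192)
      * [8192,8700) yields 3 (only the final page may end short), while one
      * covering [0,2048) [4096,8192) yields 1, because the first page ends
      * mid-page and anything appended after it would fragment the transfer.
      */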
1757
1758 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1759 {
1760         struct brw_page **ppga;
1761         int i;
1762
1763         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1764         if (ppga == NULL)
1765                 return NULL;
1766
1767         for (i = 0; i < count; i++)
1768                 ppga[i] = pga + i;
1769         return ppga;
1770 }
1771
1772 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1773 {
1774         LASSERT(ppga != NULL);
1775         OBD_FREE(ppga, sizeof(*ppga) * count);
1776 }
1777
1778 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1779                    obd_count page_count, struct brw_page *pga,
1780                    struct obd_trans_info *oti)
1781 {
1782         struct obdo *saved_oa = NULL;
1783         struct brw_page **ppga, **orig;
1784         struct obd_import *imp = class_exp2cliimp(exp);
1785         struct client_obd *cli;
1786         int rc, page_count_orig;
1787         ENTRY;
1788
1789         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1790         cli = &imp->imp_obd->u.cli;
1791
1792         if (cmd & OBD_BRW_CHECK) {
1793                 /* The caller just wants to know if there's a chance that this
1794                  * I/O can succeed */
1795
1796                 if (imp->imp_invalid)
1797                         RETURN(-EIO);
1798                 RETURN(0);
1799         }
1800
1801         /* test_brw with a failed create can trip this, maybe others. */
1802         LASSERT(cli->cl_max_pages_per_rpc);
1803
1804         rc = 0;
1805
1806         orig = ppga = osc_build_ppga(pga, page_count);
1807         if (ppga == NULL)
1808                 RETURN(-ENOMEM);
1809         page_count_orig = page_count;
1810
1811         sort_brw_pages(ppga, page_count);
1812         while (page_count) {
1813                 obd_count pages_per_brw;
1814
1815                 if (page_count > cli->cl_max_pages_per_rpc)
1816                         pages_per_brw = cli->cl_max_pages_per_rpc;
1817                 else
1818                         pages_per_brw = page_count;
1819
1820                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1821
1822                 if (saved_oa != NULL) {
1823                         /* restore previously saved oa */
1824                         *oinfo->oi_oa = *saved_oa;
1825                 } else if (page_count > pages_per_brw) {
1826                         /* save a copy of oa (brw will clobber it) */
1827                         OBDO_ALLOC(saved_oa);
1828                         if (saved_oa == NULL)
1829                                 GOTO(out, rc = -ENOMEM);
1830                         *saved_oa = *oinfo->oi_oa;
1831                 }
1832
1833                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1834                                       pages_per_brw, ppga, oinfo->oi_capa);
1835
1836                 if (rc != 0)
1837                         break;
1838
1839                 page_count -= pages_per_brw;
1840                 ppga += pages_per_brw;
1841         }
1842
1843 out:
1844         osc_release_ppga(orig, page_count_orig);
1845
1846         if (saved_oa != NULL)
1847                 OBDO_FREE(saved_oa);
1848
1849         RETURN(rc);
1850 }
1851
1852 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1853  * the dirty accounting: either writeback completed or a truncate happened
1854  * before writing started.  Must be called with the loi lock held. */
1855 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1856                            int sent)
1857 {
1858         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1859 }
1860
1861
1862 /* This maintains the lists of pending pages to read/write for a given object
1863  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1864  * to quickly find objects that are ready to send an RPC. */
1865 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1866                          int cmd)
1867 {
1868         int optimal;
1869         ENTRY;
1870
1871         if (lop->lop_num_pending == 0)
1872                 RETURN(0);
1873
1874         /* if we have an invalid import we want to drain the queued pages
1875          * by forcing them through rpcs that immediately fail and complete
1876          * the pages.  recovery relies on this to empty the queued pages
1877          * before canceling the locks and evicting the llite pages */
1878         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1879                 RETURN(1);
1880
1881         /* stream rpcs in queue order as long as there is an urgent page
1882          * queued.  this is our cheap solution for good batching in the case
1883          * where writepage marks some random page in the middle of the file
1884          * as urgent because of, say, memory pressure */
1885         if (!list_empty(&lop->lop_urgent)) {
1886                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1887                 RETURN(1);
1888         }
1889         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1890         optimal = cli->cl_max_pages_per_rpc;
1891         if (cmd & OBD_BRW_WRITE) {
1892                 /* trigger a write rpc stream as long as there are dirtiers
1893                  * waiting for space.  as they're waiting, they're not going to
1894                  * create more pages to coalesce with what's waiting. */
1895                 if (!list_empty(&cli->cl_cache_waiters)) {
1896                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1897                         RETURN(1);
1898                 }
1899                 /* +16 to avoid triggering rpcs that would want to include pages
1900                  * that are being queued but which can't be made ready until
1901                  * the queuer finishes with the page. this is a wart for
1902                  * llite::commit_write() */
1903                 optimal += 16;
1904         }
1905         if (lop->lop_num_pending >= optimal)
1906                 RETURN(1);
1907
1908         RETURN(0);
1909 }
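
     /* In short, a queue warrants an rpc when the import is invalid (to
      * drain it), an urgent page is queued, dirtiers are waiting on cache
      * space (writes only), or 'optimal' pages have accumulated. */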
1910
1911 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1912 {
1913         struct osc_async_page *oap;
1914         ENTRY;
1915
1916         if (list_empty(&lop->lop_urgent))
1917                 RETURN(0);
1918
1919         oap = list_entry(lop->lop_urgent.next,
1920                          struct osc_async_page, oap_urgent_item);
1921
1922         if (oap->oap_async_flags & ASYNC_HP) {
1923                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1924                 RETURN(1);
1925         }
1926
1927         RETURN(0);
1928 }
1929
1930 static void on_list(struct list_head *item, struct list_head *list,
1931                     int should_be_on)
1932 {
1933         if (list_empty(item) && should_be_on)
1934                 list_add_tail(item, list);
1935         else if (!list_empty(item) && !should_be_on)
1936                 list_del_init(item);
1937 }
1938
1939 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1940  * can find pages to build into rpcs quickly */
1941 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1942 {
1943         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1944             lop_makes_hprpc(&loi->loi_read_lop)) {
1945                 /* HP rpc */
1946                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1947                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1948         } else {
1949                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1950                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1951                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1952                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1953         }
1954
1955         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1956                 loi->loi_write_lop.lop_num_pending);
1957
1958         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1959                 loi->loi_read_lop.lop_num_pending);
1960 }
1961
1962 static void lop_update_pending(struct client_obd *cli,
1963                                struct loi_oap_pages *lop, int cmd, int delta)
1964 {
1965         lop->lop_num_pending += delta;
1966         if (cmd & OBD_BRW_WRITE)
1967                 cli->cl_pending_w_pages += delta;
1968         else
1969                 cli->cl_pending_r_pages += delta;
1970 }
1971
1972 /**
1973  * This is called when a sync waiter receives an interruption.  Its job is to
1974  * get the caller woken as soon as possible.  If its page hasn't been put in an
1975  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1976  * desiring interruption which will forcefully complete the rpc once the rpc
1977  * has timed out.
1978  */
1979 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1980 {
1981         struct loi_oap_pages *lop;
1982         struct lov_oinfo *loi;
1983         int rc = -EBUSY;
1984         ENTRY;
1985
1986         LASSERT(!oap->oap_interrupted);
1987         oap->oap_interrupted = 1;
1988
1989         /* ok, it's been put in an rpc. only one oap gets a request reference */
1990         if (oap->oap_request != NULL) {
1991                 ptlrpc_mark_interrupted(oap->oap_request);
1992                 ptlrpcd_wake(oap->oap_request);
1993                 ptlrpc_req_finished(oap->oap_request);
1994                 oap->oap_request = NULL;
1995         }
1996
1997         /*
1998          * page completion may be called only if ->cpo_prep() method was
1999          * executed by osc_io_submit(), which also adds the page to the pending list
2000          */
2001         if (!list_empty(&oap->oap_pending_item)) {
2002                 list_del_init(&oap->oap_pending_item);
2003                 list_del_init(&oap->oap_urgent_item);
2004
2005                 loi = oap->oap_loi;
2006                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2007                         &loi->loi_write_lop : &loi->loi_read_lop;
2008                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2009                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2010                 rc = oap->oap_caller_ops->ap_completion(env,
2011                                           oap->oap_caller_data,
2012                                           oap->oap_cmd, NULL, -EINTR);
2013         }
2014
2015         RETURN(rc);
2016 }
2017
2018 /* This is trying to propagate async writeback errors back up to the
2019  * application.  When an async write fails we record the error code for later if
2020  * the app does an fsync.  As long as errors persist we force future rpcs to be
2021  * sync so that the app can get a sync error and break the cycle of queueing
2022  * pages for which writeback will fail. */
2023 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2024                            int rc)
2025 {
2026         if (rc) {
2027                 if (!ar->ar_rc)
2028                         ar->ar_rc = rc;
2029
2030                 ar->ar_force_sync = 1;
2031                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2032                 return;
2033         }
2035
2036         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2037                 ar->ar_force_sync = 0;
2038 }
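
     /*
      * A worked example with hypothetical xids: a write with xid 7 fails,
      * so ar_rc records the error, ar_force_sync is set, and ar_min_xid is
      * sampled as, say, 10.  Writes are forced sync until one with
      * xid >= 10 completes cleanly, which clears ar_force_sync; ar_rc
      * stays behind for fsync to report.
      */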
2039
2040 void osc_oap_to_pending(struct osc_async_page *oap)
2041 {
2042         struct loi_oap_pages *lop;
2043
2044         if (oap->oap_cmd & OBD_BRW_WRITE)
2045                 lop = &oap->oap_loi->loi_write_lop;
2046         else
2047                 lop = &oap->oap_loi->loi_read_lop;
2048
2049         if (oap->oap_async_flags & ASYNC_HP)
2050                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2051         else if (oap->oap_async_flags & ASYNC_URGENT)
2052                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2053         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2054         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2055 }
2056
2057 /* this must be called holding the loi list lock to give coverage to exit_cache,
2058  * async_flag maintenance, and oap_request */
2059 static void osc_ap_completion(const struct lu_env *env,
2060                               struct client_obd *cli, struct obdo *oa,
2061                               struct osc_async_page *oap, int sent, int rc)
2062 {
2063         __u64 xid = 0;
2064
2065         ENTRY;
2066         if (oap->oap_request != NULL) {
2067                 xid = ptlrpc_req_xid(oap->oap_request);
2068                 ptlrpc_req_finished(oap->oap_request);
2069                 oap->oap_request = NULL;
2070         }
2071
2072         oap->oap_async_flags = 0;
2073         oap->oap_interrupted = 0;
2074
2075         if (oap->oap_cmd & OBD_BRW_WRITE) {
2076                 osc_process_ar(&cli->cl_ar, xid, rc);
2077                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2078         }
2079
2080         if (rc == 0 && oa != NULL) {
2081                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2082                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2083                 if (oa->o_valid & OBD_MD_FLMTIME)
2084                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2085                 if (oa->o_valid & OBD_MD_FLATIME)
2086                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2087                 if (oa->o_valid & OBD_MD_FLCTIME)
2088                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2089         }
2090
2091         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2092                                                 oap->oap_cmd, oa, rc);
2093
2094         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2095          * I/O on the page could start, but OSC calls it under lock
2096          * and thus we can add oap back to pending safely */
2097         if (rc)
2098                 /* upper layer wants to leave the page on pending queue */
2099                 osc_oap_to_pending(oap);
2100         else
2101                 osc_exit_cache(cli, oap, sent);
2102         EXIT;
2103 }
2104
2105 static int brw_interpret(const struct lu_env *env,
2106                          struct ptlrpc_request *req, void *data, int rc)
2107 {
2108         struct osc_brw_async_args *aa = data;
2109         struct client_obd *cli;
2110         int async;
2111         ENTRY;
2112
2113         rc = osc_brw_fini_request(req, rc);
2114         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2115         if (osc_recoverable_error(rc)) {
2116                 rc = osc_brw_redo_request(req, aa);
2117                 if (rc == 0)
2118                         RETURN(0);
2119         }
2120
2121         if (aa->aa_ocapa) {
2122                 capa_put(aa->aa_ocapa);
2123                 aa->aa_ocapa = NULL;
2124         }
2125
2126         cli = aa->aa_cli;
2127
2128         client_obd_list_lock(&cli->cl_loi_list_lock);
2129
2130         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2131          * is called so we know whether to go to sync BRWs or wait for more
2132          * RPCs to complete */
2133         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2134                 cli->cl_w_in_flight--;
2135         else
2136                 cli->cl_r_in_flight--;
2137
2138         async = list_empty(&aa->aa_oaps);
2139         if (!async) { /* from osc_send_oap_rpc() */
2140                 struct osc_async_page *oap, *tmp;
2141                 /* the caller may re-use the oap after the completion call so
2142                  * we need to clean it up a little */
2143                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2144                         list_del_init(&oap->oap_rpc_item);
2145                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2146                 }
2147                 OBDO_FREE(aa->aa_oa);
2148         } else { /* from async_internal() */
2149                 int i;
2150                 for (i = 0; i < aa->aa_page_count; i++)
2151                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2152         }
2153         osc_wake_cache_waiters(cli);
2154         osc_check_rpcs(env, cli);
2155         client_obd_list_unlock(&cli->cl_loi_list_lock);
2156         if (!async)
2157                 cl_req_completion(env, aa->aa_clerq, rc);
2158         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2159         RETURN(rc);
2160 }
2161
2162 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2163                                             struct client_obd *cli,
2164                                             struct list_head *rpc_list,
2165                                             int page_count, int cmd)
2166 {
2167         struct ptlrpc_request *req;
2168         struct brw_page **pga = NULL;
2169         struct osc_brw_async_args *aa;
2170         struct obdo *oa = NULL;
2171         const struct obd_async_page_ops *ops = NULL;
2172         void *caller_data = NULL;
2173         struct osc_async_page *oap;
2174         struct osc_async_page *tmp;
2175         struct ost_body *body;
2176         struct cl_req *clerq = NULL;
2177         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2178         struct ldlm_lock *lock = NULL;
2179         struct cl_req_attr crattr;
2180         int i, rc;
2181
2182         ENTRY;
2183         LASSERT(!list_empty(rpc_list));
2184
2185         memset(&crattr, 0, sizeof crattr);
2186         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2187         if (pga == NULL)
2188                 GOTO(out, req = ERR_PTR(-ENOMEM));
2189
2190         OBDO_ALLOC(oa);
2191         if (oa == NULL)
2192                 GOTO(out, req = ERR_PTR(-ENOMEM));
2193
2194         i = 0;
2195         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2196                 struct cl_page *page = osc_oap2cl_page(oap);
2197                 if (ops == NULL) {
2198                         ops = oap->oap_caller_ops;
2199                         caller_data = oap->oap_caller_data;
2200
2201                         clerq = cl_req_alloc(env, page, crt,
2202                                              1 /* only 1-object rpcs for
2203                                                 * now */);
2204                         if (IS_ERR(clerq))
2205                                 GOTO(out, req = (void *)clerq);
2206                         lock = oap->oap_ldlm_lock;
2207                 }
2208                 pga[i] = &oap->oap_brw_page;
2209                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2210                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2211                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2212                 i++;
2213                 cl_req_page_add(env, clerq, page);
2214         }
2215
2216         /* always get the data for the obdo for the rpc */
2217         LASSERT(ops != NULL);
2218         crattr.cra_oa = oa;
2219         crattr.cra_capa = NULL;
2220         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2221         if (lock) {
2222                 oa->o_handle = lock->l_remote_handle;
2223                 oa->o_valid |= OBD_MD_FLHANDLE;
2224         }
2225
2226         rc = cl_req_prep(env, clerq);
2227         if (rc != 0) {
2228                 CERROR("cl_req_prep failed: %d\n", rc);
2229                 GOTO(out, req = ERR_PTR(rc));
2230         }
2231
2232         sort_brw_pages(pga, page_count);
2233         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2234                                   pga, &req, crattr.cra_capa, 1);
2235         if (rc != 0) {
2236                 CERROR("prep_req failed: %d\n", rc);
2237                 GOTO(out, req = ERR_PTR(rc));
2238         }
2239
2240         /* Need to update the timestamps after the request is built in case
2241          * we race with setattr (locally or in queue at OST).  If OST gets
2242          * later setattr before earlier BRW (as determined by the request xid),
2243          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2244          * way to do this in a single call.  bug 10150 */
2245         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2246         cl_req_attr_set(env, clerq, &crattr,
2247                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2248
2249         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2250         aa = ptlrpc_req_async_args(req);
2251         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2252         list_splice(rpc_list, &aa->aa_oaps);
2253         CFS_INIT_LIST_HEAD(rpc_list);
2254         aa->aa_clerq = clerq;
2255 out:
2256         capa_put(crattr.cra_capa);
2257         if (IS_ERR(req)) {
2258                 if (oa)
2259                         OBDO_FREE(oa);
2260                 if (pga)
2261                         OBD_FREE(pga, sizeof(*pga) * page_count);
2262                 /* this should happen rarely and is pretty bad, it makes the
2263                  * pending list not follow the dirty order */
2264                 client_obd_list_lock(&cli->cl_loi_list_lock);
2265                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2266                         list_del_init(&oap->oap_rpc_item);
2267
2268                         /* queued sync pages can be torn down while the pages
2269                          * were between the pending list and the rpc */
2270                         if (oap->oap_interrupted) {
2271                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2272                                 osc_ap_completion(env, cli, NULL, oap, 0,
2273                                                   oap->oap_count);
2274                                 continue;
2275                         }
2276                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2277                 }
2278                 if (clerq && !IS_ERR(clerq))
2279                         cl_req_completion(env, clerq, PTR_ERR(req));
2280         }
2281         RETURN(req);
2282 }
2283
2284 /**
2285  * Prepare pages for asynchronous I/O and put them in the send queue.
2286  *
2287  * \param cli - client obd
2288  * \param loi - object info
2289  * \param cmd - OBD_BRW_* macros
2290  * \param lop - pending pages
2291  *
2292  * \return zero if the pages were successfully added to the send queue.
2293  * \return nonzero if an error occurred.
2294  */
2295 static int
2296 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2297                  struct lov_oinfo *loi,
2298                  int cmd, struct loi_oap_pages *lop)
2299 {
2300         struct ptlrpc_request *req;
2301         obd_count page_count = 0;
2302         struct osc_async_page *oap = NULL, *tmp;
2303         struct osc_brw_async_args *aa;
2304         const struct obd_async_page_ops *ops;
2305         CFS_LIST_HEAD(rpc_list);
2306         unsigned int ending_offset;
2307         unsigned  starting_offset = 0;
2308         int srvlock = 0;
2309         struct cl_object *clob = NULL;
2310         ENTRY;
2311
2312         /* If there are HP OAPs we need to handle at least 1 of them,
2313          * move it to the beginning of the pending list for that. */
2314         if (!list_empty(&lop->lop_urgent)) {
2315                 oap = list_entry(lop->lop_urgent.next,
2316                                  struct osc_async_page, oap_urgent_item);
2317                 if (oap->oap_async_flags & ASYNC_HP)
2318                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2319         }
2320
2321         /* first we find the pages we're allowed to work with */
2322         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2323                                  oap_pending_item) {
2324                 ops = oap->oap_caller_ops;
2325
2326                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2327                          "magic 0x%x\n", oap, oap->oap_magic);
2328
2329                 if (clob == NULL) {
2330                         /* pin object in memory, so that completion call-backs
2331                          * can be safely called under client_obd_list lock. */
2332                         clob = osc_oap2cl_page(oap)->cp_obj;
2333                         cl_object_get(clob);
2334                 }
2335
2336                 if (page_count != 0 &&
2337                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2338                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2339                                " oap %p, page %p, srvlock %u\n",
2340                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2341                         break;
2342                 }
2343                 /* in llite being 'ready' equates to the page being locked
2344                  * until completion unlocks it.  commit_write submits a page
2345                  * as not ready because its unlock will happen unconditionally
2346                  * as the call returns.  if we race with commit_write giving
2347          * us that page we don't want to create a hole in the page
2348                  * stream, so we stop and leave the rpc to be fired by
2349                  * another dirtier or kupdated interval (the not ready page
2350                  * will still be on the dirty list).  we could call in
2351                  * at the end of ll_file_write to process the queue again. */
2352                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2353                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2354                                                     cmd);
2355                         if (rc < 0)
2356                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2357                                                 "instead of ready\n", oap,
2358                                                 oap->oap_page, rc);
2359                         switch (rc) {
2360                         case -EAGAIN:
2361                                 /* llite is telling us that the page is still
2362                                  * in commit_write and that we should try
2363                                  * and put it in an rpc again later.  we
2364                                  * break out of the loop so we don't create
2365                                  * a hole in the sequence of pages in the rpc
2366                                  * stream.*/
2367                                 oap = NULL;
2368                                 break;
2369                         case -EINTR:
2370                                 /* the io isn't needed.  tell the checks
2371                                  * below to complete the rpc with EINTR */
2372                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2373                                 oap->oap_count = -EINTR;
2374                                 break;
2375                         case 0:
2376                                 oap->oap_async_flags |= ASYNC_READY;
2377                                 break;
2378                         default:
2379                                 LASSERTF(0, "oap %p page %p returned %d "
2380                                             "from make_ready\n", oap,
2381                                             oap->oap_page, rc);
2382                                 break;
2383                         }
2384                 }
2385                 if (oap == NULL)
2386                         break;
2387                 /*
2388                  * Page submitted for IO has to be locked. Either by
2389                  * ->ap_make_ready() or by higher layers.
2390                  */
2391 #if defined(__KERNEL__) && defined(__linux__)
2392                 {
2393                         struct cl_page *page;
2394
2395                         page = osc_oap2cl_page(oap);
2396
2397                         if (page->cp_type == CPT_CACHEABLE &&
2398                             !(PageLocked(oap->oap_page) &&
2399                               (CheckWriteback(oap->oap_page, cmd)))) {
2400                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2401                                        oap->oap_page,
2402                                        (long)oap->oap_page->flags,
2403                                        oap->oap_async_flags);
2404                                 LBUG();
2405                         }
2406                 }
2407 #endif
2408                 /* If there is a gap at the start of this page, it can't merge
2409                  * with any previous page, so we'll hand the network a
2410                  * "fragmented" page array that it can't transfer in 1 RDMA */
2411                 if (page_count != 0 && oap->oap_page_off != 0)
2412                         break;
2413
2414                 /* take the page out of our book-keeping */
2415                 list_del_init(&oap->oap_pending_item);
2416                 lop_update_pending(cli, lop, cmd, -1);
2417                 list_del_init(&oap->oap_urgent_item);
2418
2419                 if (page_count == 0)
2420                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2421                                           (PTLRPC_MAX_BRW_SIZE - 1);
2422
2423                 /* ask the caller for the size of the io as the rpc leaves. */
2424                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2425                         oap->oap_count =
2426                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2427                                                       cmd);
2428                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2429                 }
2430                 if (oap->oap_count <= 0) {
2431                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2432                                oap->oap_count);
2433                         osc_ap_completion(env, cli, NULL,
2434                                           oap, 0, oap->oap_count);
2435                         continue;
2436                 }
2437
2438                 /* now put the page back in our accounting */
2439                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2440                 if (page_count == 0)
2441                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2442                 if (++page_count >= cli->cl_max_pages_per_rpc)
2443                         break;
2444
2445                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2446                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2447                  * have the same alignment as the initial writes that allocated
2448                  * extents on the server. */
2449                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2450                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2451                 if (ending_offset == 0)
2452                         break;
2453
2454                 /* If there is a gap at the end of this page, it can't merge
2455                  * with any subsequent pages, so we'll hand the network a
2456                  * "fragmented" page array that it can't transfer in 1 RDMA */
2457                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2458                         break;
2459         }
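             /* the scan above ends a page run early on: a srvlock mismatch,
              * a page still busy in commit_write (-EAGAIN), a gap at either
              * edge of a page, the per-rpc page limit, or a
              * PTLRPC_MAX_BRW_SIZE boundary */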
2460
2461         osc_wake_cache_waiters(cli);
2462
2463         loi_list_maint(cli, loi);
2464
2465         client_obd_list_unlock(&cli->cl_loi_list_lock);
2466
2467         if (clob != NULL)
2468                 cl_object_put(env, clob);
2469
2470         if (page_count == 0) {
2471                 client_obd_list_lock(&cli->cl_loi_list_lock);
2472                 RETURN(0);
2473         }
2474
2475         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2476         if (IS_ERR(req)) {
2477                 LASSERT(list_empty(&rpc_list));
2478                 loi_list_maint(cli, loi);
2479                 RETURN(PTR_ERR(req));
2480         }
2481
2482         aa = ptlrpc_req_async_args(req);
2483
2484         if (cmd == OBD_BRW_READ) {
2485                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2486                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2487                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2488                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2489         } else {
2490                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2491                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2492                                  cli->cl_w_in_flight);
2493                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2494                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2495         }
2496         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2497
2498         client_obd_list_lock(&cli->cl_loi_list_lock);
2499
2500         if (cmd == OBD_BRW_READ)
2501                 cli->cl_r_in_flight++;
2502         else
2503                 cli->cl_w_in_flight++;
2504
2505         /* queued sync pages can be torn down while the pages
2506          * were between the pending list and the rpc */
2507         tmp = NULL;
2508         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2509                 /* only one oap gets a request reference */
2510                 if (tmp == NULL)
2511                         tmp = oap;
2512                 if (oap->oap_interrupted && !req->rq_intr) {
2513                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2514                                oap, req);
2515                         ptlrpc_mark_interrupted(req);
2516                 }
2517         }
2518         if (tmp != NULL)
2519                 tmp->oap_request = ptlrpc_request_addref(req);
2520
2521         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2522                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2523
2524         req->rq_interpret_reply = brw_interpret;
2525         ptlrpcd_add_req(req, PSCOPE_BRW);
2526         RETURN(1);
2527 }
2528
2529 #define LOI_DEBUG(LOI, STR, args...)                                     \
2530         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2531                !list_empty(&(LOI)->loi_ready_item) ||                    \
2532                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2533                (LOI)->loi_write_lop.lop_num_pending,                     \
2534                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2535                (LOI)->loi_read_lop.lop_num_pending,                      \
2536                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2537                args)
2538
2539 /* This is called by osc_check_rpcs() to find which objects have pages that
2540  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2541 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2542 {
2543         ENTRY;
2544
2545         /* First return objects that have blocked locks so that they
2546          * will be flushed quickly and other clients can get the lock,
2547          * then objects which have pages ready to be stuffed into RPCs */
2548         if (!list_empty(&cli->cl_loi_hp_ready_list))
2549                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2550                                   struct lov_oinfo, loi_hp_ready_item));
2551         if (!list_empty(&cli->cl_loi_ready_list))
2552                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2553                                   struct lov_oinfo, loi_ready_item));
2554
2555         /* then if we have cache waiters, return all objects with queued
2556          * writes.  This is especially important when many small files
2557          * have filled up the cache and not been fired into rpcs because
2558          * they don't pass the nr_pending/object threshold */
2559         if (!list_empty(&cli->cl_cache_waiters) &&
2560             !list_empty(&cli->cl_loi_write_list))
2561                 RETURN(list_entry(cli->cl_loi_write_list.next,
2562                                   struct lov_oinfo, loi_write_item));
2563
2564         /* then return all queued objects when we have an invalid import
2565          * so that they get flushed */
2566         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2567                 if (!list_empty(&cli->cl_loi_write_list))
2568                         RETURN(list_entry(cli->cl_loi_write_list.next,
2569                                           struct lov_oinfo, loi_write_item));
2570                 if (!list_empty(&cli->cl_loi_read_list))
2571                         RETURN(list_entry(cli->cl_loi_read_list.next,
2572                                           struct lov_oinfo, loi_read_item));
2573         }
2574         RETURN(NULL);
2575 }
2576
2577 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2578 {
2579         struct osc_async_page *oap;
2580         int hprpc = 0;
2581
2582         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2583                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2584                                  struct osc_async_page, oap_urgent_item);
2585                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2586         }
2587
2588         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2589                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2590                                  struct osc_async_page, oap_urgent_item);
2591                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2592         }
2593
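             /* a queued high-priority page earns one extra rpc slot beyond
              * cl_max_rpcs_in_flight */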
2594         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2595 }
2596
2597 /* called with the loi list lock held */
2598 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2599 {
2600         struct lov_oinfo *loi;
2601         int rc = 0, race_counter = 0;
2602         ENTRY;
2603
2604         while ((loi = osc_next_loi(cli)) != NULL) {
2605                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2606
2607                 if (osc_max_rpc_in_flight(cli, loi))
2608                         break;
2609
2610                 /* attempt some read/write balancing by alternating between
2611                  * reads and writes in an object.  The makes_rpc checks here
2612                  * would be redundant if we were getting read/write work items
2613                  * instead of objects.  we don't want send_oap_rpc to drain a
2614                  * partial read pending queue when we're given this object to
2615                  * do io on writes while there are cache waiters */
2616                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2617                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2618                                               &loi->loi_write_lop);
2619                         if (rc < 0)
2620                                 break;
2621                         if (rc > 0)
2622                                 race_counter = 0;
2623                         else
2624                                 race_counter++;
2625                 }
2626                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2627                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2628                                               &loi->loi_read_lop);
2629                         if (rc < 0)
2630                                 break;
2631                         if (rc > 0)
2632                                 race_counter = 0;
2633                         else
2634                                 race_counter++;
2635                 }
2636
2637                 /* attempt some inter-object balancing by issuing rpcs
2638                  * for each object in turn */
2639                 if (!list_empty(&loi->loi_hp_ready_item))
2640                         list_del_init(&loi->loi_hp_ready_item);
2641                 if (!list_empty(&loi->loi_ready_item))
2642                         list_del_init(&loi->loi_ready_item);
2643                 if (!list_empty(&loi->loi_write_item))
2644                         list_del_init(&loi->loi_write_item);
2645                 if (!list_empty(&loi->loi_read_item))
2646                         list_del_init(&loi->loi_read_item);
2647
2648                 loi_list_maint(cli, loi);
2649
2650                 /* send_oap_rpc fails with 0 when make_ready tells it to
2651                  * back off.  llite's make_ready does this when it tries
2652                  * to lock a page queued for write that is already locked.
2653                  * we want to try sending rpcs from many objects, but we
2654                  * don't want to spin failing with 0.  */
2655                 if (race_counter == 10)
2656                         break;
2657         }
2658         EXIT;
2659 }
2660
2661 /* we're trying to queue a page in the osc so we're subject to the
2662  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2663  * If the osc's queued pages are already at that limit, then we want to sleep
2664  * until there is space in the osc's queue for us.  We also may be waiting for
2665  * write credits from the OST if there are RPCs in flight that may return some
2666  * before we fall back to sync writes.
2667  *
2668  * We need this to know whether our allocation was granted in the presence of signals */
2669 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2670 {
2671         int rc;
2672         ENTRY;
2673         client_obd_list_lock(&cli->cl_loi_list_lock);
2674         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2675         client_obd_list_unlock(&cli->cl_loi_list_lock);
2676         RETURN(rc);
2677 }
2678
2679 /**
2680  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2681  * is available.
2682  */
2683 int osc_enter_cache_try(const struct lu_env *env,
2684                         struct client_obd *cli, struct lov_oinfo *loi,
2685                         struct osc_async_page *oap, int transient)
2686 {
2687         int has_grant;
2688
2689         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2690         if (has_grant) {
2691                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2692                 if (transient) {
2693                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2694                         atomic_inc(&obd_dirty_transit_pages);
2695                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2696                 }
2697         }
2698         return has_grant;
2699 }
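/*
 * Illustrative sketch (not part of the build) of the grant accounting
 * above.  Assuming CFS_PAGE_SIZE is 4096 and a client starts out with
 * cl_avail_grant == 8192, two pages can be queued before the cache is
 * exhausted:
 *
 *      osc_enter_cache_try(...) -> 1, cl_avail_grant drops to 4096
 *      osc_enter_cache_try(...) -> 1, cl_avail_grant drops to 0
 *      osc_enter_cache_try(...) -> 0, caller must wait or do sync IO
 *
 * osc_consume_write_grant() is what debits cl_avail_grant and moves the
 * page onto the dirty-page accounting.
 */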
2700
2701 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2702  * grant or cache space. */
2703 static int osc_enter_cache(const struct lu_env *env,
2704                            struct client_obd *cli, struct lov_oinfo *loi,
2705                            struct osc_async_page *oap)
2706 {
2707         struct osc_cache_waiter ocw;
2708         struct l_wait_info lwi = { 0 };
2709
2710         ENTRY;
2711
2712         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2713                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2714                cli->cl_dirty_max, obd_max_dirty_pages,
2715                cli->cl_lost_grant, cli->cl_avail_grant);
2716
2717         /* force the caller to try sync IO.  This can jump the list
2718          * of queued writes and create a discontiguous RPC stream. */
2719         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2720             loi->loi_ar.ar_force_sync)
2721                 RETURN(-EDQUOT);
2722
2723         /* Hopefully normal case - cache space and write credits available */
2724         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2725             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2726             osc_enter_cache_try(env, cli, loi, oap, 0))
2727                 RETURN(0);
2728
2729         /* Make sure that there are write RPCs in flight to wait for.  This
2730          * is a little silly as this object may not have any pending RPCs,
2731          * but other objects certainly might. */
2732         if (cli->cl_w_in_flight) {
2733                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2734                 cfs_waitq_init(&ocw.ocw_waitq);
2735                 ocw.ocw_oap = oap;
2736                 ocw.ocw_rc = 0;
2737
2738                 loi_list_maint(cli, loi);
2739                 osc_check_rpcs(env, cli);
2740                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2741
2742                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2743                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2744
2745                 client_obd_list_lock(&cli->cl_loi_list_lock);
2746                 if (!list_empty(&ocw.ocw_entry)) {
2747                         list_del(&ocw.ocw_entry);
2748                         RETURN(-EINTR);
2749                 }
2750                 RETURN(ocw.ocw_rc);
2751         }
2752
2753         RETURN(-EDQUOT);
2754 }
2755
2756
2757 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2758                         struct lov_oinfo *loi, cfs_page_t *page,
2759                         obd_off offset, const struct obd_async_page_ops *ops,
2760                         void *data, void **res, int nocache,
2761                         struct lustre_handle *lockh)
2762 {
2763         struct osc_async_page *oap;
2764
2765         ENTRY;
2766
2767         if (!page)
2768                 return size_round(sizeof(*oap));
2769
2770         oap = *res;
2771         oap->oap_magic = OAP_MAGIC;
2772         oap->oap_cli = &exp->exp_obd->u.cli;
2773         oap->oap_loi = loi;
2774
2775         oap->oap_caller_ops = ops;
2776         oap->oap_caller_data = data;
2777
2778         oap->oap_page = page;
2779         oap->oap_obj_off = offset;
2780         if (!client_is_remote(exp) &&
2781             cfs_capable(CFS_CAP_SYS_RESOURCE))
2782                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2783
2784         LASSERT(!(offset & ~CFS_PAGE_MASK));
2785
2786         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2787         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2788         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2789         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2790
2791         spin_lock_init(&oap->oap_lock);
2792         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2793         RETURN(0);
2794 }
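/*
 * Hypothetical caller flow (a sketch, not code from this file): the
 * function doubles as a size probe.  Callers pass page == NULL to learn
 * the per-page footprint, reserve that much storage, then call again
 * with *res pointing at it:
 *
 *      int size = osc_prep_async_page(exp, lsm, loi, NULL, 0,
 *                                     NULL, NULL, NULL, 0, NULL);
 *      ... reserve 'size' bytes and point *res at them ...
 *      osc_prep_async_page(exp, lsm, loi, page, offset, ops, data,
 *                          &res, nocache, lockh);
 */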
2795
2796 struct osc_async_page *oap_from_cookie(void *cookie)
2797 {
2798         struct osc_async_page *oap = cookie;
2799         if (oap->oap_magic != OAP_MAGIC)
2800                 return ERR_PTR(-EINVAL);
2801         return oap;
2802 }
2803
2804 int osc_queue_async_io(const struct lu_env *env,
2805                        struct obd_export *exp, struct lov_stripe_md *lsm,
2806                        struct lov_oinfo *loi, void *cookie,
2807                        int cmd, obd_off off, int count,
2808                        obd_flag brw_flags, enum async_flags async_flags)
2809 {
2810         struct client_obd *cli = &exp->exp_obd->u.cli;
2811         struct osc_async_page *oap;
2812         int rc = 0;
2813         ENTRY;
2814
2815         oap = oap_from_cookie(cookie);
2816         if (IS_ERR(oap))
2817                 RETURN(PTR_ERR(oap));
2818
2819         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2820                 RETURN(-EIO);
2821
2822         if (!list_empty(&oap->oap_pending_item) ||
2823             !list_empty(&oap->oap_urgent_item) ||
2824             !list_empty(&oap->oap_rpc_item))
2825                 RETURN(-EBUSY);
2826
2827         /* check if the file's owner/group is over quota */
2828         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2829                 struct cl_object *obj;
2830                 struct cl_attr    attr; /* XXX put attr into thread info */
2831                 unsigned int qid[MAXQUOTAS];
2832
2833                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2834
2835                 cl_object_attr_lock(obj);
2836                 rc = cl_object_attr_get(env, obj, &attr);
2837                 cl_object_attr_unlock(obj);
2838
2839                 qid[USRQUOTA] = attr.cat_uid;
2840                 qid[GRPQUOTA] = attr.cat_gid;
2841                 if (rc == 0 &&
2842                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2843                         rc = -EDQUOT;
2844                 if (rc)
2845                         RETURN(rc);
2846         }
2847
2848         if (loi == NULL)
2849                 loi = lsm->lsm_oinfo[0];
2850
2851         client_obd_list_lock(&cli->cl_loi_list_lock);
2852
2853         LASSERT(off + count <= CFS_PAGE_SIZE);
2854         oap->oap_cmd = cmd;
2855         oap->oap_page_off = off;
2856         oap->oap_count = count;
2857         oap->oap_brw_flags = brw_flags;
2858         oap->oap_async_flags = async_flags;
2859
2860         if (cmd & OBD_BRW_WRITE) {
2861                 rc = osc_enter_cache(env, cli, loi, oap);
2862                 if (rc) {
2863                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2864                         RETURN(rc);
2865                 }
2866         }
2867
2868         osc_oap_to_pending(oap);
2869         loi_list_maint(cli, loi);
2870
2871         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2872                   cmd);
2873
2874         osc_check_rpcs(env, cli);
2875         client_obd_list_unlock(&cli->cl_loi_list_lock);
2876
2877         RETURN(0);
2878 }
2879
2880 /* aka (~was & now & flag), but this is more clear :) */
2881 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
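/*
 * Worked example (illustrative only): with hypothetical bit values
 * ASYNC_READY = 0x1 and ASYNC_URGENT = 0x2,
 *
 *      SETTING(0x1, 0x3, 0x2) -> !(0x1 & 0x2) && (0x3 & 0x2) -> 1
 *
 * i.e. the macro is true exactly when 'flag' is clear in 'was' but set
 * in 'now' -- a rising edge on that bit.
 */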
2882
2883 int osc_set_async_flags_base(struct client_obd *cli,
2884                              struct lov_oinfo *loi, struct osc_async_page *oap,
2885                              obd_flag async_flags)
2886 {
2887         struct loi_oap_pages *lop;
2888         ENTRY;
2889
2890         LASSERT(!list_empty(&oap->oap_pending_item));
2891
2892         if (oap->oap_cmd & OBD_BRW_WRITE) {
2893                 lop = &loi->loi_write_lop;
2894         } else {
2895                 lop = &loi->loi_read_lop;
2896         }
2897
2898         if ((oap->oap_async_flags & async_flags) == async_flags)
2899                 RETURN(0);
2900
2901         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2902                 oap->oap_async_flags |= ASYNC_READY;
2903
2904         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2905             list_empty(&oap->oap_rpc_item)) {
2906                 if (oap->oap_async_flags & ASYNC_HP)
2907                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2908                 else
2909                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2910                 oap->oap_async_flags |= ASYNC_URGENT;
2911                 loi_list_maint(cli, loi);
2912         }
2913
2914         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2915                         oap->oap_async_flags);
2916         RETURN(0);
2917 }
2918
2919 int osc_teardown_async_page(struct obd_export *exp,
2920                             struct lov_stripe_md *lsm,
2921                             struct lov_oinfo *loi, void *cookie)
2922 {
2923         struct client_obd *cli = &exp->exp_obd->u.cli;
2924         struct loi_oap_pages *lop;
2925         struct osc_async_page *oap;
2926         int rc = 0;
2927         ENTRY;
2928
2929         oap = oap_from_cookie(cookie);
2930         if (IS_ERR(oap))
2931                 RETURN(PTR_ERR(oap));
2932
2933         if (loi == NULL)
2934                 loi = lsm->lsm_oinfo[0];
2935
2936         if (oap->oap_cmd & OBD_BRW_WRITE) {
2937                 lop = &loi->loi_write_lop;
2938         } else {
2939                 lop = &loi->loi_read_lop;
2940         }
2941
2942         client_obd_list_lock(&cli->cl_loi_list_lock);
2943
2944         if (!list_empty(&oap->oap_rpc_item))
2945                 GOTO(out, rc = -EBUSY);
2946
2947         osc_exit_cache(cli, oap, 0);
2948         osc_wake_cache_waiters(cli);
2949
2950         if (!list_empty(&oap->oap_urgent_item)) {
2951                 list_del_init(&oap->oap_urgent_item);
2952                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2953         }
2954         if (!list_empty(&oap->oap_pending_item)) {
2955                 list_del_init(&oap->oap_pending_item);
2956                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2957         }
2958         loi_list_maint(cli, loi);
2959         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2960 out:
2961         client_obd_list_unlock(&cli->cl_loi_list_lock);
2962         RETURN(rc);
2963 }
2964
2965 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2966                                          struct ldlm_enqueue_info *einfo,
2967                                          int flags)
2968 {
2969         void *data = einfo->ei_cbdata;
2970
2971         LASSERT(lock != NULL);
2972         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2973         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2974         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2975         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2976
2977         lock_res_and_lock(lock);
2978         spin_lock(&osc_ast_guard);
2979         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2980         lock->l_ast_data = data;
2981         spin_unlock(&osc_ast_guard);
2982         unlock_res_and_lock(lock);
2983 }
2984
2985 static void osc_set_data_with_check(struct lustre_handle *lockh,
2986                                     struct ldlm_enqueue_info *einfo,
2987                                     int flags)
2988 {
2989         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2990
2991         if (lock != NULL) {
2992                 osc_set_lock_data_with_check(lock, einfo, flags);
2993                 LDLM_LOCK_PUT(lock);
2994         } else
2995                 CERROR("lockh %p, data %p - client evicted?\n",
2996                        lockh, einfo->ei_cbdata);
2997 }
2998
2999 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3000                              ldlm_iterator_t replace, void *data)
3001 {
3002         struct ldlm_res_id res_id;
3003         struct obd_device *obd = class_exp2obd(exp);
3004
3005         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3006         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3007         return 0;
3008 }
3009
3010 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3011                             obd_enqueue_update_f upcall, void *cookie,
3012                             int *flags, int rc)
3013 {
3014         int intent = *flags & LDLM_FL_HAS_INTENT;
3015         ENTRY;
3016
3017         if (intent) {
3018                 /* The request was created before ldlm_cli_enqueue call. */
3019                 if (rc == ELDLM_LOCK_ABORTED) {
3020                         struct ldlm_reply *rep;
3021                         rep = req_capsule_server_get(&req->rq_pill,
3022                                                      &RMF_DLM_REP);
3023
3024                         LASSERT(rep != NULL);
3025                         if (rep->lock_policy_res1)
3026                                 rc = rep->lock_policy_res1;
3027                 }
3028         }
3029
3030         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3031                 *flags |= LDLM_FL_LVB_READY;
3032                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3033                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3034         }
3035
3036         /* Call the update callback. */
3037         rc = (*upcall)(cookie, rc);
3038         RETURN(rc);
3039 }
3040
3041 static int osc_enqueue_interpret(const struct lu_env *env,
3042                                  struct ptlrpc_request *req,
3043                                  struct osc_enqueue_args *aa, int rc)
3044 {
3045         struct ldlm_lock *lock;
3046         struct lustre_handle handle;
3047         __u32 mode;
3048
3049         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3050          * might be freed anytime after lock upcall has been called. */
3051         lustre_handle_copy(&handle, aa->oa_lockh);
3052         mode = aa->oa_ei->ei_mode;
3053
3054         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3055          * be valid. */
3056         lock = ldlm_handle2lock(&handle);
3057
3058         /* Take an additional reference so that a blocking AST that
3059          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
3060          * to arrive after the upcall has been executed by
3061          * osc_enqueue_fini(). */
3062         ldlm_lock_addref(&handle, mode);
3063
3064         /* Complete obtaining the lock procedure. */
3065         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3066                                    mode, aa->oa_flags, aa->oa_lvb,
3067                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3068                                    &handle, rc);
3069         /* Complete osc stuff. */
3070         rc = osc_enqueue_fini(req, aa->oa_lvb,
3071                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3072
3073         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3074
3075         /* Release the lock for async request. */
3076         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3077                 /*
3078                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3079                  * not already released by
3080                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3081                  */
3082                 ldlm_lock_decref(&handle, mode);
3083
3084         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3085                  aa->oa_lockh, req, aa);
3086         ldlm_lock_decref(&handle, mode);
3087         LDLM_LOCK_PUT(lock);
3088         return rc;
3089 }
3090
3091 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3092                         struct lov_oinfo *loi, int flags,
3093                         struct ost_lvb *lvb, __u32 mode, int rc)
3094 {
3095         if (rc == ELDLM_OK) {
3096                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3097                 __u64 tmp;
3098
3099                 LASSERT(lock != NULL);
3100                 loi->loi_lvb = *lvb;
3101                 tmp = loi->loi_lvb.lvb_size;
3102                 /* Extend KMS up to the end of this lock and no further.
3103                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3104                 if (tmp > lock->l_policy_data.l_extent.end)
3105                         tmp = lock->l_policy_data.l_extent.end + 1;
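                /* Worked example (illustrative): a lock on extent [0, 4095]
                 * with lvb_size 10000 clamps tmp to 4096, so kms grows to
                 * cover exactly what this lock protects and no further. */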
3106                 if (tmp >= loi->loi_kms) {
3107                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3108                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3109                         loi_kms_set(loi, tmp);
3110                 } else {
3111                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3112                                    LPU64"; leaving kms="LPU64", end="LPU64,
3113                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3114                                    lock->l_policy_data.l_extent.end);
3115                 }
3116                 ldlm_lock_allow_match(lock);
3117                 LDLM_LOCK_PUT(lock);
3118         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3119                 loi->loi_lvb = *lvb;
3120                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3121                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3122                 rc = ELDLM_OK;
3123         }
3124 }
3125 EXPORT_SYMBOL(osc_update_enqueue);
3126
3127 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
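/*
 * PTLRPCD_SET is a sentinel rather than a real request set:
 * osc_enqueue_base() below compares rqset against it and, on a match,
 * hands the request to ptlrpcd via ptlrpcd_add_req() instead of
 * ptlrpc_set_add_req().  A caller with no private set of its own would
 * (sketch) pass it like this:
 *
 *      osc_enqueue_base(exp, res_id, &flags, policy, lvb, kms_valid,
 *                       upcall, cookie, einfo, lockh, PTLRPCD_SET, 1);
 */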
3128
3129 /* When enqueuing asynchronously, locks are not ordered and we can obtain a
3130  * lock from the 2nd OSC before a lock from the 1st one.  This does not
3131  * deadlock with other synchronous requests; however, holding some locks while
3132  * trying to obtain others may take a considerable amount of time in the case
3133  * of an OST failure, and when other sync requests cannot get a lock released
3134  * by a client, that client is evicted from the cluster -- such scenarios make
3135  * life difficult, so release locks just after they are obtained. */
3136 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3137                      int *flags, ldlm_policy_data_t *policy,
3138                      struct ost_lvb *lvb, int kms_valid,
3139                      obd_enqueue_update_f upcall, void *cookie,
3140                      struct ldlm_enqueue_info *einfo,
3141                      struct lustre_handle *lockh,
3142                      struct ptlrpc_request_set *rqset, int async)
3143 {
3144         struct obd_device *obd = exp->exp_obd;
3145         struct ptlrpc_request *req = NULL;
3146         int intent = *flags & LDLM_FL_HAS_INTENT;
3147         ldlm_mode_t mode;
3148         int rc;
3149         ENTRY;
3150
3151         /* Filesystem lock extents are extended to page boundaries so that
3152          * dealing with the page cache is a little smoother.  */
3153         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3154         policy->l_extent.end |= ~CFS_PAGE_MASK;
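        /* Worked example (illustrative, assuming 4096-byte pages, i.e.
         * ~CFS_PAGE_MASK == 0xfff): a request for bytes [5000, 6000]
         * becomes [4096, 8191]:
         *      start: 5000 - (5000 & 0xfff) = 5000 - 904 = 4096
         *      end:   6000 | 0xfff = 8191 */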
3155
3156         /*
3157          * kms is not valid when either object is completely fresh (so that no
3158          * locks are cached), or object was evicted. In the latter case cached
3159          * lock cannot be used, because it would prime inode state with
3160          * potentially stale LVB.
3161          */
3162         if (!kms_valid)
3163                 goto no_match;
3164
3165         /* Next, search for already existing extent locks that will cover us */
3166         /* If we're trying to read, we also search for an existing PW lock.  The
3167          * VFS and page cache already protect us locally, so lots of readers/
3168          * writers can share a single PW lock.
3169          *
3170          * There are problems with conversion deadlocks, so instead of
3171          * converting a read lock to a write lock, we'll just enqueue a new
3172          * one.
3173          *
3174          * At some point we should cancel the read lock instead of making them
3175          * send us a blocking callback, but there are problems with canceling
3176          * locks out from other users right now, too. */
3177         mode = einfo->ei_mode;
3178         if (einfo->ei_mode == LCK_PR)
3179                 mode |= LCK_PW;
3180         mode = ldlm_lock_match(obd->obd_namespace,
3181                                *flags | LDLM_FL_LVB_READY, res_id,
3182                                einfo->ei_type, policy, mode, lockh, 0);
3183         if (mode) {
3184                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3185
3186                 if (matched->l_ast_data == NULL ||
3187                     matched->l_ast_data == einfo->ei_cbdata) {
3188                         /* addref the lock only for non-async requests when
3189                          * a PW lock is matched whereas we asked for PR. */
3190                         if (!rqset && einfo->ei_mode != mode)
3191                                 ldlm_lock_addref(lockh, LCK_PR);
3192                         osc_set_lock_data_with_check(matched, einfo, *flags);
3193                         if (intent) {
3194                                 /* I would like to be able to ASSERT here that
3195                                  * rss <= kms, but I can't, for reasons which
3196                                  * are explained in lov_enqueue() */
3197                         }
3198
3199                         /* We already have a lock, and it's referenced */
3200                         (*upcall)(cookie, ELDLM_OK);
3201
3202                         /* For async requests, decref the lock. */
3203                         if (einfo->ei_mode != mode)
3204                                 ldlm_lock_decref(lockh, LCK_PW);
3205                         else if (rqset)
3206                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3207                         LDLM_LOCK_PUT(matched);
3208                         RETURN(ELDLM_OK);
3209                 } else
3210                         ldlm_lock_decref(lockh, mode);
3211                 LDLM_LOCK_PUT(matched);
3212         }
3213
3214  no_match:
3215         if (intent) {
3216                 CFS_LIST_HEAD(cancels);
3217                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3218                                            &RQF_LDLM_ENQUEUE_LVB);
3219                 if (req == NULL)
3220                         RETURN(-ENOMEM);
3221
3222                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3223                 if (rc)
3224                         RETURN(rc);
3225
3226                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3227                                      sizeof *lvb);
3228                 ptlrpc_request_set_replen(req);
3229         }
3230
3231         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3232         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3233
3234         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3235                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3236         if (rqset) {
3237                 if (!rc) {
3238                         struct osc_enqueue_args *aa;
3239                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3240                         aa = ptlrpc_req_async_args(req);
3241                         aa->oa_ei = einfo;
3242                         aa->oa_exp = exp;
3243                         aa->oa_flags  = flags;
3244                         aa->oa_upcall = upcall;
3245                         aa->oa_cookie = cookie;
3246                         aa->oa_lvb    = lvb;
3247                         aa->oa_lockh  = lockh;
3248
3249                         req->rq_interpret_reply =
3250                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3251                         if (rqset == PTLRPCD_SET)
3252                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3253                         else
3254                                 ptlrpc_set_add_req(rqset, req);
3255                 } else if (intent) {
3256                         ptlrpc_req_finished(req);
3257                 }
3258                 RETURN(rc);
3259         }
3260
3261         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3262         if (intent)
3263                 ptlrpc_req_finished(req);
3264
3265         RETURN(rc);
3266 }
3267
3268 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3269                        struct ldlm_enqueue_info *einfo,
3270                        struct ptlrpc_request_set *rqset)
3271 {
3272         struct ldlm_res_id res_id;
3273         int rc;
3274         ENTRY;
3275
3276         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3277                            oinfo->oi_md->lsm_object_gr, &res_id);
3278
3279         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3280                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3281                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3282                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3283                               rqset, rqset != NULL);
3284         RETURN(rc);
3285 }
3286
3287 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3288                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3289                    int *flags, void *data, struct lustre_handle *lockh,
3290                    int unref)
3291 {
3292         struct obd_device *obd = exp->exp_obd;
3293         int lflags = *flags;
3294         ldlm_mode_t rc;
3295         ENTRY;
3296
3297         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3298                 RETURN(-EIO);
3299
3300         /* Filesystem lock extents are extended to page boundaries so that
3301          * dealing with the page cache is a little smoother */
3302         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3303         policy->l_extent.end |= ~CFS_PAGE_MASK;
3304
3305         /* Next, search for already existing extent locks that will cover us */
3306         /* If we're trying to read, we also search for an existing PW lock.  The
3307          * VFS and page cache already protect us locally, so lots of readers/
3308          * writers can share a single PW lock. */
3309         rc = mode;
3310         if (mode == LCK_PR)
3311                 rc |= LCK_PW;
3312         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3313                              res_id, type, policy, rc, lockh, unref);
3314         if (rc) {
3315                 if (data != NULL)
3316                         osc_set_data_with_check(lockh, data, lflags);
3317                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3318                         ldlm_lock_addref(lockh, LCK_PR);
3319                         ldlm_lock_decref(lockh, LCK_PW);
3320                 }
3321                 RETURN(rc);
3322         }
3323         RETURN(rc);
3324 }
3325
3326 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3327 {
3328         ENTRY;
3329
3330         if (unlikely(mode == LCK_GROUP))
3331                 ldlm_lock_decref_and_cancel(lockh, mode);
3332         else
3333                 ldlm_lock_decref(lockh, mode);
3334
3335         RETURN(0);
3336 }
3337
3338 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3339                       __u32 mode, struct lustre_handle *lockh)
3340 {
3341         ENTRY;
3342         RETURN(osc_cancel_base(lockh, mode));
3343 }
3344
3345 static int osc_cancel_unused(struct obd_export *exp,
3346                              struct lov_stripe_md *lsm, int flags,
3347                              void *opaque)
3348 {
3349         struct obd_device *obd = class_exp2obd(exp);
3350         struct ldlm_res_id res_id, *resp = NULL;
3351
3352         if (lsm != NULL) {
3353                 resp = osc_build_res_name(lsm->lsm_object_id,
3354                                           lsm->lsm_object_gr, &res_id);
3355         }
3356
3357         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3358 }
3359
3360 static int osc_statfs_interpret(const struct lu_env *env,
3361                                 struct ptlrpc_request *req,
3362                                 struct osc_async_args *aa, int rc)
3363 {
3364         struct obd_statfs *msfs;
3365         ENTRY;
3366
3367         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3368             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3369                 GOTO(out, rc = 0);
3370
3371         if (rc != 0)
3372                 GOTO(out, rc);
3373
3374         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3375         if (msfs == NULL) {
3376                 GOTO(out, rc = -EPROTO);
3377         }
3378
3379         *aa->aa_oi->oi_osfs = *msfs;
3380 out:
3381         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3382         RETURN(rc);
3383 }
3384
3385 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3386                             __u64 max_age, struct ptlrpc_request_set *rqset)
3387 {
3388         struct ptlrpc_request *req;
3389         struct osc_async_args *aa;
3390         int                    rc;
3391         ENTRY;
3392
3393         /* We could possibly pass max_age in the request (as an absolute
3394          * timestamp or a "seconds.usec ago") so the target can avoid doing
3395          * extra calls into the filesystem if that isn't necessary (e.g.
3396          * during mount that would help a bit).  Having relative timestamps
3397          * is not so great if request processing is slow, while absolute
3398          * timestamps are not ideal because they need time synchronization. */
3399         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3400         if (req == NULL)
3401                 RETURN(-ENOMEM);
3402
3403         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3404         if (rc) {
3405                 ptlrpc_request_free(req);
3406                 RETURN(rc);
3407         }
3408         ptlrpc_request_set_replen(req);
3409         req->rq_request_portal = OST_CREATE_PORTAL;
3410         ptlrpc_at_set_req_timeout(req);
3411
3412         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3413                 /* procfs requests must not wait for statfs, to avoid deadlock */
3414                 req->rq_no_resend = 1;
3415                 req->rq_no_delay = 1;
3416         }
3417
3418         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3419         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3420         aa = ptlrpc_req_async_args(req);
3421         aa->aa_oi = oinfo;
3422
3423         ptlrpc_set_add_req(rqset, req);
3424         RETURN(0);
3425 }
3426
3427 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3428                       __u64 max_age, __u32 flags)
3429 {
3430         struct obd_statfs     *msfs;
3431         struct ptlrpc_request *req;
3432         struct obd_import     *imp = NULL;
3433         int rc;
3434         ENTRY;
3435
3436         /* Since the request might also come from lprocfs, we need to
3437          * synchronize this with client_disconnect_export() (bug 15684) */
3438         down_read(&obd->u.cli.cl_sem);
3439         if (obd->u.cli.cl_import)
3440                 imp = class_import_get(obd->u.cli.cl_import);
3441         up_read(&obd->u.cli.cl_sem);
3442         if (!imp)
3443                 RETURN(-ENODEV);
3444
3445         /* We could possibly pass max_age in the request (as an absolute
3446          * timestamp or a "seconds.usec ago") so the target can avoid doing
3447          * extra calls into the filesystem if that isn't necessary (e.g.
3448          * during mount that would help a bit).  Having relative timestamps
3449          * is not so great if request processing is slow, while absolute
3450          * timestamps are not ideal because they need time synchronization. */
3451         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3452
3453         class_import_put(imp);
3454
3455         if (req == NULL)
3456                 RETURN(-ENOMEM);
3457
3458         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3459         if (rc) {
3460                 ptlrpc_request_free(req);
3461                 RETURN(rc);
3462         }
3463         ptlrpc_request_set_replen(req);
3464         req->rq_request_portal = OST_CREATE_PORTAL;
3465         ptlrpc_at_set_req_timeout(req);
3466
3467         if (flags & OBD_STATFS_NODELAY) {
3468                 /* procfs requests must not wait for statfs, to avoid deadlock */
3469                 req->rq_no_resend = 1;
3470                 req->rq_no_delay = 1;
3471         }
3472
3473         rc = ptlrpc_queue_wait(req);
3474         if (rc)
3475                 GOTO(out, rc);
3476
3477         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3478         if (msfs == NULL) {
3479                 GOTO(out, rc = -EPROTO);
3480         }
3481
3482         *osfs = *msfs;
3483
3484         EXIT;
3485  out:
3486         ptlrpc_req_finished(req);
3487         return rc;
3488 }
3489
3490 /* Retrieve object striping information.
3491  *
3492  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3493  * the maximum number of OST indices which will fit in the user buffer.
3494  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
3495  */
3496 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3497 {
3498         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3499         struct lov_user_md_v3 lum, *lumk;
3500         struct lov_user_ost_data_v1 *lmm_objects;
3501         int rc = 0, lum_size;
3502         ENTRY;
3503
3504         if (!lsm)
3505                 RETURN(-ENODATA);
3506
3507         /* we only need the header part from user space to get lmm_magic and
3508          * lmm_stripe_count (the header part is common to v1 and v3) */
3509         lum_size = sizeof(struct lov_user_md_v1);
3510         if (copy_from_user(&lum, lump, lum_size))
3511                 RETURN(-EFAULT);
3512
3513         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3514             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3515                 RETURN(-EINVAL);
3516
3517         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3518         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3519         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3520         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3521
3522         /* we can use lov_mds_md_size() to compute lum_size
3523          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3524         if (lum.lmm_stripe_count > 0) {
3525                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3526                 OBD_ALLOC(lumk, lum_size);
3527                 if (!lumk)
3528                         RETURN(-ENOMEM);
3529
3530                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3531                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3532                 else
3533                         lmm_objects = &(lumk->lmm_objects[0]);
3534                 lmm_objects->l_object_id = lsm->lsm_object_id;
3535         } else {
3536                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3537                 lumk = &lum;
3538         }
3539
3540         lumk->lmm_object_id = lsm->lsm_object_id;
3541         lumk->lmm_object_gr = lsm->lsm_object_gr;
3542         lumk->lmm_stripe_count = 1;
3543
3544         if (copy_to_user(lump, lumk, lum_size))
3545                 rc = -EFAULT;
3546
3547         if (lumk != &lum)
3548                 OBD_FREE(lumk, lum_size);
3549
3550         RETURN(rc);
3551 }
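/*
 * Hypothetical user-space sketch (not part of this file) of driving the
 * LL_IOC_LOV_GETSTRIPE path that osc_getstripe() ultimately serves.  The
 * caller fills in the header so the kernel can read lmm_magic and
 * lmm_stripe_count, then finds the result in the same buffer:
 *
 *      struct lov_user_md_v1 *lum;
 *
 *      lum = malloc(sizeof(*lum) + sizeof(lum->lmm_objects[0]));
 *      lum->lmm_magic = LOV_USER_MAGIC_V1;
 *      lum->lmm_stripe_count = 1;
 *      if (ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum) == 0)
 *              ... lum->lmm_objects[0].l_object_id holds the object id ...
 */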
3552
3553
3554 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3555                          void *karg, void *uarg)
3556 {
3557         struct obd_device *obd = exp->exp_obd;
3558         struct obd_ioctl_data *data = karg;
3559         int err = 0;
3560         ENTRY;
3561
3562         if (!try_module_get(THIS_MODULE)) {
3563                 CERROR("Can't get module. Is it alive?\n");
3564                 return -EINVAL;
3565         }
3566         switch (cmd) {
3567         case OBD_IOC_LOV_GET_CONFIG: {
3568                 char *buf;
3569                 struct lov_desc *desc;
3570                 struct obd_uuid uuid;
3571
3572                 buf = NULL;
3573                 len = 0;
3574                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3575                         GOTO(out, err = -EINVAL);
3576
3577                 data = (struct obd_ioctl_data *)buf;
3578
3579                 if (sizeof(*desc) > data->ioc_inllen1) {
3580                         obd_ioctl_freedata(buf, len);
3581                         GOTO(out, err = -EINVAL);
3582                 }
3583
3584                 if (data->ioc_inllen2 < sizeof(uuid)) {
3585                         obd_ioctl_freedata(buf, len);
3586                         GOTO(out, err = -EINVAL);
3587                 }
3588
3589                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3590                 desc->ld_tgt_count = 1;
3591                 desc->ld_active_tgt_count = 1;
3592                 desc->ld_default_stripe_count = 1;
3593                 desc->ld_default_stripe_size = 0;
3594                 desc->ld_default_stripe_offset = 0;
3595                 desc->ld_pattern = 0;
3596                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3597
3598                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3599
3600                 err = copy_to_user((void *)uarg, buf, len);
3601                 if (err)
3602                         err = -EFAULT;
3603                 obd_ioctl_freedata(buf, len);
3604                 GOTO(out, err);
3605         }
3606         case LL_IOC_LOV_SETSTRIPE:
3607                 err = obd_alloc_memmd(exp, karg);
3608                 if (err > 0)
3609                         err = 0;
3610                 GOTO(out, err);
3611         case LL_IOC_LOV_GETSTRIPE:
3612                 err = osc_getstripe(karg, uarg);
3613                 GOTO(out, err);
3614         case OBD_IOC_CLIENT_RECOVER:
3615                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3616                                             data->ioc_inlbuf1);
3617                 if (err > 0)
3618                         err = 0;
3619                 GOTO(out, err);
3620         case IOC_OSC_SET_ACTIVE:
3621                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3622                                                data->ioc_offset);
3623                 GOTO(out, err);
3624         case OBD_IOC_POLL_QUOTACHECK:
3625                 err = lquota_poll_check(quota_interface, exp,
3626                                         (struct if_quotacheck *)karg);
3627                 GOTO(out, err);
3628         case OBD_IOC_PING_TARGET:
3629                 err = ptlrpc_obd_ping(obd);
3630                 GOTO(out, err);
3631         default:
3632                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3633                        cmd, cfs_curproc_comm());
3634                 GOTO(out, err = -ENOTTY);
3635         }
3636 out:
3637         module_put(THIS_MODULE);
3638         return err;
3639 }
3640
3641 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3642                         void *key, __u32 *vallen, void *val,
3643                         struct lov_stripe_md *lsm)
3644 {
3645         ENTRY;
3646         if (!vallen || !val)
3647                 RETURN(-EFAULT);
3648
3649         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3650                 __u32 *stripe = val;
3651                 *vallen = sizeof(*stripe);
3652                 *stripe = 0;
3653                 RETURN(0);
3654         } else if (KEY_IS(KEY_LAST_ID)) {
3655                 struct ptlrpc_request *req;
3656                 obd_id                *reply;
3657                 char                  *tmp;
3658                 int                    rc;
3659
3660                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3661                                            &RQF_OST_GET_INFO_LAST_ID);
3662                 if (req == NULL)
3663                         RETURN(-ENOMEM);
3664
3665                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3666                                      RCL_CLIENT, keylen);
3667                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3668                 if (rc) {
3669                         ptlrpc_request_free(req);
3670                         RETURN(rc);
3671                 }
3672
3673                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3674                 memcpy(tmp, key, keylen);
3675
3676                 req->rq_no_delay = req->rq_no_resend = 1;
3677                 ptlrpc_request_set_replen(req);
3678                 rc = ptlrpc_queue_wait(req);
3679                 if (rc)
3680                         GOTO(out, rc);
3681
3682                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3683                 if (reply == NULL)
3684                         GOTO(out, rc = -EPROTO);
3685
3686                 *((obd_id *)val) = *reply;
3687         out:
3688                 ptlrpc_req_finished(req);
3689                 RETURN(rc);
3690         } else if (KEY_IS(KEY_FIEMAP)) {
3691                 struct ptlrpc_request *req;
3692                 struct ll_user_fiemap *reply;
3693                 char *tmp;
3694                 int rc;
3695
3696                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3697                                            &RQF_OST_GET_INFO_FIEMAP);
3698                 if (req == NULL)
3699                         RETURN(-ENOMEM);
3700
3701                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3702                                      RCL_CLIENT, keylen);
3703                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3704                                      RCL_CLIENT, *vallen);
3705                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3706                                      RCL_SERVER, *vallen);
3707
3708                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3709                 if (rc) {
3710                         ptlrpc_request_free(req);
3711                         RETURN(rc);
3712                 }
3713
3714                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3715                 memcpy(tmp, key, keylen);
3716                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3717                 memcpy(tmp, val, *vallen);
3718
3719                 ptlrpc_request_set_replen(req);
3720                 rc = ptlrpc_queue_wait(req);
3721                 if (rc)
3722                         GOTO(out1, rc);
3723
3724                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3725                 if (reply == NULL)
3726                         GOTO(out1, rc = -EPROTO);
3727
3728                 memcpy(val, reply, *vallen);
3729         out1:
3730                 ptlrpc_req_finished(req);
3731
3732                 RETURN(rc);
3733         }
3734
3735         RETURN(-EINVAL);
3736 }
3737
3738 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3739 {
3740         struct llog_ctxt *ctxt;
3741         int rc = 0;
3742         ENTRY;
3743
3744         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3745         if (ctxt) {
3746                 rc = llog_initiator_connect(ctxt);
3747                 llog_ctxt_put(ctxt);
3748         } else {
3749                 /* XXX return an error? skip setting the flags below? */
3750         }
3751
3752         spin_lock(&imp->imp_lock);
3753         imp->imp_server_timeout = 1;
3754         imp->imp_pingable = 1;
3755         spin_unlock(&imp->imp_lock);
3756         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3757
3758         RETURN(rc);
3759 }
3760
3761 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3762                                           struct ptlrpc_request *req,
3763                                           void *aa, int rc)
3764 {
3765         ENTRY;
3766         if (rc != 0)
3767                 RETURN(rc);
3768
3769         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3770 }
3771
3772 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3773                               void *key, obd_count vallen, void *val,
3774                               struct ptlrpc_request_set *set)
3775 {
3776         struct ptlrpc_request *req;
3777         struct obd_device     *obd = exp->exp_obd;
3778         struct obd_import     *imp = class_exp2cliimp(exp);
3779         char                  *tmp;
3780         int                    rc;
3781         ENTRY;
3782
3783         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3784
3785         if (KEY_IS(KEY_NEXT_ID)) {
3786                 if (vallen != sizeof(obd_id))
3787                         RETURN(-ERANGE);
3788                 if (val == NULL)
3789                         RETURN(-EINVAL);
3790                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3791                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3792                        exp->exp_obd->obd_name,
3793                        obd->u.cli.cl_oscc.oscc_next_id);
3794
3795                 RETURN(0);
3796         }
3797
3798         if (KEY_IS(KEY_UNLINKED)) {
3799                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3800                 spin_lock(&oscc->oscc_lock);
3801                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3802                 spin_unlock(&oscc->oscc_lock);
3803                 RETURN(0);
3804         }
3805
3806         if (KEY_IS(KEY_INIT_RECOV)) {
3807                 if (vallen != sizeof(int))
3808                         RETURN(-EINVAL);
3809                 spin_lock(&imp->imp_lock);
3810                 imp->imp_initial_recov = *(int *)val;
3811                 spin_unlock(&imp->imp_lock);
3812                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3813                        exp->exp_obd->obd_name,
3814                        imp->imp_initial_recov);
3815                 RETURN(0);
3816         }
3817
3818         if (KEY_IS(KEY_CHECKSUM)) {
3819                 if (vallen != sizeof(int))
3820                         RETURN(-EINVAL);
3821                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3822                 RETURN(0);
3823         }
3824
3825         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3826                 sptlrpc_conf_client_adapt(obd);
3827                 RETURN(0);
3828         }
3829
3830         if (KEY_IS(KEY_FLUSH_CTX)) {
3831                 sptlrpc_import_flush_my_ctx(imp);
3832                 RETURN(0);
3833         }
3834
3835         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3836                 RETURN(-EINVAL);
3837
3838         /* We pass all other commands directly to OST. Since nobody calls osc
3839            methods directly and everybody is supposed to go through LOV, we
3840            assume lov checked invalid values for us.
3841            The only recognised values so far are evict_by_nid and mds_conn.
3842            Even if something bad goes through, we'd get a -EINVAL from OST
3843            anyway. */
3844
3845         if (KEY_IS(KEY_GRANT_SHRINK))
3846                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3847         else
3848                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3849
3850         if (req == NULL)
3851                 RETURN(-ENOMEM);
3852
3853         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3854                              RCL_CLIENT, keylen);
3855         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3856                              RCL_CLIENT, vallen);
3857         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3858         if (rc) {
3859                 ptlrpc_request_free(req);
3860                 RETURN(rc);
3861         }
3862
3863         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3864         memcpy(tmp, key, keylen);
3865         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3866         memcpy(tmp, val, vallen);
3867
3868         if (KEY_IS(KEY_MDS_CONN)) {
3869                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3870
3871                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3872                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3873                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3874                 req->rq_no_delay = req->rq_no_resend = 1;
3875                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3876         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3877                 struct osc_grant_args *aa;
3878                 struct obdo *oa;
3879
3880                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3881                 aa = ptlrpc_req_async_args(req);
3882                 OBD_ALLOC_PTR(oa);
3883                 if (!oa) {
3884                         ptlrpc_req_finished(req);
3885                         RETURN(-ENOMEM);
3886                 }
3887                 *oa = ((struct ost_body *)val)->oa;
3888                 aa->aa_oa = oa;
3889                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3890         }
3891
3892         ptlrpc_request_set_replen(req);
3893         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3894                 LASSERT(set != NULL);
3895                 ptlrpc_set_add_req(set, req);
3896                 ptlrpc_check_set(NULL, set);
3897         } else
3898                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3899
3900         RETURN(0);
3901 }
3902
3903
3904 static struct llog_operations osc_size_repl_logops = {
3905         lop_cancel: llog_obd_repl_cancel
3906 };
3907
3908 static struct llog_operations osc_mds_ost_orig_logops;
3909 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3910                          struct obd_device *tgt, int count,
3911                          struct llog_catid *catid, struct obd_uuid *uuid)
3912 {
3913         int rc;
3914         ENTRY;
3915
3916         LASSERT(olg == &obd->obd_olg);
3917         spin_lock(&obd->obd_dev_lock);
3918         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3919                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3920                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3921                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3922                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3923                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3924         }
3925         spin_unlock(&obd->obd_dev_lock);
3926
3927         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3928                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3929         if (rc) {
3930                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3931                 GOTO(out, rc);
3932         }
3933
3934         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3935                         NULL, &osc_size_repl_logops);
3936         if (rc) {
3937                 struct llog_ctxt *ctxt =
3938                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3939                 if (ctxt)
3940                         llog_cleanup(ctxt);
3941                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3942         }
3943         GOTO(out, rc);
3944 out:
3945         if (rc) {
3946                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3947                        obd->obd_name, tgt->obd_name, count, catid, rc);
3948                 CERROR("logid "LPX64":0x%x\n",
3949                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3950         }
3951         return rc;
3952 }
3953
3954 static int osc_llog_finish(struct obd_device *obd, int count)
3955 {
3956         struct llog_ctxt *ctxt;
3957         int rc = 0, rc2 = 0;
3958         ENTRY;
3959
3960         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3961         if (ctxt)
3962                 rc = llog_cleanup(ctxt);
3963
3964         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3965         if (ctxt)
3966                 rc2 = llog_cleanup(ctxt);
3967         if (!rc)
3968                 rc = rc2;
3969
3970         RETURN(rc);
3971 }
3972
3973 static int osc_reconnect(const struct lu_env *env,
3974                          struct obd_export *exp, struct obd_device *obd,
3975                          struct obd_uuid *cluuid,
3976                          struct obd_connect_data *data,
3977                          void *localdata)
3978 {
3979         struct client_obd *cli = &obd->u.cli;
3980
3981         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3982                 long lost_grant;
3983
3984                 client_obd_list_lock(&cli->cl_loi_list_lock);
3985                 data->ocd_grant = cli->cl_avail_grant ?:
3986                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3987                 lost_grant = cli->cl_lost_grant;
3988                 cli->cl_lost_grant = 0;
3989                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3990
3991                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3992                        "cl_lost_grant: %ld\n", data->ocd_grant,
3993                        cli->cl_avail_grant, lost_grant);
3994                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3995                        " ocd_grant: %d\n", data->ocd_connect_flags,
3996                        data->ocd_version, data->ocd_grant);
3997         }
3998
3999         RETURN(0);
4000 }
4001
4002 static int osc_disconnect(struct obd_export *exp)
4003 {
4004         struct obd_device *obd = class_exp2obd(exp);
4005         struct llog_ctxt  *ctxt;
4006         int rc;
4007
4008         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4009         if (ctxt) {
4010                 if (obd->u.cli.cl_conn_count == 1) {
4011                         /* Flush any remaining cancel messages out to the
4012                          * target */
4013                         llog_sync(ctxt, exp);
4014                 }
4015                 llog_ctxt_put(ctxt);
4016         } else {
4017                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4018                        obd);
4019         }
4020
4021         rc = client_disconnect_export(exp);
4022         /**
4023          * Initially we put del_shrink_grant before disconnect_export, but it
4024          * causes the following problem if setup (connect) and cleanup
4025          * (disconnect) are tangled together:
4026          *      connect p1                     disconnect p2
4027          *   ptlrpc_connect_import
4028          *     ...............               class_manual_cleanup
4029          *                                     osc_disconnect
4030          *                                     del_shrink_grant
4031          *   ptlrpc_connect_interrupt
4032          *     init_grant_shrink
4033          *   add this client to shrink list
4034          *                                      cleanup_osc
4035          * Bang! The pinger triggers the shrink.
4036          * So the osc should only be removed from the shrink list once we
4037          * are sure the import has been destroyed (bug 18662).
4038          */
4039         if (obd->u.cli.cl_import == NULL)
4040                 osc_del_shrink_grant(&obd->u.cli);
4041         return rc;
4042 }
4043
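/* React to import state changes: freeze object creation and zero the
 * grant on disconnect, fail cached pages on invalidation, and pick up
 * the negotiated connect data once the handshake completes. */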
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

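/* Set up the OSC device: take a ptlrpcd reference, configure the client
 * import, register procfs stats, and pre-allocate a request pool so
 * brw_interpret can always start new requests. */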
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc) {
                ptlrpcd_decref();
        } else {
                struct lprocfs_static_vars lvars = { 0 };
                struct client_obd *cli = &obd->u.cli;

                lprocfs_osc_init_vars(&lvars);
                if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                        lproc_osc_attach_seqstat(obd);
                        sptlrpc_lprocfs_cliobd_attach(obd);
                        ptlrpc_lprocfs_register_obd(obd);
                }

                oscc_init(obd);
                /* We need to allocate a few requests more, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we want to have 2x max_rpcs_in_flight
                   reserved, but I'm afraid that might be too much wasted RAM
                   in fact, so 2 is just my guess and should still work. */
                cli->cl_import->imp_rq_pool =
                        ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                            OST_MAXREQSIZE,
                                            ptlrpc_add_rqs_to_pool);

                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                sema_init(&cli->cl_grant_sem, 1);
        }

        RETURN(rc);
}

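/* Staged teardown: deactivate the import early to stop recovery
 * traffic, then destroy any remaining client import and shut down the
 * llog subsystem when exports are cleaned up. */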
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        }
        RETURN(rc);
}

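/* Final cleanup: unregister procfs entries, mark the object creator as
 * exiting, release the quota cache, and drop our ptlrpcd reference. */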
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

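/* Apply a configuration command; anything we do not recognize is tried
 * as a proc parameter under the osc.* namespace. */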
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc = 0;

        lprocfs_osc_init_vars(&lvars);

        switch (lcfg->lcfg_command) {
        default:
                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                              lcfg, obd);
                if (rc > 0)
                        rc = 0;
                break;
        }

        return rc;
}

static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};

extern struct lu_kmem_descr  osc_caches[];
extern spinlock_t            osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

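/* Module entry point: set up the slab caches, hook up the quota
 * interface, and register the OSC obd type with the class layer. */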
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc;
        ENTRY;

        /* print an address of _any_ initialized kernel symbol from this
         * module, to allow debugging with gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        lprocfs_osc_init_vars(&lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        spin_lock_init(&osc_ast_guard);
        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

        RETURN(rc);
}

#ifdef __KERNEL__
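/* Module exit: drop the quota interface, unregister the OSC type, and
 * free the slab caches. */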
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif