Whamcloud - gitweb
3cbe9c01db5e46db8a529bb1549a2b54389cfb5f
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *req;
422         struct ost_body       *body;
423         struct lov_stripe_md  *lsm;
424         int                    rc;
425         ENTRY;
426
427         LASSERT(oa);
428         LASSERT(ea);
429
430         lsm = *ea;
431         if (!lsm) {
432                 rc = obd_alloc_memmd(exp, &lsm);
433                 if (rc < 0)
434                         RETURN(rc);
435         }
436
437         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
438         if (req == NULL)
439                 GOTO(out, rc = -ENOMEM);
440
441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442         if (rc) {
443                 ptlrpc_request_free(req);
444                 GOTO(out, rc);
445         }
446
447         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
448         LASSERT(body);
449         body->oa = *oa;
450
451         ptlrpc_request_set_replen(req);
452
453         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454             oa->o_flags == OBD_FL_DELORPHAN) {
455                 DEBUG_REQ(D_HA, req,
456                           "delorphan from OST integration");
457                 /* Don't resend the delorphan req */
458                 req->rq_no_resend = req->rq_no_delay = 1;
459         }
460
461         rc = ptlrpc_queue_wait(req);
462         if (rc)
463                 GOTO(out_req, rc);
464
465         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466         if (body == NULL)
467                 GOTO(out_req, rc = -EPROTO);
468
469         *oa = body->oa;
470
471         /* This should really be sent by the OST */
472         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473         oa->o_valid |= OBD_MD_FLBLKSZ;
474
475         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476          * have valid lsm_oinfo data structs, so don't go touching that.
477          * This needs to be fixed in a big way.
478          */
479         lsm->lsm_object_id = oa->o_id;
480         lsm->lsm_object_gr = oa->o_gr;
481         *ea = lsm;
482
483         if (oti != NULL) {
484                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
485
486                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487                         if (!oti->oti_logcookies)
488                                 oti_alloc_cookies(oti, 1);
489                         *oti->oti_logcookies = oa->o_lcookie;
490                 }
491         }
492
493         CDEBUG(D_HA, "transno: "LPD64"\n",
494                lustre_msg_get_transno(req->rq_repmsg));
495 out_req:
496         ptlrpc_req_finished(req);
497 out:
498         if (rc && !*ea)
499                 obd_free_memmd(exp, &lsm);
500         RETURN(rc);
501 }
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         body->oa = *oa;
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         *oa = body->oa;
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
669 static int osc_can_send_destroy(struct client_obd *cli)
670 {
671         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672             cli->cl_max_rpcs_in_flight) {
673                 /* The destroy request can be sent */
674                 return 1;
675         }
676         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677             cli->cl_max_rpcs_in_flight) {
678                 /*
679                  * The counter has been modified between the two atomic
680                  * operations.
681                  */
682                 cfs_waitq_signal(&cli->cl_destroy_waitq);
683         }
684         return 0;
685 }
686
687 /* Destroy requests can be async always on the client, and we don't even really
688  * care about the return code since the client cannot do anything at all about
689  * a destroy failure.
690  * When the MDS is unlinking a filename, it saves the file objects into a
691  * recovery llog, and these object records are cancelled when the OST reports
692  * they were destroyed and sync'd to disk (i.e. transaction committed).
693  * If the client dies, or the OST is down when the object should be destroyed,
694  * the records are not cancelled, and when the OST reconnects to the MDS next,
695  * it will retrieve the llog unlink logs and then sends the log cancellation
696  * cookies to the MDS after committing destroy transactions. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
699                        struct obd_export *md_export, void *capa)
700 {
701         struct client_obd     *cli = &exp->exp_obd->u.cli;
702         struct ptlrpc_request *req;
703         struct ost_body       *body;
704         CFS_LIST_HEAD(cancels);
705         int rc, count;
706         ENTRY;
707
708         if (!oa) {
709                 CDEBUG(D_INFO, "oa NULL\n");
710                 RETURN(-EINVAL);
711         }
712
713         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714                                         LDLM_FL_DISCARD_DATA);
715
716         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
717         if (req == NULL) {
718                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
719                 RETURN(-ENOMEM);
720         }
721
722         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
724                                0, &cancels, count);
725         if (rc) {
726                 ptlrpc_request_free(req);
727                 RETURN(rc);
728         }
729
730         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731         ptlrpc_at_set_req_timeout(req);
732
733         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734                 oa->o_lcookie = *oti->oti_logcookies;
735         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
736         LASSERT(body);
737         body->oa = *oa;
738
739         osc_pack_capa(req, body, (struct obd_capa *)capa);
740         ptlrpc_request_set_replen(req);
741
742         /* don't throttle destroy RPCs for the MDT */
743         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744                 req->rq_interpret_reply = osc_destroy_interpret;
745                 if (!osc_can_send_destroy(cli)) {
746                         struct l_wait_info lwi = { 0 };
747
748                         /*
749                          * Wait until the number of on-going destroy RPCs drops
750                          * under max_rpc_in_flight
751                          */
752                         l_wait_event_exclusive(cli->cl_destroy_waitq,
753                                                osc_can_send_destroy(cli), &lwi);
754                 }
755         }
756
757         /* Do not wait for response */
758         ptlrpcd_add_req(req, PSCOPE_OTHER);
759         RETURN(0);
760 }
761
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
763                                 long writing_bytes)
764 {
765         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
766
767         LASSERT(!(oa->o_valid & bits));
768
769         oa->o_valid |= bits;
770         client_obd_list_lock(&cli->cl_loi_list_lock);
771         oa->o_dirty = cli->cl_dirty;
772         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
775                 oa->o_undirty = 0;
776         } else if (atomic_read(&obd_dirty_pages) -
777                    atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778                 CERROR("dirty %d - %d > system dirty_max %d\n",
779                        atomic_read(&obd_dirty_pages),
780                        atomic_read(&obd_dirty_transit_pages),
781                        obd_max_dirty_pages);
782                 oa->o_undirty = 0;
783         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784                 CERROR("dirty %lu - dirty_max %lu too big???\n",
785                        cli->cl_dirty, cli->cl_dirty_max);
786                 oa->o_undirty = 0;
787         } else {
788                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789                                 (cli->cl_max_rpcs_in_flight + 1);
790                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791         }
792         oa->o_grant = cli->cl_avail_grant;
793         oa->o_dropped = cli->cl_lost_grant;
794         cli->cl_lost_grant = 0;
795         client_obd_list_unlock(&cli->cl_loi_list_lock);
796         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
798 }
799
800 /* caller must hold loi_list_lock */
801 static void osc_consume_write_grant(struct client_obd *cli,
802                                     struct brw_page *pga)
803 {
804         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
805         atomic_inc(&obd_dirty_pages);
806         cli->cl_dirty += CFS_PAGE_SIZE;
807         cli->cl_avail_grant -= CFS_PAGE_SIZE;
808         pga->flag |= OBD_BRW_FROM_GRANT;
809         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
810                CFS_PAGE_SIZE, pga, pga->pg);
811         LASSERT(cli->cl_avail_grant >= 0);
812 }
813
814 /* the companion to osc_consume_write_grant, called when a brw has completed.
815  * must be called with the loi lock held. */
816 static void osc_release_write_grant(struct client_obd *cli,
817                                     struct brw_page *pga, int sent)
818 {
819         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
820         ENTRY;
821
822         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
823                 EXIT;
824                 return;
825         }
826
827         pga->flag &= ~OBD_BRW_FROM_GRANT;
828         atomic_dec(&obd_dirty_pages);
829         cli->cl_dirty -= CFS_PAGE_SIZE;
830         if (pga->flag & OBD_BRW_NOCACHE) {
831                 pga->flag &= ~OBD_BRW_NOCACHE;
832                 atomic_dec(&obd_dirty_transit_pages);
833                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
834         }
835         if (!sent) {
836                 cli->cl_lost_grant += CFS_PAGE_SIZE;
837                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
838                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
839         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
840                 /* For short writes we shouldn't count parts of pages that
841                  * span a whole block on the OST side, or our accounting goes
842                  * wrong.  Should match the code in filter_grant_check. */
843                 int offset = pga->off & ~CFS_PAGE_MASK;
844                 int count = pga->count + (offset & (blocksize - 1));
845                 int end = (offset + pga->count) & (blocksize - 1);
846                 if (end)
847                         count += blocksize - end;
848
849                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
850                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
851                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
852                        cli->cl_avail_grant, cli->cl_dirty);
853         }
854
855         EXIT;
856 }
857
858 static unsigned long rpcs_in_flight(struct client_obd *cli)
859 {
860         return cli->cl_r_in_flight + cli->cl_w_in_flight;
861 }
862
863 /* caller must hold loi_list_lock */
864 void osc_wake_cache_waiters(struct client_obd *cli)
865 {
866         struct list_head *l, *tmp;
867         struct osc_cache_waiter *ocw;
868
869         ENTRY;
870         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
871                 /* if we can't dirty more, we must wait until some is written */
872                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
873                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
874                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
875                                "osc max %ld, sys max %d\n", cli->cl_dirty,
876                                cli->cl_dirty_max, obd_max_dirty_pages);
877                         return;
878                 }
879
880                 /* if still dirty cache but no grant wait for pending RPCs that
881                  * may yet return us some grant before doing sync writes */
882                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
883                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
884                                cli->cl_w_in_flight);
885                         return;
886                 }
887
888                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
889                 list_del_init(&ocw->ocw_entry);
890                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
891                         /* no more RPCs in flight to return grant, do sync IO */
892                         ocw->ocw_rc = -EDQUOT;
893                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
894                 } else {
895                         osc_consume_write_grant(cli,
896                                                 &ocw->ocw_oap->oap_brw_page);
897                 }
898
899                 cfs_waitq_signal(&ocw->ocw_waitq);
900         }
901
902         EXIT;
903 }
904
905 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
906 {
907         client_obd_list_lock(&cli->cl_loi_list_lock);
908         cli->cl_avail_grant = ocd->ocd_grant;
909         client_obd_list_unlock(&cli->cl_loi_list_lock);
910
911         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
912                cli->cl_avail_grant, cli->cl_lost_grant);
913         LASSERT(cli->cl_avail_grant >= 0);
914 }
915
916 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
917 {
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
920         if (body->oa.o_valid & OBD_MD_FLGRANT)
921                 cli->cl_avail_grant += body->oa.o_grant;
922         /* waiters are woken in brw_interpret */
923         client_obd_list_unlock(&cli->cl_loi_list_lock);
924 }
925
926 /* We assume that the reason this OSC got a short read is because it read
927  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
928  * via the LOV, and it _knows_ it's reading inside the file, it's just that
929  * this stripe never got written at or beyond this stripe offset yet. */
930 static void handle_short_read(int nob_read, obd_count page_count,
931                               struct brw_page **pga)
932 {
933         char *ptr;
934         int i = 0;
935
936         /* skip bytes read OK */
937         while (nob_read > 0) {
938                 LASSERT (page_count > 0);
939
940                 if (pga[i]->count > nob_read) {
941                         /* EOF inside this page */
942                         ptr = cfs_kmap(pga[i]->pg) +
943                                 (pga[i]->off & ~CFS_PAGE_MASK);
944                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
945                         cfs_kunmap(pga[i]->pg);
946                         page_count--;
947                         i++;
948                         break;
949                 }
950
951                 nob_read -= pga[i]->count;
952                 page_count--;
953                 i++;
954         }
955
956         /* zero remaining pages */
957         while (page_count-- > 0) {
958                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
959                 memset(ptr, 0, pga[i]->count);
960                 cfs_kunmap(pga[i]->pg);
961                 i++;
962         }
963 }
964
965 static int check_write_rcs(struct ptlrpc_request *req,
966                            int requested_nob, int niocount,
967                            obd_count page_count, struct brw_page **pga)
968 {
969         int    *remote_rcs, i;
970
971         /* return error if any niobuf was in error */
972         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
973                                         sizeof(*remote_rcs) * niocount, NULL);
974         if (remote_rcs == NULL) {
975                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
976                 return(-EPROTO);
977         }
978         if (lustre_msg_swabbed(req->rq_repmsg))
979                 for (i = 0; i < niocount; i++)
980                         __swab32s(&remote_rcs[i]);
981
982         for (i = 0; i < niocount; i++) {
983                 if (remote_rcs[i] < 0)
984                         return(remote_rcs[i]);
985
986                 if (remote_rcs[i] != 0) {
987                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
988                                 i, remote_rcs[i], req);
989                         return(-EPROTO);
990                 }
991         }
992
993         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
994                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
995                        req->rq_bulk->bd_nob_transferred, requested_nob);
996                 return(-EPROTO);
997         }
998
999         return (0);
1000 }
1001
1002 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1003 {
1004         if (p1->flag != p2->flag) {
1005                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1006                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1007
1008                 /* warn if we try to combine flags that we don't know to be
1009                  * safe to combine */
1010                 if ((p1->flag & mask) != (p2->flag & mask))
1011                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1012                                "same brw?\n", p1->flag, p2->flag);
1013                 return 0;
1014         }
1015
1016         return (p1->off + p1->count == p2->off);
1017 }
1018
1019 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1020                                    struct brw_page **pga, int opc,
1021                                    cksum_type_t cksum_type)
1022 {
1023         __u32 cksum;
1024         int i = 0;
1025
1026         LASSERT (pg_count > 0);
1027         cksum = init_checksum(cksum_type);
1028         while (nob > 0 && pg_count > 0) {
1029                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1030                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1031                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1032
1033                 /* corrupt the data before we compute the checksum, to
1034                  * simulate an OST->client data error */
1035                 if (i == 0 && opc == OST_READ &&
1036                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1037                         memcpy(ptr + off, "bad1", min(4, nob));
1038                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1039                 cfs_kunmap(pga[i]->pg);
1040                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1041                                off, cksum);
1042
1043                 nob -= pga[i]->count;
1044                 pg_count--;
1045                 i++;
1046         }
1047         /* For sending we only compute the wrong checksum instead
1048          * of corrupting the data so it is still correct on a redo */
1049         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1050                 cksum++;
1051
1052         return cksum;
1053 }
1054
1055 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1056                                 struct lov_stripe_md *lsm, obd_count page_count,
1057                                 struct brw_page **pga,
1058                                 struct ptlrpc_request **reqp,
1059                                 struct obd_capa *ocapa, int reserve)
1060 {
1061         struct ptlrpc_request   *req;
1062         struct ptlrpc_bulk_desc *desc;
1063         struct ost_body         *body;
1064         struct obd_ioobj        *ioobj;
1065         struct niobuf_remote    *niobuf;
1066         int niocount, i, requested_nob, opc, rc;
1067         struct osc_brw_async_args *aa;
1068         struct req_capsule      *pill;
1069         struct brw_page *pg_prev;
1070
1071         ENTRY;
1072         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1073                 RETURN(-ENOMEM); /* Recoverable */
1074         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1075                 RETURN(-EINVAL); /* Fatal */
1076
1077         if ((cmd & OBD_BRW_WRITE) != 0) {
1078                 opc = OST_WRITE;
1079                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1080                                                 cli->cl_import->imp_rq_pool,
1081                                                 &RQF_OST_BRW);
1082         } else {
1083                 opc = OST_READ;
1084                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1085         }
1086         if (req == NULL)
1087                 RETURN(-ENOMEM);
1088
1089         for (niocount = i = 1; i < page_count; i++) {
1090                 if (!can_merge_pages(pga[i - 1], pga[i]))
1091                         niocount++;
1092         }
1093
1094         pill = &req->rq_pill;
1095         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1096                              niocount * sizeof(*niobuf));
1097         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1098
1099         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1100         if (rc) {
1101                 ptlrpc_request_free(req);
1102                 RETURN(rc);
1103         }
1104         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1105         ptlrpc_at_set_req_timeout(req);
1106
1107         if (opc == OST_WRITE)
1108                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1109                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1110         else
1111                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1112                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1113
1114         if (desc == NULL)
1115                 GOTO(out, rc = -ENOMEM);
1116         /* NB request now owns desc and will free it when it gets freed */
1117
1118         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1119         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1120         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1121         LASSERT(body && ioobj && niobuf);
1122
1123         body->oa = *oa;
1124
1125         obdo_to_ioobj(oa, ioobj);
1126         ioobj->ioo_bufcnt = niocount;
1127         osc_pack_capa(req, body, ocapa);
1128         LASSERT (page_count > 0);
1129         pg_prev = pga[0];
1130         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1131                 struct brw_page *pg = pga[i];
1132
1133                 LASSERT(pg->count > 0);
1134                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1135                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1136                          pg->off, pg->count);
1137 #ifdef __linux__
1138                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1139                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1140                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1141                          i, page_count,
1142                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1143                          pg_prev->pg, page_private(pg_prev->pg),
1144                          pg_prev->pg->index, pg_prev->off);
1145 #else
1146                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1147                          "i %d p_c %u\n", i, page_count);
1148 #endif
1149                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1150                         (pg->flag & OBD_BRW_SRVLOCK));
1151
1152                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1153                                       pg->count);
1154                 requested_nob += pg->count;
1155
1156                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1157                         niobuf--;
1158                         niobuf->len += pg->count;
1159                 } else {
1160                         niobuf->offset = pg->off;
1161                         niobuf->len    = pg->count;
1162                         niobuf->flags  = pg->flag;
1163                 }
1164                 pg_prev = pg;
1165         }
1166
1167         LASSERTF((void *)(niobuf - niocount) ==
1168                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1169                                niocount * sizeof(*niobuf)),
1170                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1171                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1172                 (void *)(niobuf - niocount));
1173
1174         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1175
1176         /* size[REQ_REC_OFF] still sizeof (*body) */
1177         if (opc == OST_WRITE) {
1178                 if (unlikely(cli->cl_checksum) &&
1179                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1180                         /* store cl_cksum_type in a local variable since
1181                          * it can be changed via lprocfs */
1182                         cksum_type_t cksum_type = cli->cl_cksum_type;
1183
1184                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1185                                 oa->o_flags = body->oa.o_flags = 0;
1186                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1187                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1188                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1189                                                              page_count, pga,
1190                                                              OST_WRITE,
1191                                                              cksum_type);
1192                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1193                                body->oa.o_cksum);
1194                         /* save this in 'oa', too, for later checking */
1195                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1196                         oa->o_flags |= cksum_type_pack(cksum_type);
1197                 } else {
1198                         /* clear out the checksum flag, in case this is a
1199                          * resend but cl_checksum is no longer set. b=11238 */
1200                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1201                 }
1202                 oa->o_cksum = body->oa.o_cksum;
1203                 /* 1 RC per niobuf */
1204                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1205                                      sizeof(__u32) * niocount);
1206         } else {
1207                 if (unlikely(cli->cl_checksum) &&
1208                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1209                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1210                                 body->oa.o_flags = 0;
1211                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1212                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1213                 }
1214                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1215                 /* 1 RC for the whole I/O */
1216         }
1217         ptlrpc_request_set_replen(req);
1218
1219         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1220         aa = ptlrpc_req_async_args(req);
1221         aa->aa_oa = oa;
1222         aa->aa_requested_nob = requested_nob;
1223         aa->aa_nio_count = niocount;
1224         aa->aa_page_count = page_count;
1225         aa->aa_resends = 0;
1226         aa->aa_ppga = pga;
1227         aa->aa_cli = cli;
1228         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1229         if (ocapa && reserve)
1230                 aa->aa_ocapa = capa_get(ocapa);
1231
1232         *reqp = req;
1233         RETURN(0);
1234
1235  out:
1236         ptlrpc_req_finished(req);
1237         RETURN(rc);
1238 }
1239
1240 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1241                                 __u32 client_cksum, __u32 server_cksum, int nob,
1242                                 obd_count page_count, struct brw_page **pga,
1243                                 cksum_type_t client_cksum_type)
1244 {
1245         __u32 new_cksum;
1246         char *msg;
1247         cksum_type_t cksum_type;
1248
1249         if (server_cksum == client_cksum) {
1250                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1251                 return 0;
1252         }
1253
1254         if (oa->o_valid & OBD_MD_FLFLAGS)
1255                 cksum_type = cksum_type_unpack(oa->o_flags);
1256         else
1257                 cksum_type = OBD_CKSUM_CRC32;
1258
1259         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1260                                       cksum_type);
1261
1262         if (cksum_type != client_cksum_type)
1263                 msg = "the server did not use the checksum type specified in "
1264                       "the original request - likely a protocol problem";
1265         else if (new_cksum == server_cksum)
1266                 msg = "changed on the client after we checksummed it - "
1267                       "likely false positive due to mmap IO (bug 11742)";
1268         else if (new_cksum == client_cksum)
1269                 msg = "changed in transit before arrival at OST";
1270         else
1271                 msg = "changed in transit AND doesn't match the original - "
1272                       "likely false positive due to mmap IO (bug 11742)";
1273
1274         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1275                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1276                            "["LPU64"-"LPU64"]\n",
1277                            msg, libcfs_nid2str(peer->nid),
1278                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1279                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1280                                                         (__u64)0,
1281                            oa->o_id,
1282                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1283                            pga[0]->off,
1284                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1285         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1286                "client csum now %x\n", client_cksum, client_cksum_type,
1287                server_cksum, cksum_type, new_cksum);
1288         return 1;
1289 }
1290
1291 /* Note rc enters this function as number of bytes transferred */
1292 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1293 {
1294         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1295         const lnet_process_id_t *peer =
1296                         &req->rq_import->imp_connection->c_peer;
1297         struct client_obd *cli = aa->aa_cli;
1298         struct ost_body *body;
1299         __u32 client_cksum = 0;
1300         ENTRY;
1301
1302         if (rc < 0 && rc != -EDQUOT)
1303                 RETURN(rc);
1304
1305         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1306         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1307                                   lustre_swab_ost_body);
1308         if (body == NULL) {
1309                 CDEBUG(D_INFO, "Can't unpack body\n");
1310                 RETURN(-EPROTO);
1311         }
1312
1313         /* set/clear over quota flag for a uid/gid */
1314         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1315             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1316                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1317                              body->oa.o_gid, body->oa.o_valid,
1318                              body->oa.o_flags);
1319
1320         if (rc < 0)
1321                 RETURN(rc);
1322
1323         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1324                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1325
1326         osc_update_grant(cli, body);
1327
1328         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1329                 if (rc > 0) {
1330                         CERROR("Unexpected +ve rc %d\n", rc);
1331                         RETURN(-EPROTO);
1332                 }
1333                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1334
1335                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1336                         RETURN(-EAGAIN);
1337
1338                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1339                     check_write_checksum(&body->oa, peer, client_cksum,
1340                                          body->oa.o_cksum, aa->aa_requested_nob,
1341                                          aa->aa_page_count, aa->aa_ppga,
1342                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1343                         RETURN(-EAGAIN);
1344
1345                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1346                                      aa->aa_page_count, aa->aa_ppga);
1347                 GOTO(out, rc);
1348         }
1349
1350         /* The rest of this function executes only for OST_READs */
1351
1352         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1353         if (rc < 0)
1354                 GOTO(out, rc);
1355
1356         if (rc > aa->aa_requested_nob) {
1357                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1358                        aa->aa_requested_nob);
1359                 RETURN(-EPROTO);
1360         }
1361
1362         if (rc != req->rq_bulk->bd_nob_transferred) {
1363                 CERROR ("Unexpected rc %d (%d transferred)\n",
1364                         rc, req->rq_bulk->bd_nob_transferred);
1365                 return (-EPROTO);
1366         }
1367
1368         if (rc < aa->aa_requested_nob)
1369                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1370
1371         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1372                 static int cksum_counter;
1373                 __u32      server_cksum = body->oa.o_cksum;
1374                 char      *via;
1375                 char      *router;
1376                 cksum_type_t cksum_type;
1377
1378                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1379                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1380                 else
1381                         cksum_type = OBD_CKSUM_CRC32;
1382                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1383                                                  aa->aa_ppga, OST_READ,
1384                                                  cksum_type);
1385
1386                 if (peer->nid == req->rq_bulk->bd_sender) {
1387                         via = router = "";
1388                 } else {
1389                         via = " via ";
1390                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1391                 }
1392
1393                 if (server_cksum == ~0 && rc > 0) {
1394                         CERROR("Protocol error: server %s set the 'checksum' "
1395                                "bit, but didn't send a checksum.  Not fatal, "
1396                                "but please notify on http://bugzilla.lustre.org/\n",
1397                                libcfs_nid2str(peer->nid));
1398                 } else if (server_cksum != client_cksum) {
1399                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1400                                            "%s%s%s inum "LPU64"/"LPU64" object "
1401                                            LPU64"/"LPU64" extent "
1402                                            "["LPU64"-"LPU64"]\n",
1403                                            req->rq_import->imp_obd->obd_name,
1404                                            libcfs_nid2str(peer->nid),
1405                                            via, router,
1406                                            body->oa.o_valid & OBD_MD_FLFID ?
1407                                                 body->oa.o_fid : (__u64)0,
1408                                            body->oa.o_valid & OBD_MD_FLFID ?
1409                                                 body->oa.o_generation :(__u64)0,
1410                                            body->oa.o_id,
1411                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1412                                                 body->oa.o_gr : (__u64)0,
1413                                            aa->aa_ppga[0]->off,
1414                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1415                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1416                                                                         1);
1417                         CERROR("client %x, server %x, cksum_type %x\n",
1418                                client_cksum, server_cksum, cksum_type);
1419                         cksum_counter = 0;
1420                         aa->aa_oa->o_cksum = client_cksum;
1421                         rc = -EAGAIN;
1422                 } else {
1423                         cksum_counter++;
1424                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425                         rc = 0;
1426                 }
1427         } else if (unlikely(client_cksum)) {
1428                 static int cksum_missed;
1429
1430                 cksum_missed++;
1431                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1432                         CERROR("Checksum %u requested from %s but not sent\n",
1433                                cksum_missed, libcfs_nid2str(peer->nid));
1434         } else {
1435                 rc = 0;
1436         }
1437 out:
1438         if (rc >= 0)
1439                 *aa->aa_oa = body->oa;
1440
1441         RETURN(rc);
1442 }
1443
1444 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1445                             struct lov_stripe_md *lsm,
1446                             obd_count page_count, struct brw_page **pga,
1447                             struct obd_capa *ocapa)
1448 {
1449         struct ptlrpc_request *req;
1450         int                    rc;
1451         cfs_waitq_t            waitq;
1452         int                    resends = 0;
1453         struct l_wait_info     lwi;
1454
1455         ENTRY;
1456
1457         cfs_waitq_init(&waitq);
1458
1459 restart_bulk:
1460         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1461                                   page_count, pga, &req, ocapa, 0);
1462         if (rc != 0)
1463                 return (rc);
1464
1465         rc = ptlrpc_queue_wait(req);
1466
1467         if (rc == -ETIMEDOUT && req->rq_resend) {
1468                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1469                 ptlrpc_req_finished(req);
1470                 goto restart_bulk;
1471         }
1472
1473         rc = osc_brw_fini_request(req, rc);
1474
1475         ptlrpc_req_finished(req);
1476         if (osc_recoverable_error(rc)) {
1477                 resends++;
1478                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1479                         CERROR("too many resend retries, returning error\n");
1480                         RETURN(-EIO);
1481                 }
1482
1483                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1484                 l_wait_event(waitq, 0, &lwi);
1485
1486                 goto restart_bulk;
1487         }
1488
1489         RETURN (rc);
1490 }
1491
1492 int osc_brw_redo_request(struct ptlrpc_request *request,
1493                          struct osc_brw_async_args *aa)
1494 {
1495         struct ptlrpc_request *new_req;
1496         struct ptlrpc_request_set *set = request->rq_set;
1497         struct osc_brw_async_args *new_aa;
1498         struct osc_async_page *oap;
1499         int rc = 0;
1500         ENTRY;
1501
1502         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1503                 CERROR("too many resend retries, returning error\n");
1504                 RETURN(-EIO);
1505         }
1506
1507         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1508
1509         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1510                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1511                                   aa->aa_cli, aa->aa_oa,
1512                                   NULL /* lsm unused by osc currently */,
1513                                   aa->aa_page_count, aa->aa_ppga,
1514                                   &new_req, aa->aa_ocapa, 0);
1515         if (rc)
1516                 RETURN(rc);
1517
1518         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1519
1520         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1521                 if (oap->oap_request != NULL) {
1522                         LASSERTF(request == oap->oap_request,
1523                                  "request %p != oap_request %p\n",
1524                                  request, oap->oap_request);
1525                         if (oap->oap_interrupted) {
1526                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1527                                 ptlrpc_req_finished(new_req);
1528                                 RETURN(-EINTR);
1529                         }
1530                 }
1531         }
1532         /* New request takes over pga and oaps from old request.
1533          * Note that copying a list_head doesn't work, need to move it... */
1534         aa->aa_resends++;
1535         new_req->rq_interpret_reply = request->rq_interpret_reply;
1536         new_req->rq_async_args = request->rq_async_args;
1537         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1538
1539         new_aa = ptlrpc_req_async_args(new_req);
1540
1541         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1542         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1543         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1544
1545         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1546                 if (oap->oap_request) {
1547                         ptlrpc_req_finished(oap->oap_request);
1548                         oap->oap_request = ptlrpc_request_addref(new_req);
1549                 }
1550         }
1551
1552         new_aa->aa_ocapa = aa->aa_ocapa;
1553         aa->aa_ocapa = NULL;
1554
1555         /* use ptlrpc_set_add_req is safe because interpret functions work
1556          * in check_set context. only one way exist with access to request
1557          * from different thread got -EINTR - this way protected with
1558          * cl_loi_list_lock */
1559         ptlrpc_set_add_req(set, new_req);
1560
1561         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1562
1563         DEBUG_REQ(D_INFO, new_req, "new request");
1564         RETURN(0);
1565 }
1566
1567 /*
1568  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1569  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1570  * fine for our small page arrays and doesn't require allocation.  its an
1571  * insertion sort that swaps elements that are strides apart, shrinking the
1572  * stride down until its '1' and the array is sorted.
1573  */
1574 static void sort_brw_pages(struct brw_page **array, int num)
1575 {
1576         int stride, i, j;
1577         struct brw_page *tmp;
1578
1579         if (num == 1)
1580                 return;
1581         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1582                 ;
1583
1584         do {
1585                 stride /= 3;
1586                 for (i = stride ; i < num ; i++) {
1587                         tmp = array[i];
1588                         j = i;
1589                         while (j >= stride && array[j - stride]->off > tmp->off) {
1590                                 array[j] = array[j - stride];
1591                                 j -= stride;
1592                         }
1593                         array[j] = tmp;
1594                 }
1595         } while (stride > 1);
1596 }
1597
1598 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1599 {
1600         int count = 1;
1601         int offset;
1602         int i = 0;
1603
1604         LASSERT (pages > 0);
1605         offset = pg[i]->off & ~CFS_PAGE_MASK;
1606
1607         for (;;) {
1608                 pages--;
1609                 if (pages == 0)         /* that's all */
1610                         return count;
1611
1612                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1613                         return count;   /* doesn't end on page boundary */
1614
1615                 i++;
1616                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1617                 if (offset != 0)        /* doesn't start on page boundary */
1618                         return count;
1619
1620                 count++;
1621         }
1622 }
1623
1624 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1625 {
1626         struct brw_page **ppga;
1627         int i;
1628
1629         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1630         if (ppga == NULL)
1631                 return NULL;
1632
1633         for (i = 0; i < count; i++)
1634                 ppga[i] = pga + i;
1635         return ppga;
1636 }
1637
1638 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1639 {
1640         LASSERT(ppga != NULL);
1641         OBD_FREE(ppga, sizeof(*ppga) * count);
1642 }
1643
1644 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1645                    obd_count page_count, struct brw_page *pga,
1646                    struct obd_trans_info *oti)
1647 {
1648         struct obdo *saved_oa = NULL;
1649         struct brw_page **ppga, **orig;
1650         struct obd_import *imp = class_exp2cliimp(exp);
1651         struct client_obd *cli = &imp->imp_obd->u.cli;
1652         int rc, page_count_orig;
1653         ENTRY;
1654
1655         if (cmd & OBD_BRW_CHECK) {
1656                 /* The caller just wants to know if there's a chance that this
1657                  * I/O can succeed */
1658
1659                 if (imp == NULL || imp->imp_invalid)
1660                         RETURN(-EIO);
1661                 RETURN(0);
1662         }
1663
1664         /* test_brw with a failed create can trip this, maybe others. */
1665         LASSERT(cli->cl_max_pages_per_rpc);
1666
1667         rc = 0;
1668
1669         orig = ppga = osc_build_ppga(pga, page_count);
1670         if (ppga == NULL)
1671                 RETURN(-ENOMEM);
1672         page_count_orig = page_count;
1673
1674         sort_brw_pages(ppga, page_count);
1675         while (page_count) {
1676                 obd_count pages_per_brw;
1677
1678                 if (page_count > cli->cl_max_pages_per_rpc)
1679                         pages_per_brw = cli->cl_max_pages_per_rpc;
1680                 else
1681                         pages_per_brw = page_count;
1682
1683                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1684
1685                 if (saved_oa != NULL) {
1686                         /* restore previously saved oa */
1687                         *oinfo->oi_oa = *saved_oa;
1688                 } else if (page_count > pages_per_brw) {
1689                         /* save a copy of oa (brw will clobber it) */
1690                         OBDO_ALLOC(saved_oa);
1691                         if (saved_oa == NULL)
1692                                 GOTO(out, rc = -ENOMEM);
1693                         *saved_oa = *oinfo->oi_oa;
1694                 }
1695
1696                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1697                                       pages_per_brw, ppga, oinfo->oi_capa);
1698
1699                 if (rc != 0)
1700                         break;
1701
1702                 page_count -= pages_per_brw;
1703                 ppga += pages_per_brw;
1704         }
1705
1706 out:
1707         osc_release_ppga(orig, page_count_orig);
1708
1709         if (saved_oa != NULL)
1710                 OBDO_FREE(saved_oa);
1711
1712         RETURN(rc);
1713 }
1714
1715 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1716  * the dirty accounting.  Writeback completes or truncate happens before
1717  * writing starts.  Must be called with the loi lock held. */
1718 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1719                            int sent)
1720 {
1721         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1722 }
1723
1724
1725 /* This maintains the lists of pending pages to read/write for a given object
1726  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1727  * to quickly find objects that are ready to send an RPC. */
1728 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1729                          int cmd)
1730 {
1731         int optimal;
1732         ENTRY;
1733
1734         if (lop->lop_num_pending == 0)
1735                 RETURN(0);
1736
1737         /* if we have an invalid import we want to drain the queued pages
1738          * by forcing them through rpcs that immediately fail and complete
1739          * the pages.  recovery relies on this to empty the queued pages
1740          * before canceling the locks and evicting down the llite pages */
1741         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1742                 RETURN(1);
1743
1744         /* stream rpcs in queue order as long as as there is an urgent page
1745          * queued.  this is our cheap solution for good batching in the case
1746          * where writepage marks some random page in the middle of the file
1747          * as urgent because of, say, memory pressure */
1748         if (!list_empty(&lop->lop_urgent)) {
1749                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1750                 RETURN(1);
1751         }
1752         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1753         optimal = cli->cl_max_pages_per_rpc;
1754         if (cmd & OBD_BRW_WRITE) {
1755                 /* trigger a write rpc stream as long as there are dirtiers
1756                  * waiting for space.  as they're waiting, they're not going to
1757                  * create more pages to coallesce with what's waiting.. */
1758                 if (!list_empty(&cli->cl_cache_waiters)) {
1759                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1760                         RETURN(1);
1761                 }
1762                 /* +16 to avoid triggering rpcs that would want to include pages
1763                  * that are being queued but which can't be made ready until
1764                  * the queuer finishes with the page. this is a wart for
1765                  * llite::commit_write() */
1766                 optimal += 16;
1767         }
1768         if (lop->lop_num_pending >= optimal)
1769                 RETURN(1);
1770
1771         RETURN(0);
1772 }
1773
/* Make @item's membership of @list match @should_be_on: link it at the tail
 * if it is wanted but currently unlinked, unlink it if it is linked but no
 * longer wanted, and leave it alone otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1782
1783 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1784  * can find pages to build into rpcs quickly */
1785 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1786 {
1787         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1788                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1789                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1790
1791         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1792                 loi->loi_write_lop.lop_num_pending);
1793
1794         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1795                 loi->loi_read_lop.lop_num_pending);
1796 }
1797
1798 static void lop_update_pending(struct client_obd *cli,
1799                                struct loi_oap_pages *lop, int cmd, int delta)
1800 {
1801         lop->lop_num_pending += delta;
1802         if (cmd & OBD_BRW_WRITE)
1803                 cli->cl_pending_w_pages += delta;
1804         else
1805                 cli->cl_pending_r_pages += delta;
1806 }
1807
1808 /**
1809  * this is called when a sync waiter receives an interruption.  Its job is to
1810  * get the caller woken as soon as possible.  If its page hasn't been put in an
1811  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1812  * desiring interruption which will forcefully complete the rpc once the rpc
1813  * has timed out.
1814  */
1815 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1816 {
1817         struct loi_oap_pages *lop;
1818         struct lov_oinfo *loi;
1819         int rc = -EBUSY;
1820         ENTRY;
1821
1822         LASSERT(!oap->oap_interrupted);
1823         oap->oap_interrupted = 1;
1824
1825         /* ok, it's been put in an rpc. only one oap gets a request reference */
1826         if (oap->oap_request != NULL) {
1827                 ptlrpc_mark_interrupted(oap->oap_request);
1828                 ptlrpcd_wake(oap->oap_request);
1829                 ptlrpc_req_finished(oap->oap_request);
1830                 oap->oap_request = NULL;
1831         }
1832
1833         /*
1834          * page completion may be called only if ->cpo_prep() method was
1835          * executed by osc_io_submit(), that also adds page the to pending list
1836          */
1837         if (!list_empty(&oap->oap_pending_item)) {
1838                 list_del_init(&oap->oap_pending_item);
1839                 list_del_init(&oap->oap_urgent_item);
1840
1841                 loi = oap->oap_loi;
1842                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1843                         &loi->loi_write_lop : &loi->loi_read_lop;
1844                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1845                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1846                 rc = oap->oap_caller_ops->ap_completion(env,
1847                                           oap->oap_caller_data,
1848                                           oap->oap_cmd, NULL, -EINTR);
1849         }
1850
1851         RETURN(rc);
1852 }
1853
1854 /* this is trying to propogate async writeback errors back up to the
1855  * application.  As an async write fails we record the error code for later if
1856  * the app does an fsync.  As long as errors persist we force future rpcs to be
1857  * sync so that the app can get a sync error and break the cycle of queueing
1858  * pages for which writeback will fail. */
1859 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1860                            int rc)
1861 {
1862         if (rc) {
1863                 if (!ar->ar_rc)
1864                         ar->ar_rc = rc;
1865
1866                 ar->ar_force_sync = 1;
1867                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1868                 return;
1869
1870         }
1871
1872         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1873                 ar->ar_force_sync = 0;
1874 }
1875
1876 void osc_oap_to_pending(struct osc_async_page *oap)
1877 {
1878         struct loi_oap_pages *lop;
1879
1880         if (oap->oap_cmd & OBD_BRW_WRITE)
1881                 lop = &oap->oap_loi->loi_write_lop;
1882         else
1883                 lop = &oap->oap_loi->loi_read_lop;
1884
1885         if (oap->oap_async_flags & ASYNC_URGENT)
1886                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1887         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1888         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1889 }
1890
1891 /* this must be called holding the loi list lock to give coverage to exit_cache,
1892  * async_flag maintenance, and oap_request */
1893 static void osc_ap_completion(const struct lu_env *env,
1894                               struct client_obd *cli, struct obdo *oa,
1895                               struct osc_async_page *oap, int sent, int rc)
1896 {
1897         __u64 xid = 0;
1898
1899         ENTRY;
1900         if (oap->oap_request != NULL) {
1901                 xid = ptlrpc_req_xid(oap->oap_request);
1902                 ptlrpc_req_finished(oap->oap_request);
1903                 oap->oap_request = NULL;
1904         }
1905
1906         oap->oap_async_flags = 0;
1907         oap->oap_interrupted = 0;
1908
1909         if (oap->oap_cmd & OBD_BRW_WRITE) {
1910                 osc_process_ar(&cli->cl_ar, xid, rc);
1911                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1912         }
1913
1914         if (rc == 0 && oa != NULL) {
1915                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1916                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1917                 if (oa->o_valid & OBD_MD_FLMTIME)
1918                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1919                 if (oa->o_valid & OBD_MD_FLATIME)
1920                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1921                 if (oa->o_valid & OBD_MD_FLCTIME)
1922                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1923         }
1924
1925         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
1926                                                 oap->oap_cmd, oa, rc);
1927
1928         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1929          * I/O on the page could start, but OSC calls it under lock
1930          * and thus we can add oap back to pending safely */
1931         if (rc)
1932                 /* upper layer wants to leave the page on pending queue */
1933                 osc_oap_to_pending(oap);
1934         else
1935                 osc_exit_cache(cli, oap, sent);
1936         EXIT;
1937 }
1938
/**
 * Completion handler for a bulk read/write request.
 *
 * Finalizes the reply, resubmits the request when the error is classified as
 * recoverable, and otherwise completes the I/O: the in-flight counter for the
 * request's direction is dropped, every attached async page is completed (or,
 * when no async pages are attached, the write grant of each page is
 * released), cache waiters are woken and further RPCs are checked for.
 *
 * \param env  environment passed through to the completion callbacks
 * \param req  the request that finished
 * \param data the request's struct osc_brw_async_args
 * \param rc   incoming status, handed to osc_brw_fini_request()
 *
 * \return 0 when the request was successfully resubmitted, otherwise the
 *         final status of the I/O.
 */
1939 static int brw_interpret(const struct lu_env *env,
1940                          struct ptlrpc_request *req, void *data, int rc)
1941 {
1942         struct osc_brw_async_args *aa = data;
1943         struct client_obd *cli;
1944         int async;
1945         ENTRY;
1946
1947         rc = osc_brw_fini_request(req, rc);
1948         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
             /* On a successful redo the replacement request has taken over
              * the oaps and the capa (osc_brw_redo_request() moves them), so
              * nothing may be completed here.  If the redo itself fails, its
              * error becomes the status of this I/O. */
1949         if (osc_recoverable_error(rc)) {
1950                 rc = osc_brw_redo_request(req, aa);
1951                 if (rc == 0)
1952                         RETURN(0);
1953         }
1954
1955         if (aa->aa_ocapa) {
1956                 capa_put(aa->aa_ocapa);
1957                 aa->aa_ocapa = NULL;
1958         }
1959
1960         cli = aa->aa_cli;
1961
1962         client_obd_list_lock(&cli->cl_loi_list_lock);
1963
1964         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1965          * is called so we know whether to go to sync BRWs or wait for more
1966          * RPCs to complete */
1967         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1968                 cli->cl_w_in_flight--;
1969         else
1970                 cli->cl_r_in_flight--;
1971
             /* 'async' is non-zero when NO oaps hang off the request */
1972         async = list_empty(&aa->aa_oaps);
1973         if (!async) { /* from osc_send_oap_rpc() */
1974                 struct osc_async_page *oap, *tmp;
1975                 /* the caller may re-use the oap after the completion call so
1976                  * we need to clean it up a little */
1977                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
1978                         list_del_init(&oap->oap_rpc_item);
1979                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
1980                 }
                     /* aa_oa is freed only on this path */
1981                 OBDO_FREE(aa->aa_oa);
1982         } else { /* from async_internal() */
1983                 int i;
1984                 for (i = 0; i < aa->aa_page_count; i++)
1985                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
1986         }
1987         osc_wake_cache_waiters(cli);
1988         osc_check_rpcs(env, cli);
1989         client_obd_list_unlock(&cli->cl_loi_list_lock);
             /* the cl_req is completed only after the list lock is dropped */
1990         if (!async)
1991                 cl_req_completion(env, aa->aa_clerq, rc);
1992         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1993         RETURN(rc);
1994 }
1995
/**
 * Build one BRW request out of the async pages queued on \a rpc_list.
 *
 * Collects the brw_page of every oap into a freshly allocated pointer array,
 * creates a cl_req from the first page, fills an obdo through
 * cl_req_attr_set(), sorts the pages by offset and hands everything to
 * osc_brw_prep_request().
 *
 * \param env        environment passed to the cl_req calls
 * \param cli        client whose cl_loi_list_lock is taken on failure
 * \param rpc_list   non-empty list of oaps linked through oap_rpc_item
 * \param page_count number of slots allocated for the page array; one slot is
 *                   filled per oap, so it must cover every oap on \a rpc_list
 * \param cmd        OBD_BRW_* command; OBD_BRW_WRITE selects CRT_WRITE,
 *                   anything else CRT_READ
 *
 * \return the prepared request on success; \a rpc_list has then been spliced
 *         onto the request's aa_oaps and reinitialized as empty.
 * \return an ERR_PTR() on failure; every oap still on \a rpc_list has been
 *         unlinked and completed, and the function returns with
 *         cli->cl_loi_list_lock held (it is taken in the error path below and
 *         not released here).
 */
1996 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
1997                                             struct client_obd *cli,
1998                                             struct list_head *rpc_list,
1999                                             int page_count, int cmd)
2000 {
2001         struct ptlrpc_request *req;
2002         struct brw_page **pga = NULL;
2003         struct osc_brw_async_args *aa;
2004         struct obdo *oa = NULL;
2005         const struct obd_async_page_ops *ops = NULL;
2006         void *caller_data = NULL;
2007         struct osc_async_page *oap;
2008         struct osc_async_page *tmp;
2009         struct ost_body *body;
2010         struct cl_req *clerq = NULL;
2011         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2012         struct ldlm_lock *lock = NULL;
2013         struct cl_req_attr crattr;
2014         int i, rc;
2015
2016         ENTRY;
2017         LASSERT(!list_empty(rpc_list));
2018
             /* zeroing crattr leaves cra_capa NULL for the capa_put() at 'out'
              * when an early failure jumps there */
2019         memset(&crattr, 0, sizeof crattr);
2020         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2021         if (pga == NULL)
2022                 GOTO(out, req = ERR_PTR(-ENOMEM));
2023
2024         OBDO_ALLOC(oa);
2025         if (oa == NULL)
2026                 GOTO(out, req = ERR_PTR(-ENOMEM));
2027
2028         i = 0;
2029         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2030                 struct cl_page *page = osc_oap2cl_page(oap);
                     /* the first oap supplies the ops, the cl_req and the
                      * ldlm lock used for the whole request */
2031                 if (ops == NULL) {
2032                         ops = oap->oap_caller_ops;
2033                         caller_data = oap->oap_caller_data;
2034
2035                         clerq = cl_req_alloc(env, page, crt,
2036                                              1 /* only 1-object rpcs for
2037                                                 * now */);
2038                         if (IS_ERR(clerq))
2039                                 GOTO(out, req = (void *)clerq);
2040                         lock = oap->oap_ldlm_lock;
2041                 }
2042                 pga[i] = &oap->oap_brw_page;
2043                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2044                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2045                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2046                 i++;
2047                 cl_req_page_add(env, clerq, page);
2048         }
2049
2050         /* always get the data for the obdo for the rpc */
2051         LASSERT(ops != NULL);
2052         crattr.cra_oa = oa;
2053         crattr.cra_capa = NULL;
2054         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2055         if (lock) {
2056                 oa->o_handle = lock->l_remote_handle;
2057                 oa->o_valid |= OBD_MD_FLHANDLE;
2058         }
2059
2060         rc = cl_req_prep(env, clerq);
2061         if (rc != 0) {
2062                 CERROR("cl_req_prep failed: %d\n", rc);
2063                 GOTO(out, req = ERR_PTR(rc));
2064         }
2065
2066         sort_brw_pages(pga, page_count);
2067         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2068                                   pga, &req, crattr.cra_capa, 1);
2069         if (rc != 0) {
2070                 CERROR("prep_req failed: %d\n", rc);
2071                 GOTO(out, req = ERR_PTR(rc));
2072         }
2073
2074         /* Need to update the timestamps after the request is built in case
2075          * we race with setattr (locally or in queue at OST).  If OST gets
2076          * later setattr before earlier BRW (as determined by the request xid),
2077          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2078          * way to do this in a single call.  bug 10150 */
             /* NOTE(review): 'body' is fetched here but never used afterwards,
              * and crattr.cra_oa still points at the local 'oa'.  The refresh
              * below therefore appears to update 'oa' and not the obdo inside
              * the request body -- confirm whether cra_oa was meant to be
              * redirected to the body first. */
2079         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2080         cl_req_attr_set(env, clerq, &crattr,
2081                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2082
2083         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2084         aa = ptlrpc_req_async_args(req);
             /* hand the oaps over to the request; list heads cannot simply be
              * copied, so splice them and reset the source list */
2085         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2086         list_splice(rpc_list, &aa->aa_oaps);
2087         CFS_INIT_LIST_HEAD(rpc_list);
2088         aa->aa_clerq = clerq;
2089 out:
             /* reached on success and on failure alike */
2090         capa_put(crattr.cra_capa);
2091         if (IS_ERR(req)) {
2092                 if (oa)
2093                         OBDO_FREE(oa);
2094                 if (pga)
2095                         OBD_FREE(pga, sizeof(*pga) * page_count);
2096                 /* this should happen rarely and is pretty bad, it makes the
2097                  * pending list not follow the dirty order */
                     /* osc_ap_completion() must run under the loi list lock;
                      * the lock is not released again in this function */
2098                 client_obd_list_lock(&cli->cl_loi_list_lock);
2099                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2100                         list_del_init(&oap->oap_rpc_item);
2101
2102                         /* queued sync pages can be torn down while the pages
2103                          * were between the pending list and the rpc */
2104                         if (oap->oap_interrupted) {
2105                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                     /* oap_count is passed as the status */
2106                                 osc_ap_completion(env, cli, NULL, oap, 0,
2107                                                   oap->oap_count);
2108                                 continue;
2109                         }
2110                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2111                 }
                     /* clerq may be NULL (never allocated) or itself an error
                      * pointer (cl_req_alloc() failed) */
2112                 if (clerq && !IS_ERR(clerq))
2113                         cl_req_completion(env, clerq, PTR_ERR(req));
2114         }
2115         RETURN(req);
2116 }
2117
2118 /**
2119  * prepare pages for ASYNC io and put pages in send queue.
2120  *
2121  * \param cli -
2122  * \param loi -
2123  * \param cmd - OBD_BRW_* macros
2124  * \param lop - pending pages
2125  *
2126  * \return zero if the pages were successfully added to the send queue.
2127  * \return non-zero if an error occurred.
2128  */
2129 static int
2130 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2131                  struct lov_oinfo *loi,
2132                  int cmd, struct loi_oap_pages *lop)
2133 {
2134         struct ptlrpc_request *req;
2135         obd_count page_count = 0;
2136         struct osc_async_page *oap = NULL, *tmp;
2137         struct osc_brw_async_args *aa;
2138         const struct obd_async_page_ops *ops;
2139         CFS_LIST_HEAD(rpc_list);
2140         unsigned int ending_offset;
2141         unsigned  starting_offset = 0;
2142         int srvlock = 0;
2143         struct cl_object *clob = NULL;
2144         ENTRY;
2145
2146         /* first we find the pages we're allowed to work with */
2147         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2148                                  oap_pending_item) {
2149                 ops = oap->oap_caller_ops;
2150
2151                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2152                          "magic 0x%x\n", oap, oap->oap_magic);
2153
2154                 if (clob == NULL) {
2155                         /* pin object in memory, so that completion call-backs
2156                          * can be safely called under client_obd_list lock. */
2157                         clob = osc_oap2cl_page(oap)->cp_obj;
2158                         cl_object_get(clob);
2159                 }
2160
2161                 if (page_count != 0 &&
2162                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2163                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2164                                " oap %p, page %p, srvlock %u\n",
2165                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2166                         break;
2167                 }
2168                 /* in llite being 'ready' equates to the page being locked
2169                  * until completion unlocks it.  commit_write submits a page
2170                  * as not ready because its unlock will happen unconditionally
2171                  * as the call returns.  if we race with commit_write giving
2172                  * us that page we dont' want to create a hole in the page
2173                  * stream, so we stop and leave the rpc to be fired by
2174                  * another dirtier or kupdated interval (the not ready page
2175                  * will still be on the dirty list).  we could call in
2176                  * at the end of ll_file_write to process the queue again. */
2177                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2178                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2179                                                     cmd);
2180                         if (rc < 0)
2181                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2182                                                 "instead of ready\n", oap,
2183                                                 oap->oap_page, rc);
2184                         switch (rc) {
2185                         case -EAGAIN:
2186                                 /* llite is telling us that the page is still
2187                                  * in commit_write and that we should try
2188                                  * and put it in an rpc again later.  we
2189                                  * break out of the loop so we don't create
2190                                  * a hole in the sequence of pages in the rpc
2191                                  * stream.*/
2192                                 oap = NULL;
2193                                 break;
2194                         case -EINTR:
2195                                 /* the io isn't needed.. tell the checks
2196                                  * below to complete the rpc with EINTR */
2197                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2198                                 oap->oap_count = -EINTR;
2199                                 break;
2200                         case 0:
2201                                 oap->oap_async_flags |= ASYNC_READY;
2202                                 break;
2203                         default:
2204                                 LASSERTF(0, "oap %p page %p returned %d "
2205                                             "from make_ready\n", oap,
2206                                             oap->oap_page, rc);
2207                                 break;
2208                         }
2209                 }
2210                 if (oap == NULL)
2211                         break;
2212                 /*
2213                  * Page submitted for IO has to be locked. Either by
2214                  * ->ap_make_ready() or by higher layers.
2215                  */
2216 #if defined(__KERNEL__) && defined(__linux__)
2217                 {
2218                         struct cl_page *page;
2219
2220                         page = osc_oap2cl_page(oap);
2221
2222                         if (page->cp_type == CPT_CACHEABLE &&
2223                             !(PageLocked(oap->oap_page) &&
2224                               (CheckWriteback(oap->oap_page, cmd)))) {
2225                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2226                                        oap->oap_page,
2227                                        (long)oap->oap_page->flags,
2228                                        oap->oap_async_flags);
2229                                 LBUG();
2230                         }
2231                 }
2232 #endif
2233                 /* If there is a gap at the start of this page, it can't merge
2234                  * with any previous page, so we'll hand the network a
2235                  * "fragmented" page array that it can't transfer in 1 RDMA */
2236                 if (page_count != 0 && oap->oap_page_off != 0)
2237                         break;
2238
2239                 /* take the page out of our book-keeping */
2240                 list_del_init(&oap->oap_pending_item);
2241                 lop_update_pending(cli, lop, cmd, -1);
2242                 list_del_init(&oap->oap_urgent_item);
2243
2244                 if (page_count == 0)
2245                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2246                                           (PTLRPC_MAX_BRW_SIZE - 1);
2247
2248                 /* ask the caller for the size of the io as the rpc leaves. */
2249                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2250                         oap->oap_count =
2251                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2252                                                       cmd);
2253                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2254                 }
2255                 if (oap->oap_count <= 0) {
2256                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2257                                oap->oap_count);
2258                         osc_ap_completion(env, cli, NULL,
2259                                           oap, 0, oap->oap_count);
2260                         continue;
2261                 }
2262
2263                 /* now put the page back in our accounting */
2264                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2265                 if (page_count == 0)
2266                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2267                 if (++page_count >= cli->cl_max_pages_per_rpc)
2268                         break;
2269
2270                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2271                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2272                  * have the same alignment as the initial writes that allocated
2273                  * extents on the server. */
2274                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2275                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2276                 if (ending_offset == 0)
2277                         break;
2278
2279                 /* If there is a gap at the end of this page, it can't merge
2280                  * with any subsequent pages, so we'll hand the network a
2281                  * "fragmented" page array that it can't transfer in 1 RDMA */
2282                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2283                         break;
2284         }
2285
2286         osc_wake_cache_waiters(cli);
2287
2288         loi_list_maint(cli, loi);
2289
2290         client_obd_list_unlock(&cli->cl_loi_list_lock);
2291
2292         if (clob != NULL)
2293                 cl_object_put(env, clob);
2294
2295         if (page_count == 0) {
2296                 client_obd_list_lock(&cli->cl_loi_list_lock);
2297                 RETURN(0);
2298         }
2299
2300         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2301         if (IS_ERR(req)) {
2302                 LASSERT(list_empty(&rpc_list));
2303                 loi_list_maint(cli, loi);
2304                 RETURN(PTR_ERR(req));
2305         }
2306
2307         aa = ptlrpc_req_async_args(req);
2308
2309         if (cmd == OBD_BRW_READ) {
2310                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2311                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2312                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2313                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2314         } else {
2315                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2316                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2317                                  cli->cl_w_in_flight);
2318                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2319                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2320         }
2321         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2322
2323         client_obd_list_lock(&cli->cl_loi_list_lock);
2324
2325         if (cmd == OBD_BRW_READ)
2326                 cli->cl_r_in_flight++;
2327         else
2328                 cli->cl_w_in_flight++;
2329
2330         /* queued sync pages can be torn down while the pages
2331          * were between the pending list and the rpc */
2332         tmp = NULL;
2333         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2334                 /* only one oap gets a request reference */
2335                 if (tmp == NULL)
2336                         tmp = oap;
2337                 if (oap->oap_interrupted && !req->rq_intr) {
2338                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2339                                oap, req);
2340                         ptlrpc_mark_interrupted(req);
2341                 }
2342         }
2343         if (tmp != NULL)
2344                 tmp->oap_request = ptlrpc_request_addref(req);
2345
2346         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2347                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2348
2349         req->rq_interpret_reply = brw_interpret;
2350         ptlrpcd_add_req(req, PSCOPE_BRW);
2351         RETURN(1);
2352 }
2353
/* Emit, at D_INODE level, a one-line summary of the queues of the object LOI
 * followed by the caller's own message.  The fixed prefix reports, in order:
 * whether loi_cli_item is linked on a list, the number of pending write
 * pages, whether the write urgent list is non-empty, the number of pending
 * read pages and whether the read urgent list is non-empty.
 *
 * STR is a printf-style format that is appended to the prefix; at least one
 * variadic argument has to be supplied for it.  The expansion is a single
 * CDEBUG() call, so the caller provides the terminating ';'.
 *
 * The last line deliberately carries no continuation backslash: a trailing
 * one would splice whatever follows the macro into its body. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2362
2363 /* This is called by osc_check_rpcs() to find which objects have pages that
2364  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2365 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2366 {
2367         ENTRY;
2368         /* first return all objects which we already know to have
2369          * pages ready to be stuffed into rpcs */
2370         if (!list_empty(&cli->cl_loi_ready_list))
2371                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2372                                   struct lov_oinfo, loi_cli_item));
2373
2374         /* then if we have cache waiters, return all objects with queued
2375          * writes.  This is especially important when many small files
2376          * have filled up the cache and not been fired into rpcs because
2377          * they don't pass the nr_pending/object threshhold */
2378         if (!list_empty(&cli->cl_cache_waiters) &&
2379             !list_empty(&cli->cl_loi_write_list))
2380                 RETURN(list_entry(cli->cl_loi_write_list.next,
2381                                   struct lov_oinfo, loi_write_item));
2382
2383         /* then return all queued objects when we have an invalid import
2384          * so that they get flushed */
2385         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2386                 if (!list_empty(&cli->cl_loi_write_list))
2387                         RETURN(list_entry(cli->cl_loi_write_list.next,
2388                                           struct lov_oinfo, loi_write_item));
2389                 if (!list_empty(&cli->cl_loi_read_list))
2390                         RETURN(list_entry(cli->cl_loi_read_list.next,
2391                                           struct lov_oinfo, loi_read_item));
2392         }
2393         RETURN(NULL);
2394 }
2395
2396 /* called with the loi list lock held */
2397 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2398 {
2399         struct lov_oinfo *loi;
2400         int rc = 0, race_counter = 0;
2401         ENTRY;
2402
2403         while ((loi = osc_next_loi(cli)) != NULL) {
2404                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2405
2406                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2407                         break;
2408
2409                 /* attempt some read/write balancing by alternating between
2410                  * reads and writes in an object.  The makes_rpc checks here
2411                  * would be redundant if we were getting read/write work items
2412                  * instead of objects.  we don't want send_oap_rpc to drain a
2413                  * partial read pending queue when we're given this object to
2414                  * do io on writes while there are cache waiters */
2415                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2416                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2417                                               &loi->loi_write_lop);
2418                         if (rc < 0)
2419                                 break;
2420                         if (rc > 0)
2421                                 race_counter = 0;
2422                         else
2423                                 race_counter++;
2424                 }
2425                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2426                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2427                                               &loi->loi_read_lop);
2428                         if (rc < 0)
2429                                 break;
2430                         if (rc > 0)
2431                                 race_counter = 0;
2432                         else
2433                                 race_counter++;
2434                 }
2435
2436                 /* attempt some inter-object balancing by issueing rpcs
2437                  * for each object in turn */
2438                 if (!list_empty(&loi->loi_cli_item))
2439                         list_del_init(&loi->loi_cli_item);
2440                 if (!list_empty(&loi->loi_write_item))
2441                         list_del_init(&loi->loi_write_item);
2442                 if (!list_empty(&loi->loi_read_item))
2443                         list_del_init(&loi->loi_read_item);
2444
2445                 loi_list_maint(cli, loi);
2446
2447                 /* send_oap_rpc fails with 0 when make_ready tells it to
2448                  * back off.  llite's make_ready does this when it tries
2449                  * to lock a page queued for write that is already locked.
2450                  * we want to try sending rpcs from many objects, but we
2451                  * don't want to spin failing with 0.  */
2452                 if (race_counter == 10)
2453                         break;
2454         }
2455         EXIT;
2456 }
2457
2458 /* we're trying to queue a page in the osc so we're subject to the
2459  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2460  * If the osc's queued pages are already at that limit, then we want to sleep
2461  * until there is space in the osc's queue for us.  We also may be waiting for
2462  * write credits from the OST if there are RPCs in flight that may return some
2463  * before we fall back to sync writes.
2464  *
2465  * We need this know our allocation was granted in the presence of signals */
2466 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2467 {
2468         int rc;
2469         ENTRY;
2470         client_obd_list_lock(&cli->cl_loi_list_lock);
2471         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2472         client_obd_list_unlock(&cli->cl_loi_list_lock);
2473         RETURN(rc);
2474 };
2475
2476 /**
2477  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2478  * is available.
2479  */
2480 int osc_enter_cache_try(const struct lu_env *env,
2481                         struct client_obd *cli, struct lov_oinfo *loi,
2482                         struct osc_async_page *oap, int transient)
2483 {
2484         int has_grant;
2485
2486         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2487         if (has_grant) {
2488                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2489                 if (transient) {
2490                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2491                         atomic_inc(&obd_dirty_transit_pages);
2492                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2493                 }
2494         }
2495         return has_grant;
2496 }
2497
2498 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2499  * grant or cache space. */
2500 static int osc_enter_cache(const struct lu_env *env,
2501                            struct client_obd *cli, struct lov_oinfo *loi,
2502                            struct osc_async_page *oap)
2503 {
2504         struct osc_cache_waiter ocw;
2505         struct l_wait_info lwi = { 0 };
2506
2507         ENTRY;
2508
2509         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2510                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2511                cli->cl_dirty_max, obd_max_dirty_pages,
2512                cli->cl_lost_grant, cli->cl_avail_grant);
2513
2514         /* force the caller to try sync io.  this can jump the list
2515          * of queued writes and create a discontiguous rpc stream */
2516         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2517             loi->loi_ar.ar_force_sync)
2518                 RETURN(-EDQUOT);
2519
2520         /* Hopefully normal case - cache space and write credits available */
2521         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2522             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2523             osc_enter_cache_try(env, cli, loi, oap, 0))
2524                 RETURN(0);
2525
2526         /* Make sure that there are write rpcs in flight to wait for.  This
2527          * is a little silly as this object may not have any pending but
2528          * other objects sure might. */
2529         if (cli->cl_w_in_flight) {
2530                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2531                 cfs_waitq_init(&ocw.ocw_waitq);
2532                 ocw.ocw_oap = oap;
2533                 ocw.ocw_rc = 0;
2534
2535                 loi_list_maint(cli, loi);
2536                 osc_check_rpcs(env, cli);
2537                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2538
2539                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2540                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2541
2542                 client_obd_list_lock(&cli->cl_loi_list_lock);
2543                 if (!list_empty(&ocw.ocw_entry)) {
2544                         list_del(&ocw.ocw_entry);
2545                         RETURN(-EINTR);
2546                 }
2547                 RETURN(ocw.ocw_rc);
2548         }
2549
2550         RETURN(-EDQUOT);
2551 }
2552
2553
2554 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2555                         struct lov_oinfo *loi, cfs_page_t *page,
2556                         obd_off offset, const struct obd_async_page_ops *ops,
2557                         void *data, void **res, int nocache,
2558                         struct lustre_handle *lockh)
2559 {
2560         struct osc_async_page *oap;
2561
2562         ENTRY;
2563
2564         if (!page)
2565                 return size_round(sizeof(*oap));
2566
2567         oap = *res;
2568         oap->oap_magic = OAP_MAGIC;
2569         oap->oap_cli = &exp->exp_obd->u.cli;
2570         oap->oap_loi = loi;
2571
2572         oap->oap_caller_ops = ops;
2573         oap->oap_caller_data = data;
2574
2575         oap->oap_page = page;
2576         oap->oap_obj_off = offset;
2577         if (!client_is_remote(exp) &&
2578             cfs_capable(CFS_CAP_SYS_RESOURCE))
2579                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2580
2581         LASSERT(!(offset & ~CFS_PAGE_MASK));
2582
2583         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2584         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2585         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2586         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2587
2588         spin_lock_init(&oap->oap_lock);
2589         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2590         RETURN(0);
2591 }
2592
2593 struct osc_async_page *oap_from_cookie(void *cookie)
2594 {
2595         struct osc_async_page *oap = cookie;
2596         if (oap->oap_magic != OAP_MAGIC)
2597                 return ERR_PTR(-EINVAL);
2598         return oap;
2599 };
2600
2601 int osc_queue_async_io(const struct lu_env *env,
2602                        struct obd_export *exp, struct lov_stripe_md *lsm,
2603                        struct lov_oinfo *loi, void *cookie,
2604                        int cmd, obd_off off, int count,
2605                        obd_flag brw_flags, enum async_flags async_flags)
2606 {
2607         struct client_obd *cli = &exp->exp_obd->u.cli;
2608         struct osc_async_page *oap;
2609         int rc = 0;
2610         ENTRY;
2611
2612         oap = oap_from_cookie(cookie);
2613         if (IS_ERR(oap))
2614                 RETURN(PTR_ERR(oap));
2615
2616         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2617                 RETURN(-EIO);
2618
2619         if (!list_empty(&oap->oap_pending_item) ||
2620             !list_empty(&oap->oap_urgent_item) ||
2621             !list_empty(&oap->oap_rpc_item))
2622                 RETURN(-EBUSY);
2623
2624         /* check if the file's owner/group is over quota */
2625         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2626                 struct cl_object *obj;
2627                 struct cl_attr    attr; /* XXX put attr into thread info */
2628
2629                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2630
2631                 cl_object_attr_lock(obj);
2632                 rc = cl_object_attr_get(env, obj, &attr);
2633                 cl_object_attr_unlock(obj);
2634
2635                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2636                                             attr.cat_gid) == NO_QUOTA)
2637                         rc = -EDQUOT;
2638                 if (rc)
2639                         RETURN(rc);
2640         }
2641
2642         if (loi == NULL)
2643                 loi = lsm->lsm_oinfo[0];
2644
2645         client_obd_list_lock(&cli->cl_loi_list_lock);
2646
2647         LASSERT(off + count <= CFS_PAGE_SIZE);
2648         oap->oap_cmd = cmd;
2649         oap->oap_page_off = off;
2650         oap->oap_count = count;
2651         oap->oap_brw_flags = brw_flags;
2652         oap->oap_async_flags = async_flags;
2653
2654         if (cmd & OBD_BRW_WRITE) {
2655                 rc = osc_enter_cache(env, cli, loi, oap);
2656                 if (rc) {
2657                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2658                         RETURN(rc);
2659                 }
2660         }
2661
2662         osc_oap_to_pending(oap);
2663         loi_list_maint(cli, loi);
2664
2665         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2666                   cmd);
2667
2668         osc_check_rpcs(env, cli);
2669         client_obd_list_unlock(&cli->cl_loi_list_lock);
2670
2671         RETURN(0);
2672 }
2673
/* Non-zero when 'flag' is clear in 'was' and set in 'now', i.e. when the
 * flag is being newly turned on; aka (~was & now & flag), but this is more
 * clear :)
 *
 * Every argument is parenthesized in the expansion so that compound
 * expressions such as "A | B" keep their intended meaning.  'flag' is
 * evaluated twice, so it must not have side effects. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2676
2677 int osc_set_async_flags_base(struct client_obd *cli,
2678                              struct lov_oinfo *loi, struct osc_async_page *oap,
2679                              obd_flag async_flags)
2680 {
2681         struct loi_oap_pages *lop;
2682         ENTRY;
2683
2684         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2685                 RETURN(-EIO);
2686
2687         if (oap->oap_cmd & OBD_BRW_WRITE) {
2688                 lop = &loi->loi_write_lop;
2689         } else {
2690                 lop = &loi->loi_read_lop;
2691         }
2692
2693         if (list_empty(&oap->oap_pending_item))
2694                 RETURN(-EINVAL);
2695
2696         if ((oap->oap_async_flags & async_flags) == async_flags)
2697                 RETURN(0);
2698
2699         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2700                 oap->oap_async_flags |= ASYNC_READY;
2701
2702         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2703                 if (list_empty(&oap->oap_rpc_item)) {
2704                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2705                         loi_list_maint(cli, loi);
2706                 }
2707         }
2708
2709         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2710                         oap->oap_async_flags);
2711         RETURN(0);
2712 }
2713
2714 int osc_teardown_async_page(struct obd_export *exp,
2715                             struct lov_stripe_md *lsm,
2716                             struct lov_oinfo *loi, void *cookie)
2717 {
2718         struct client_obd *cli = &exp->exp_obd->u.cli;
2719         struct loi_oap_pages *lop;
2720         struct osc_async_page *oap;
2721         int rc = 0;
2722         ENTRY;
2723
2724         oap = oap_from_cookie(cookie);
2725         if (IS_ERR(oap))
2726                 RETURN(PTR_ERR(oap));
2727
2728         if (loi == NULL)
2729                 loi = lsm->lsm_oinfo[0];
2730
2731         if (oap->oap_cmd & OBD_BRW_WRITE) {
2732                 lop = &loi->loi_write_lop;
2733         } else {
2734                 lop = &loi->loi_read_lop;
2735         }
2736
2737         client_obd_list_lock(&cli->cl_loi_list_lock);
2738
2739         if (!list_empty(&oap->oap_rpc_item))
2740                 GOTO(out, rc = -EBUSY);
2741
2742         osc_exit_cache(cli, oap, 0);
2743         osc_wake_cache_waiters(cli);
2744
2745         if (!list_empty(&oap->oap_urgent_item)) {
2746                 list_del_init(&oap->oap_urgent_item);
2747                 oap->oap_async_flags &= ~ASYNC_URGENT;
2748         }
2749         if (!list_empty(&oap->oap_pending_item)) {
2750                 list_del_init(&oap->oap_pending_item);
2751                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2752         }
2753         loi_list_maint(cli, loi);
2754         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2755 out:
2756         client_obd_list_unlock(&cli->cl_loi_list_lock);
2757         RETURN(rc);
2758 }
2759
2760 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2761                                          struct ldlm_enqueue_info *einfo,
2762                                          int flags)
2763 {
2764         void *data = einfo->ei_cbdata;
2765
2766         LASSERT(lock != NULL);
2767         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2768         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2769         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2770         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2771
2772         lock_res_and_lock(lock);
2773         spin_lock(&osc_ast_guard);
2774         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2775         lock->l_ast_data = data;
2776         spin_unlock(&osc_ast_guard);
2777         unlock_res_and_lock(lock);
2778 }
2779
2780 static void osc_set_data_with_check(struct lustre_handle *lockh,
2781                                     struct ldlm_enqueue_info *einfo,
2782                                     int flags)
2783 {
2784         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2785
2786         if (lock != NULL) {
2787                 osc_set_lock_data_with_check(lock, einfo, flags);
2788                 LDLM_LOCK_PUT(lock);
2789         } else
2790                 CERROR("lockh %p, data %p - client evicted?\n",
2791                        lockh, einfo->ei_cbdata);
2792 }
2793
2794 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2795                              ldlm_iterator_t replace, void *data)
2796 {
2797         struct ldlm_res_id res_id;
2798         struct obd_device *obd = class_exp2obd(exp);
2799
2800         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2801         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2802         return 0;
2803 }
2804
2805 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2806                             obd_enqueue_update_f upcall, void *cookie,
2807                             int *flags, int rc)
2808 {
2809         int intent = *flags & LDLM_FL_HAS_INTENT;
2810         ENTRY;
2811
2812         if (intent) {
2813                 /* The request was created before ldlm_cli_enqueue call. */
2814                 if (rc == ELDLM_LOCK_ABORTED) {
2815                         struct ldlm_reply *rep;
2816                         rep = req_capsule_server_get(&req->rq_pill,
2817                                                      &RMF_DLM_REP);
2818
2819                         LASSERT(rep != NULL);
2820                         if (rep->lock_policy_res1)
2821                                 rc = rep->lock_policy_res1;
2822                 }
2823         }
2824
2825         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2826                 *flags |= LDLM_FL_LVB_READY;
2827                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2828                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2829         }
2830
2831         /* Call the update callback. */
2832         rc = (*upcall)(cookie, rc);
2833         RETURN(rc);
2834 }
2835
2836 static int osc_enqueue_interpret(const struct lu_env *env,
2837                                  struct ptlrpc_request *req,
2838                                  struct osc_enqueue_args *aa, int rc)
2839 {
2840         struct ldlm_lock *lock;
2841         struct lustre_handle handle;
2842         __u32 mode;
2843
2844         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2845          * might be freed anytime after lock upcall has been called. */
2846         lustre_handle_copy(&handle, aa->oa_lockh);
2847         mode = aa->oa_ei->ei_mode;
2848
2849         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2850          * be valid. */
2851         lock = ldlm_handle2lock(&handle);
2852
2853         /* Take an additional reference so that a blocking AST that
2854          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2855          * to arrive after an upcall has been executed by
2856          * osc_enqueue_fini(). */
2857         ldlm_lock_addref(&handle, mode);
2858
2859         /* Complete obtaining the lock procedure. */
2860         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2861                                    mode, aa->oa_flags, aa->oa_lvb,
2862                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2863                                    &handle, rc);
2864         /* Complete osc stuff. */
2865         rc = osc_enqueue_fini(req, aa->oa_lvb,
2866                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2867         /* Release the lock for async request. */
2868         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2869                 /*
2870                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2871                  * not already released by
2872                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2873                  */
2874                 ldlm_lock_decref(&handle, mode);
2875
2876         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2877                  aa->oa_lockh, req, aa);
2878         ldlm_lock_decref(&handle, mode);
2879         LDLM_LOCK_PUT(lock);
2880         return rc;
2881 }
2882
2883 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2884                         struct lov_oinfo *loi, int flags,
2885                         struct ost_lvb *lvb, __u32 mode, int rc)
2886 {
2887         if (rc == ELDLM_OK) {
2888                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2889                 __u64 tmp;
2890
2891                 LASSERT(lock != NULL);
2892                 loi->loi_lvb = *lvb;
2893                 tmp = loi->loi_lvb.lvb_size;
2894                 /* Extend KMS up to the end of this lock and no further
2895                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2896                 if (tmp > lock->l_policy_data.l_extent.end)
2897                         tmp = lock->l_policy_data.l_extent.end + 1;
2898                 if (tmp >= loi->loi_kms) {
2899                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2900                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2901                         loi_kms_set(loi, tmp);
2902                 } else {
2903                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2904                                    LPU64"; leaving kms="LPU64", end="LPU64,
2905                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2906                                    lock->l_policy_data.l_extent.end);
2907                 }
2908                 ldlm_lock_allow_match(lock);
2909                 LDLM_LOCK_PUT(lock);
2910         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2911                 loi->loi_lvb = *lvb;
2912                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2913                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2914                 rc = ELDLM_OK;
2915         }
2916 }
2917 EXPORT_SYMBOL(osc_update_enqueue);
2918
/* Sentinel request-set pointer.  Its value is the constant 1 rather than the
 * address of a real set, so it may only be compared against and must never
 * be dereferenced.
 * NOTE(review): presumably passed as the rqset argument to ask for the
 * request to be handled by ptlrpcd - confirm against the callers. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2920
2921 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2922  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2923  * other synchronous requests, however keeping some locks and trying to obtain
2924  * others may take a considerable amount of time in a case of ost failure; and
2925  * when other sync requests do not get released lock from a client, the client
2926  * is excluded from the cluster -- such scenarios make life difficult, so
2927  * release locks just after they are obtained. */
2928 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2929                      int *flags, ldlm_policy_data_t *policy,
2930                      struct ost_lvb *lvb, int kms_valid,
2931                      obd_enqueue_update_f upcall, void *cookie,
2932                      struct ldlm_enqueue_info *einfo,
2933                      struct lustre_handle *lockh,
2934                      struct ptlrpc_request_set *rqset, int async)
2935 {
2936         struct obd_device *obd = exp->exp_obd;
2937         struct ptlrpc_request *req = NULL;
2938         int intent = *flags & LDLM_FL_HAS_INTENT;
2939         ldlm_mode_t mode;
2940         int rc;
2941         ENTRY;
2942
2943         /* Filesystem lock extents are extended to page boundaries so that
2944          * dealing with the page cache is a little smoother.  */
2945         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2946         policy->l_extent.end |= ~CFS_PAGE_MASK;
2947
2948         /*
2949          * kms is not valid when either object is completely fresh (so that no
2950          * locks are cached), or object was evicted. In the latter case cached
2951          * lock cannot be used, because it would prime inode state with
2952          * potentially stale LVB.
2953          */
2954         if (!kms_valid)
2955                 goto no_match;
2956
2957         /* Next, search for already existing extent locks that will cover us */
2958         /* If we're trying to read, we also search for an existing PW lock.  The
2959          * VFS and page cache already protect us locally, so lots of readers/
2960          * writers can share a single PW lock.
2961          *
2962          * There are problems with conversion deadlocks, so instead of
2963          * converting a read lock to a write lock, we'll just enqueue a new
2964          * one.
2965          *
2966          * At some point we should cancel the read lock instead of making them
2967          * send us a blocking callback, but there are problems with canceling
2968          * locks out from other users right now, too. */
2969         mode = einfo->ei_mode;
2970         if (einfo->ei_mode == LCK_PR)
2971                 mode |= LCK_PW;
2972         mode = ldlm_lock_match(obd->obd_namespace,
2973                                *flags | LDLM_FL_LVB_READY, res_id,
2974                                einfo->ei_type, policy, mode, lockh, 0);
2975         if (mode) {
2976                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2977
2978                 if (matched->l_ast_data == NULL ||
2979                     matched->l_ast_data == einfo->ei_cbdata) {
2980                         /* addref the lock only if not async requests and PW
2981                          * lock is matched whereas we asked for PR. */
2982                         if (!rqset && einfo->ei_mode != mode)
2983                                 ldlm_lock_addref(lockh, LCK_PR);
2984                         osc_set_lock_data_with_check(matched, einfo, *flags);
2985                         if (intent) {
2986                                 /* I would like to be able to ASSERT here that
2987                                  * rss <= kms, but I can't, for reasons which
2988                                  * are explained in lov_enqueue() */
2989                         }
2990
2991                         /* We already have a lock, and it's referenced */
2992                         (*upcall)(cookie, ELDLM_OK);
2993
2994                         /* For async requests, decref the lock. */
2995                         if (einfo->ei_mode != mode)
2996                                 ldlm_lock_decref(lockh, LCK_PW);
2997                         else if (rqset)
2998                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2999                         LDLM_LOCK_PUT(matched);
3000                         RETURN(ELDLM_OK);
3001                 } else
3002                         ldlm_lock_decref(lockh, mode);
3003                 LDLM_LOCK_PUT(matched);
3004         }
3005
3006  no_match:
3007         if (intent) {
3008                 CFS_LIST_HEAD(cancels);
3009                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3010                                            &RQF_LDLM_ENQUEUE_LVB);
3011                 if (req == NULL)
3012                         RETURN(-ENOMEM);
3013
3014                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3015                 if (rc)
3016                         RETURN(rc);
3017
3018                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3019                                      sizeof *lvb);
3020                 ptlrpc_request_set_replen(req);
3021         }
3022
3023         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3024         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3025
3026         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3027                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3028         if (rqset) {
3029                 if (!rc) {
3030                         struct osc_enqueue_args *aa;
3031                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3032                         aa = ptlrpc_req_async_args(req);
3033                         aa->oa_ei = einfo;
3034                         aa->oa_exp = exp;
3035                         aa->oa_flags  = flags;
3036                         aa->oa_upcall = upcall;
3037                         aa->oa_cookie = cookie;
3038                         aa->oa_lvb    = lvb;
3039                         aa->oa_lockh  = lockh;
3040
3041                         req->rq_interpret_reply =
3042                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3043                         if (rqset == PTLRPCD_SET)
3044                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3045                         else
3046                                 ptlrpc_set_add_req(rqset, req);
3047                 } else if (intent) {
3048                         ptlrpc_req_finished(req);
3049                 }
3050                 RETURN(rc);
3051         }
3052
3053         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3054         if (intent)
3055                 ptlrpc_req_finished(req);
3056
3057         RETURN(rc);
3058 }
3059
/* Enqueue a lock on the object described by oinfo->oi_md.
 *
 * Builds the LDLM resource name from the object id/group and delegates to
 * osc_enqueue_base(), passing the first stripe's LVB and kms_valid flag,
 * oinfo->oi_cb_up as the upcall and oinfo itself as the upcall cookie.
 * The last argument to osc_enqueue_base() is true only when rqset is
 * non-NULL.
 *
 * Returns the value returned by osc_enqueue_base(). */
3060 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3061                        struct ldlm_enqueue_info *einfo,
3062                        struct ptlrpc_request_set *rqset)
3063 {
3064         struct ldlm_res_id res_id;
3065         int rc;
3066         ENTRY;
3067
3068         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3069                            oinfo->oi_md->lsm_object_gr, &res_id);
3070
             /* Only lsm_oinfo[0] is consulted here. */
3071         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3072                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3073                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3074                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3075                               rqset, rqset != NULL);
3076         RETURN(rc);
3077 }
3078
/* Look for an already-held lock that covers the given extent.
 *
 * @policy is modified in place: the extent is widened to page boundaries
 * before matching.  @flags is only read.  When @mode is LCK_PR a LCK_PW
 * lock is accepted as well.  If a lock is found and @data is non-NULL it
 * is passed to osc_set_data_with_check() together with the lock handle.
 *
 * Returns the value of ldlm_lock_match() (non-zero when a lock was found,
 * 0 otherwise), or -EIO when the OBD_FAIL_OSC_MATCH fail point fires.
 * NOTE(review): -EIO is returned through an ldlm_mode_t, so callers must
 * not treat every non-zero result as a valid lock mode - confirm. */
3079 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3080                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3081                    int *flags, void *data, struct lustre_handle *lockh,
3082                    int unref)
3083 {
3084         struct obd_device *obd = exp->exp_obd;
3085         int lflags = *flags;
3086         ldlm_mode_t rc;
3087         ENTRY;
3088
3089         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3090                 RETURN(-EIO);
3091
3092         /* Filesystem lock extents are extended to page boundaries so that
3093          * dealing with the page cache is a little smoother */
3094         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3095         policy->l_extent.end |= ~CFS_PAGE_MASK;
3096
3097         /* Next, search for already existing extent locks that will cover us */
3098         /* If we're trying to read, we also search for an existing PW lock.  The
3099          * VFS and page cache already protect us locally, so lots of readers/
3100          * writers can share a single PW lock. */
3101         rc = mode;
3102         if (mode == LCK_PR)
3103                 rc |= LCK_PW;
3104         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3105                              res_id, type, policy, rc, lockh, unref);
3106         if (rc) {
3107                 if (data != NULL)
3108                         osc_set_data_with_check(lockh, data, lflags);
                     /* The matched mode differs from the requested one (the
                      * only widening done above is PR -> PR|PW): move the
                      * reference from PW to PR.  Skipped for
                      * LDLM_FL_TEST_LOCK lookups. */
3109                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3110                         ldlm_lock_addref(lockh, LCK_PR);
3111                         ldlm_lock_decref(lockh, LCK_PW);
3112                 }
3113                 RETURN(rc);
3114         }
3115         RETURN(rc);
3116 }
3117
/* Drop one reference of the given @mode on the lock behind @lockh.
 * LCK_GROUP locks are dropped with ldlm_lock_decref_and_cancel() instead
 * of a plain ldlm_lock_decref().  Always returns 0. */
3118 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3119 {
3120         ENTRY;
3121
3122         if (unlikely(mode == LCK_GROUP))
3123                 ldlm_lock_decref_and_cancel(lockh, mode);
3124         else
3125                 ldlm_lock_decref(lockh, mode);
3126
3127         RETURN(0);
3128 }
3129
/* Thin wrapper around osc_cancel_base(); @exp and @md are not used. */
3130 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3131                       __u32 mode, struct lustre_handle *lockh)
3132 {
3133         ENTRY;
3134         RETURN(osc_cancel_base(lockh, mode));
3135 }
3136
/* Cancel unused locks in this OSC's namespace via ldlm_cli_cancel_unused().
 *
 * When @lsm is given, the resource name built from its object id/group is
 * passed down; otherwise a NULL resource is passed (presumably meaning
 * "every resource in the namespace" - confirm against
 * ldlm_cli_cancel_unused()).  @flags and @opaque are forwarded unchanged.
 *
 * Returns the result of ldlm_cli_cancel_unused(). */
3137 static int osc_cancel_unused(struct obd_export *exp,
3138                              struct lov_stripe_md *lsm, int flags,
3139                              void *opaque)
3140 {
3141         struct obd_device *obd = class_exp2obd(exp);
3142         struct ldlm_res_id res_id, *resp = NULL;
3143
3144         if (lsm != NULL) {
3145                 resp = osc_build_res_name(lsm->lsm_object_id,
3146                                           lsm->lsm_object_gr, &res_id);
3147         }
3148
3149         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3150 }
3151
/* Reply interpreter for the asynchronous OST_STATFS request.
 *
 * On success (@rc == 0) the obd_statfs from the reply is copied into
 * aa->aa_oi->oi_osfs; a reply without that field is turned into -EPROTO.
 * The caller's oi_cb_up callback is invoked on every path with the
 * resulting rc, and its return value is what this function returns. */
3152 static int osc_statfs_interpret(const struct lu_env *env,
3153                                 struct ptlrpc_request *req,
3154                                 struct osc_async_args *aa, int rc)
3155 {
3156         struct obd_statfs *msfs;
3157         ENTRY;
3158
3159         if (rc != 0)
3160                 GOTO(out, rc);
3161
3162         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3163         if (msfs == NULL) {
3164                 GOTO(out, rc = -EPROTO);
3165         }
3166
3167         *aa->aa_oi->oi_osfs = *msfs;
3168 out:
3169         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3170         RETURN(rc);
3171 }
3172
/* Queue an asynchronous OST_STATFS request on @rqset.
 *
 * The reply is handled by osc_statfs_interpret(), which receives @oinfo
 * through the request's async args.  @max_age is currently unused (see the
 * comment below).
 *
 * Returns 0 once the request has been added to @rqset, -ENOMEM if the
 * request cannot be allocated, or the error from ptlrpc_request_pack().
 *
 * NOTE(review): unlike osc_statfs(), obd->u.cli.cl_import is used here
 * without taking cl_sem or an extra import reference - confirm that the
 * callers of this path cannot race with disconnect. */
3173 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3174                             __u64 max_age, struct ptlrpc_request_set *rqset)
3175 {
3176         struct ptlrpc_request *req;
3177         struct osc_async_args *aa;
3178         int                    rc;
3179         ENTRY;
3180
3181         /* We could possibly pass max_age in the request (as an absolute
3182          * timestamp or a "seconds.usec ago") so the target can avoid doing
3183          * extra calls into the filesystem if that isn't necessary (e.g.
3184          * during mount that would help a bit).  Having relative timestamps
3185          * is not so great if request processing is slow, while absolute
3186          * timestamps are not ideal because they need time synchronization. */
3187         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3188         if (req == NULL)
3189                 RETURN(-ENOMEM);
3190
3191         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3192         if (rc) {
3193                 ptlrpc_request_free(req);
3194                 RETURN(rc);
3195         }
3196         ptlrpc_request_set_replen(req);
             /* The request is sent to the OST create portal rather than a
              * statfs-specific one. */
3197         req->rq_request_portal = OST_CREATE_PORTAL;
3198         ptlrpc_at_set_req_timeout(req);
3199
3200         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3201                 /* procfs requests must not wait on statfs, to avoid deadlock */
3202                 req->rq_no_resend = 1;
3203                 req->rq_no_delay = 1;
3204         }
3205
3206         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
             /* Compile-time check that the async args fit in the request. */
3207         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3208         aa = ptlrpc_req_async_args(req);
3209         aa->aa_oi = oinfo;
3210
3211         ptlrpc_set_add_req(rqset, req);
3212         RETURN(0);
3213 }
3214
/* Synchronous OST_STATFS: send the request, wait for the reply and copy
 * the returned obd_statfs into @osfs.
 *
 * @max_age is currently unused (see the comment below).  With
 * OBD_STATFS_NODELAY in @flags the request is marked no-resend/no-delay.
 *
 * Returns 0 on success, -ENODEV if the client has no import, -ENOMEM if
 * the request cannot be allocated, -EPROTO if the reply carries no
 * obd_statfs, or the error from ptlrpc_request_pack()/ptlrpc_queue_wait(). */
3215 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3216                       __u64 max_age, __u32 flags)
3217 {
3218         struct obd_statfs     *msfs;
3219         struct ptlrpc_request *req;
3220         struct obd_import     *imp = NULL;
3221         int rc;
3222         ENTRY;
3223
3224         /* Since the request might also come from lprocfs, we need to
3225          * sync this with client_disconnect_export (Bug 15684). */
3226         down_read(&obd->u.cli.cl_sem);
3227         if (obd->u.cli.cl_import)
3228                 imp = class_import_get(obd->u.cli.cl_import);
3229         up_read(&obd->u.cli.cl_sem);
3230         if (!imp)
3231                 RETURN(-ENODEV);
3232
3233         /* We could possibly pass max_age in the request (as an absolute
3234          * timestamp or a "seconds.usec ago") so the target can avoid doing
3235          * extra calls into the filesystem if that isn't necessary (e.g.
3236          * during mount that would help a bit).  Having relative timestamps
3237          * is not so great if request processing is slow, while absolute
3238          * timestamps are not ideal because they need time synchronization. */
3239         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3240
             /* Our temporary import reference is dropped as soon as the
              * request has been allocated (presumably the request holds its
              * own reference - confirm). */
3241         class_import_put(imp);
3242
3243         if (req == NULL)
3244                 RETURN(-ENOMEM);
3245
3246         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3247         if (rc) {
3248                 ptlrpc_request_free(req);
3249                 RETURN(rc);
3250         }
3251         ptlrpc_request_set_replen(req);
3252         req->rq_request_portal = OST_CREATE_PORTAL;
3253         ptlrpc_at_set_req_timeout(req);
3254
3255         if (flags & OBD_STATFS_NODELAY) {
3256                 /* procfs requests must not wait on statfs, to avoid deadlock */
3257                 req->rq_no_resend = 1;
3258                 req->rq_no_delay = 1;
3259         }
3260
3261         rc = ptlrpc_queue_wait(req);
3262         if (rc)
3263                 GOTO(out, rc);
3264
3265         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3266         if (msfs == NULL) {
3267                 GOTO(out, rc = -EPROTO);
3268         }
3269
3270         *osfs = *msfs;
3271
             /* Success path: rc is 0 here; the error paths reach "out" via
              * GOTO and skip this EXIT. */
3272         EXIT;
3273  out:
3274         ptlrpc_req_finished(req);
3275         return rc;
3276 }
3277
3278 /* Retrieve object striping information.
3279  *
3280  * @lump is a user-space lov_user_md whose lmm_stripe_count indicates
3281  * the maximum number of object entries which will fit in the user buffer.
3282  * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only fill
      * in one slot here).
      *
      * Returns 0 on success, -ENODATA if @lsm is NULL, -EFAULT if the user
      * buffer cannot be read or written, -EINVAL for any other magic and
      * -ENOMEM if the temporary reply buffer cannot be allocated.
3283  */
3284 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3285 {
3286         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3287         struct lov_user_md_v3 lum, *lumk;
3288         struct lov_user_ost_data_v1 *lmm_objects;
3289         int rc = 0, lum_size;
3290         ENTRY;
3291
3292         if (!lsm)
3293                 RETURN(-ENODATA);
3294
             /* Only the v1-sized header is read from user space below, yet
              * when lmm_stripe_count == 0 this on-stack buffer is copied back
              * out with the size computed for the user's magic, which for v3
              * is larger than what was read.  Zero it first so that no
              * uninitialized kernel stack bytes can reach user space. */
             memset(&lum, 0, sizeof(lum));

3295         /* we only need the header part from user space to get lmm_magic and
3296          * lmm_stripe_count, (the header part is common to v1 and v3) */
3297         lum_size = sizeof(struct lov_user_md_v1);
3298         if (copy_from_user(&lum, lump, lum_size))
3299                 RETURN(-EFAULT);
3300
3301         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3302             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3303                 RETURN(-EINVAL);
3304
3305         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3306         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3307         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3308         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3309
3310         /* we can use lov_mds_md_size() to compute lum_size
3311          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3312         if (lum.lmm_stripe_count > 0) {
                     /* The user has room for object entries: build the reply
                      * in a temporary buffer sized for the requested count.
                      * NOTE(review): lmm_magic (and the other header fields
                      * not assigned below) of this buffer are never set from
                      * lum - confirm that is intended. */
3313                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3314                 OBD_ALLOC(lumk, lum_size);
3315                 if (!lumk)
3316                         RETURN(-ENOMEM);
3317
                     /* The objects array sits at a different offset in v1
                      * and v3, so pick it through the matching layout. */
3318                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3319                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3320                 else
3321                         lmm_objects = &(lumk->lmm_objects[0]);
3322                 lmm_objects->l_object_id = lsm->lsm_object_id;
3323         } else {
                     /* Header only: reuse the on-stack copy as the reply. */
3324                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3325                 lumk = &lum;
3326         }
3327
3328         lumk->lmm_object_id = lsm->lsm_object_id;
3329         lumk->lmm_object_gr = lsm->lsm_object_gr;
3330         lumk->lmm_stripe_count = 1;
3331
3332         if (copy_to_user(lump, lumk, lum_size))
3333                 rc = -EFAULT;
3334
3335         if (lumk != &lum)
3336                 OBD_FREE(lumk, lum_size);
3337
3338         RETURN(rc);
3339 }
3340
3341
/* ioctl dispatcher for the OSC device.
 *
 * A module reference is held for the duration of the call; every path
 * that got the reference leaves through "out", where it is dropped.
 * @karg is the kernel copy of the ioctl argument (used as an
 * obd_ioctl_data for most commands) and @uarg the original user pointer.
 * The incoming @len is not used; it is overwritten in the
 * OBD_IOC_LOV_GET_CONFIG case.
 *
 * Returns 0 or a negative errno; -ENOTTY for unrecognised commands and
 * -EINVAL if the module reference cannot be taken. */
3342 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3343                          void *karg, void *uarg)
3344 {
3345         struct obd_device *obd = exp->exp_obd;
3346         struct obd_ioctl_data *data = karg;
3347         int err = 0;
3348         ENTRY;
3349
3350         if (!try_module_get(THIS_MODULE)) {
                     /* No reference was taken, so return directly instead of
                      * going through "out" (which would module_put()). */
3351                 CERROR("Can't get module. Is it alive?\n");
3352                 return -EINVAL;
3353         }
3354         switch (cmd) {
3355         case OBD_IOC_LOV_GET_CONFIG: {
                     /* Report this single OSC as a one-target LOV
                      * configuration. */
3356                 char *buf;
3357                 struct lov_desc *desc;
3358                 struct obd_uuid uuid;
3359
3360                 buf = NULL;
3361                 len = 0;
3362                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3363                         GOTO(out, err = -EINVAL);
3364
3365                 data = (struct obd_ioctl_data *)buf;
3366
                     /* inlbuf1 must be able to hold a lov_desc ... */
3367                 if (sizeof(*desc) > data->ioc_inllen1) {
3368                         obd_ioctl_freedata(buf, len);
3369                         GOTO(out, err = -EINVAL);
3370                 }
3371
                     /* ... and inlbuf2 an obd_uuid. */
3372                 if (data->ioc_inllen2 < sizeof(uuid)) {
3373                         obd_ioctl_freedata(buf, len);
3374                         GOTO(out, err = -EINVAL);
3375                 }
3376
3377                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3378                 desc->ld_tgt_count = 1;
3379                 desc->ld_active_tgt_count = 1;
3380                 desc->ld_default_stripe_count = 1;
3381                 desc->ld_default_stripe_size = 0;
3382                 desc->ld_default_stripe_offset = 0;
3383                 desc->ld_pattern = 0;
3384                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3385
3386                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3387
                     /* copy_to_user() returns the number of bytes left
                      * uncopied; map any failure to -EFAULT. */
3388                 err = copy_to_user((void *)uarg, buf, len);
3389                 if (err)
3390                         err = -EFAULT;
3391                 obd_ioctl_freedata(buf, len);
3392                 GOTO(out, err);
3393         }
3394         case LL_IOC_LOV_SETSTRIPE:
                     /* Positive results from obd_alloc_memmd() are treated
                      * as success. */
3395                 err = obd_alloc_memmd(exp, karg);
3396                 if (err > 0)
3397                         err = 0;
3398                 GOTO(out, err);
3399         case LL_IOC_LOV_GETSTRIPE:
3400                 err = osc_getstripe(karg, uarg);
3401                 GOTO(out, err);
3402         case OBD_IOC_CLIENT_RECOVER:
3403                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3404                                             data->ioc_inlbuf1);
3405                 if (err > 0)
3406                         err = 0;
3407                 GOTO(out, err);
3408         case IOC_OSC_SET_ACTIVE:
3409                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3410                                                data->ioc_offset);
3411                 GOTO(out, err);
3412         case OBD_IOC_POLL_QUOTACHECK:
3413                 err = lquota_poll_check(quota_interface, exp,
3414                                         (struct if_quotacheck *)karg);
3415                 GOTO(out, err);
3416         case OBD_IOC_PING_TARGET:
3417                 err = ptlrpc_obd_ping(obd);
3418                 GOTO(out, err);
3419         default:
3420                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3421                        cmd, cfs_curproc_comm());
3422                 GOTO(out, err = -ENOTTY);
3423         }
3424 out:
3425         module_put(THIS_MODULE);
3426         return err;
3427 }
3428
/* Answer a get_info query identified by @key/@keylen.
 *
 * KEY_LOCK_TO_STRIPE: answered locally; stores stripe 0 in @val and sets
 *                     *@vallen to sizeof(__u32).
 * KEY_LAST_ID:        sends OST_GET_INFO and stores the returned obd_id in
 *                     @val.
 * KEY_FIEMAP:         sends OST_GET_INFO with @val (*@vallen bytes) as the
 *                     request payload and copies *@vallen bytes of the
 *                     reply back into @val.
 *
 * @lsm is not used.  Returns 0 on success, -EFAULT if @vallen or @val is
 * NULL, -EINVAL for any other key, -ENOMEM/-EPROTO or the RPC error for
 * the two remote queries.
 *
 * NOTE(review): KEY_IS() is not given the key explicitly; presumably it
 * compares against the local "key"/"keylen" - confirm in its definition. */
3429 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3430                         void *key, __u32 *vallen, void *val,
3431                         struct lov_stripe_md *lsm)
3432 {
3433         ENTRY;
3434         if (!vallen || !val)
3435                 RETURN(-EFAULT);
3436
3437         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3438                 __u32 *stripe = val;
3439                 *vallen = sizeof(*stripe);
3440                 *stripe = 0;
3441                 RETURN(0);
3442         } else if (KEY_IS(KEY_LAST_ID)) {
3443                 struct ptlrpc_request *req;
3444                 obd_id                *reply;
3445                 char                  *tmp;
3446                 int                    rc;
3447
3448                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3449                                            &RQF_OST_GET_INFO_LAST_ID);
3450                 if (req == NULL)
3451                         RETURN(-ENOMEM);
3452
                     /* The key buffer size must be set before packing. */
3453                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3454                                      RCL_CLIENT, keylen);
3455                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3456                 if (rc) {
3457                         ptlrpc_request_free(req);
3458                         RETURN(rc);
3459                 }
3460
3461                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3462                 memcpy(tmp, key, keylen);
3463
3464                 ptlrpc_request_set_replen(req);
3465                 rc = ptlrpc_queue_wait(req);
3466                 if (rc)
3467                         GOTO(out, rc);
3468
3469                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3470                 if (reply == NULL)
3471                         GOTO(out, rc = -EPROTO);
3472
                     /* NOTE(review): *vallen is neither checked against
                      * sizeof(obd_id) nor updated on this path - confirm
                      * callers always pass a large enough buffer. */
3473                 *((obd_id *)val) = *reply;
3474         out:
3475                 ptlrpc_req_finished(req);
3476                 RETURN(rc);
3477         } else if (KEY_IS(KEY_FIEMAP)) {
3478                 struct ptlrpc_request *req;
3479                 struct ll_user_fiemap *reply;
3480                 char *tmp;
3481                 int rc;
3482
3483                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3484                                            &RQF_OST_GET_INFO_FIEMAP);
3485                 if (req == NULL)
3486                         RETURN(-ENOMEM);
3487
                     /* The fiemap value travels in both directions with the
                      * same size: *vallen bytes out and *vallen bytes back. */
3488                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3489                                      RCL_CLIENT, keylen);
3490                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3491                                      RCL_CLIENT, *vallen);
3492                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3493                                      RCL_SERVER, *vallen);
3494
3495                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3496                 if (rc) {
3497                         ptlrpc_request_free(req);
3498                         RETURN(rc);
3499                 }
3500
3501                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3502                 memcpy(tmp, key, keylen);
3503                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3504                 memcpy(tmp, val, *vallen);
3505
3506                 ptlrpc_request_set_replen(req);
3507                 rc = ptlrpc_queue_wait(req);
3508                 if (rc)
3509                         GOTO(out1, rc);
3510
3511                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3512                 if (reply == NULL)
3513                         GOTO(out1, rc = -EPROTO);
3514
3515                 memcpy(val, reply, *vallen);
3516         out1:
3517                 ptlrpc_req_finished(req);
3518
3519                 RETURN(rc);
3520         }
3521
3522         RETURN(-EINVAL);
3523 }
3524
/* Reply interpreter installed by osc_set_info_async() for KEY_MDS_CONN.
 *
 * A failed request (@rc != 0) is returned unchanged and nothing else is
 * done.  On success the LLOG_MDS_OST_ORIG_CTXT llog context of the import's
 * obd, if there is one, is connected with llog_initiator_connect(), and the
 * import is then flagged imp_server_timeout and imp_pingable under
 * imp_lock.
 *
 * Returns @rc on failure, otherwise the result of llog_initiator_connect()
 * (or 0 when there is no llog context).  @env and @aa are not used. */
3525 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3526                                           struct ptlrpc_request *req,
3527                                           void *aa, int rc)
3528 {
3529         struct llog_ctxt *ctxt;
3530         struct obd_import *imp = req->rq_import;
3531         ENTRY;
3532
3533         if (rc != 0)
3534                 RETURN(rc);
3535
3536         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
             /* rc is known to be 0 from here on (failures returned above), so
              * the connect is unconditional once a context exists. */
3537         if (ctxt)
3539                 rc = llog_initiator_connect(ctxt);
3544
             /* Called even when no context was found.
              * NOTE(review): relies on llog_ctxt_put() accepting NULL -
              * confirm. */
3545         llog_ctxt_put(ctxt);
3546         spin_lock(&imp->imp_lock);
3547         imp->imp_server_timeout = 1;
3548         imp->imp_pingable = 1;
3549         spin_unlock(&imp->imp_lock);
3550         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3551
3552         RETURN(rc);
3553 }
3554
/* Set a named parameter on this OSC.
 *
 * The keys KEY_NEXT_ID, KEY_UNLINKED, KEY_INIT_RECOV, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are handled locally and return
 * immediately.  Every other key is packed into an OST_SET_INFO request
 * that is added to @set (which must then be non-NULL); for KEY_MDS_CONN
 * the creator's group is also recorded and
 * osc_setinfo_mds_conn_interpret() is installed as reply interpreter.
 *
 * Returns 0 on success, -ERANGE/-EINVAL for bad local arguments, -EINVAL
 * if a remote key is given without @set, -ENOMEM if the request cannot be
 * allocated, or the error from ptlrpc_request_pack().
 *
 * NOTE(review): KEY_IS() is not given the key explicitly; presumably it
 * compares against the local "key"/"keylen" - confirm in its definition. */
3555 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3556                               void *key, obd_count vallen, void *val,
3557                               struct ptlrpc_request_set *set)
3558 {
3559         struct ptlrpc_request *req;
3560         struct obd_device     *obd = exp->exp_obd;
3561         struct obd_import     *imp = class_exp2cliimp(exp);
3562         char                  *tmp;
3563         int                    rc;
3564         ENTRY;
3565
3566         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3567
3568         if (KEY_IS(KEY_NEXT_ID)) {
3569                 if (vallen != sizeof(obd_id))
3570                         RETURN(-ERANGE);
3571                 if (val == NULL)
3572                         RETURN(-EINVAL);
                     /* The stored next id is one past the value passed in. */
3573                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3574                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3575                        exp->exp_obd->obd_name,
3576                        obd->u.cli.cl_oscc.oscc_next_id);
3577
3578                 RETURN(0);
3579         }
3580
3581         if (KEY_IS(KEY_UNLINKED)) {
                     /* Clear the creator's NOSPC flag; @val is not used. */
3582                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3583                 spin_lock(&oscc->oscc_lock);
3584                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3585                 spin_unlock(&oscc->oscc_lock);
3586                 RETURN(0);
3587         }
3588
3589         if (KEY_IS(KEY_INIT_RECOV)) {
                     /* NOTE(review): unlike KEY_NEXT_ID, val is dereferenced
                      * here without a NULL check - confirm callers. */
3590                 if (vallen != sizeof(int))
3591                         RETURN(-EINVAL);
3592                 spin_lock(&imp->imp_lock);
3593                 imp->imp_initial_recov = *(int *)val;
3594                 spin_unlock(&imp->imp_lock);
3595                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3596                        exp->exp_obd->obd_name,
3597                        imp->imp_initial_recov);
3598                 RETURN(0);
3599         }
3600
3601         if (KEY_IS(KEY_CHECKSUM)) {
3602                 if (vallen != sizeof(int))
3603                         RETURN(-EINVAL);
                     /* Any non-zero value enables checksums. */
3604                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3605                 RETURN(0);
3606         }
3607
3608         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3609                 sptlrpc_conf_client_adapt(obd);
3610                 RETURN(0);
3611         }
3612
3613         if (KEY_IS(KEY_FLUSH_CTX)) {
3614                 sptlrpc_import_flush_my_ctx(imp);
3615                 RETURN(0);
3616         }
3617
             /* Everything below sends an RPC and therefore needs a set. */
3618         if (!set)
3619                 RETURN(-EINVAL);
3620
3621         /* We pass all other commands directly to OST. Since nobody calls osc
3622            methods directly and everybody is supposed to go through LOV, we
3623            assume lov checked invalid values for us.
3624            The only recognised values so far are evict_by_nid and mds_conn.
3625            Even if something bad goes through, we'd get a -EINVAL from OST
3626            anyway. */
3627
3628
3629         req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3630         if (req == NULL)
3631                 RETURN(-ENOMEM);
3632
3633         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3634                              RCL_CLIENT, keylen);
3635         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3636                              RCL_CLIENT, vallen);
3637         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3638         if (rc) {
3639                 ptlrpc_request_free(req);
3640                 RETURN(rc);
3641         }
3642
3643         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3644         memcpy(tmp, key, keylen);
3645         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3646         memcpy(tmp, val, vallen);
3647
3648         if (KEY_IS(KEY_MDS_CONN)) {
3649                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3650
                     /* NOTE(review): reads a __u32 from val without checking
                      * vallen; the comment above assumes LOV validated it. */
3651                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3652                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3653                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3654                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3655         }
3656
3657         ptlrpc_request_set_replen(req);
3658         ptlrpc_set_add_req(set, req);
             /* The set is checked once right after queuing; its return
              * value is ignored. */
3659         ptlrpc_check_set(NULL, set);
3660
3661         RETURN(0);
3662 }
3663
3664
/* llog operations table in which only lop_cancel is set (to
 * llog_obd_repl_cancel); all other members are left zero-initialized.
 * The "field: value" form is GCC's obsolete spelling of the C99
 * designated initializer ".field = value". */
3665 static struct llog_operations osc_size_repl_logops = {
3666         lop_cancel: llog_obd_repl_cancel
3667 };
3668