/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
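/*
 * Note on the packmd calling convention, as implemented below: with
 * lmmp == NULL the packed size is returned without packing anything;
 * with *lmmp set and lsm == NULL the previously packed buffer is freed;
 * otherwise the buffer is allocated on demand and the object id/group
 * are packed in little-endian byte order.
 */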
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

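/*
 * Note: synchronously creates an object on the OST. A striping descriptor
 * is allocated if the caller did not pass one; OST_CREATE is sent and the
 * returned object id/group are copied back into the lsm. The transaction
 * number and (if requested) the unlink llog cookie are saved in *oti so
 * the caller can track commit of the create.
 */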
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

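/*
 * Note: sends an OST_PUNCH (truncate) request. The extent to punch
 * travels in oa->o_size/oa->o_blocks (start/end), as set up by osc_punch()
 * below; completion is reported through the caller's upcall rather than
 * by waiting here.
 */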
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally granted locks matched by @mode on the resource
 * named by @oa->o_id/@oa->o_gr. Found locks are added to the @cancels
 * list. Returns the number of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

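/*
 * Note: a lock-free throttle on concurrent destroy RPCs. The in-flight
 * counter is optimistically bumped and kept if we are still within
 * cl_max_rpcs_in_flight. On the failure path the decrement can race with
 * a completion; if the counter is observed below the limit after the
 * decrement, the waitqueue is re-signalled so no waiter is left sleeping.
 */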
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

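/*
 * Note: piggy-backs cache accounting on an outgoing request: o_dirty is
 * how much we have dirtied, o_undirty how much more we could dirty (0 if
 * the accounting looks inconsistent), o_grant the grant we still hold and
 * o_dropped any grant we lost, e.g. in an eviction. All under
 * cl_loi_list_lock.
 */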
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant, wait for pending RPCs
                 * that may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
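/*
 * For example (illustrative numbers only): with cl_max_rpcs_in_flight = 8
 * and cl_max_pages_per_rpc = 256, the first shrink targets 9 * 256 pages
 * of grant, and a subsequent shrink targets 256 pages.
 */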
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s \n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * A race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
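/*
 * For example (hypothetical values): with nob_read = 5000 and two
 * 4096-byte pages, the first page is fully covered, the tail of the
 * second page is zeroed from offset 904, and any further pages would be
 * zeroed completely.
 */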
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

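/*
 * Note: sanity-checks a BRW write reply. Every per-niobuf return code
 * must be zero (negative values are propagated, positive ones are a
 * protocol error), and the bulk descriptor must have transferred exactly
 * the number of bytes we asked for.
 */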
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (ptlrpc_rep_need_swab(req))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

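/*
 * Note: two brw_pages can share one remote niobuf only if their flags
 * match and they are byte-contiguous; e.g. a page covering [0, 4096)
 * merges with one covering [4096, 8192), but not with [8192, 12288).
 */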
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

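/*
 * Note: computes the bulk checksum by walking the page array, mapping
 * each page and feeding at most the remaining byte count into the running
 * checksum. The OBD_FAIL_* hooks deliberately corrupt read data or the
 * write checksum to exercise the checksum-retry paths in testing.
 */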
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

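/*
 * Note: builds (but does not send) a BRW request. Contiguous pages are
 * merged into shared niobufs, a bulk descriptor is attached (GET_SOURCE
 * for writes, PUT_SINK for reads), cached-dirty/grant accounting is
 * piggy-backed on the oa, and a checksum is computed if enabled. Writes
 * draw their requests from the pre-allocated pool so that dirty data can
 * still be flushed under memory pressure.
 */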
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB: the request now owns desc and will free it when the request
         * itself is freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

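/*
 * Note: on a client/server checksum mismatch, the checksum is recomputed
 * over the local pages to localize the corruption: a match with the
 * server value points at client memory changing after it was first
 * checksummed (e.g. mmap I/O), a match with the original client value
 * points at corruption in transit, and a mismatch with both is ambiguous.
 */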
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1467 /* Note rc enters this function as number of bytes transferred */
1468 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1469 {
1470         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1471         const lnet_process_id_t *peer =
1472                         &req->rq_import->imp_connection->c_peer;
1473         struct client_obd *cli = aa->aa_cli;
1474         struct ost_body *body;
1475         __u32 client_cksum = 0;
1476         ENTRY;
1477
1478         if (rc < 0 && rc != -EDQUOT)
1479                 RETURN(rc);
1480
1481         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1482         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1483                                   lustre_swab_ost_body);
1484         if (body == NULL) {
1485                 CDEBUG(D_INFO, "Can't unpack body\n");
1486                 RETURN(-EPROTO);
1487         }
1488
1489         /* set/clear over quota flag for a uid/gid */
1490         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1491             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1492                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1493
1494                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1495                              body->oa.o_flags);
1496         }
1497
1498         if (rc < 0)
1499                 RETURN(rc);
1500
1501         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1502                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1503
1504         osc_update_grant(cli, body);
1505
1506         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1507                 if (rc > 0) {
1508                         CERROR("Unexpected +ve rc %d\n", rc);
1509                         RETURN(-EPROTO);
1510                 }
1511                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1512
1513                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1514                         RETURN(-EAGAIN);
1515
1516                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1517                     check_write_checksum(&body->oa, peer, client_cksum,
1518                                          body->oa.o_cksum, aa->aa_requested_nob,
1519                                          aa->aa_page_count, aa->aa_ppga,
1520                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1521                         RETURN(-EAGAIN);
1522
1523                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1524                                      aa->aa_page_count, aa->aa_ppga);
1525                 GOTO(out, rc);
1526         }
1527
1528         /* The rest of this function executes only for OST_READs */
1529
1530         /* if unwrap_bulk failed, return -EAGAIN to retry */
1531         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1532         if (rc < 0)
1533                 GOTO(out, rc = -EAGAIN);
1534
1535         if (rc > aa->aa_requested_nob) {
1536                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1537                        aa->aa_requested_nob);
1538                 RETURN(-EPROTO);
1539         }
1540
1541         if (rc != req->rq_bulk->bd_nob_transferred) {
1542                 CERROR("Unexpected rc %d (%d transferred)\n",
1543                        rc, req->rq_bulk->bd_nob_transferred);
1544                 RETURN(-EPROTO);
1545         }
1546
1547         if (rc < aa->aa_requested_nob)
1548                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1549
1550         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1551                 static int cksum_counter;
1552                 __u32      server_cksum = body->oa.o_cksum;
1553                 char      *via;
1554                 char      *router;
1555                 cksum_type_t cksum_type;
1556
1557                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1558                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1559                 else
1560                         cksum_type = OBD_CKSUM_CRC32;
1561                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1562                                                  aa->aa_ppga, OST_READ,
1563                                                  cksum_type);
1564
1565                 if (peer->nid == req->rq_bulk->bd_sender) {
1566                         via = router = "";
1567                 } else {
1568                         via = " via ";
1569                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1570                 }
1571
1572                 if (server_cksum == ~0 && rc > 0) {
1573                         CERROR("Protocol error: server %s set the 'checksum' "
1574                                "bit, but didn't send a checksum.  Not fatal, "
1575                                "but please notify on http://bugzilla.lustre.org/\n",
1576                                libcfs_nid2str(peer->nid));
1577                 } else if (server_cksum != client_cksum) {
1578                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1579                                            "%s%s%s inum "LPU64"/"LPU64" object "
1580                                            LPU64"/"LPU64" extent "
1581                                            "["LPU64"-"LPU64"]\n",
1582                                            req->rq_import->imp_obd->obd_name,
1583                                            libcfs_nid2str(peer->nid),
1584                                            via, router,
1585                                            body->oa.o_valid & OBD_MD_FLFID ?
1586                                                 body->oa.o_fid : (__u64)0,
1587                                            body->oa.o_valid & OBD_MD_FLFID ?
1588                                                 body->oa.o_generation :(__u64)0,
1589                                            body->oa.o_id,
1590                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1591                                                 body->oa.o_gr : (__u64)0,
1592                                            aa->aa_ppga[0]->off,
1593                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1594                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1595                                                                         1);
1596                         CERROR("client %x, server %x, cksum_type %x\n",
1597                                client_cksum, server_cksum, cksum_type);
1598                         cksum_counter = 0;
1599                         aa->aa_oa->o_cksum = client_cksum;
1600                         rc = -EAGAIN;
1601                 } else {
1602                         cksum_counter++;
1603                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1604                         rc = 0;
1605                 }
1606         } else if (unlikely(client_cksum)) {
1607                 static int cksum_missed;
1608
1609                 cksum_missed++;
1610                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1611                         CERROR("Checksum %u requested from %s but not sent\n",
1612                                cksum_missed, libcfs_nid2str(peer->nid));
1613         } else {
1614                 rc = 0;
1615         }
1616 out:
1617         if (rc >= 0)
1618                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1619
1620         RETURN(rc);
1621 }
1622
1623 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1624                             struct lov_stripe_md *lsm,
1625                             obd_count page_count, struct brw_page **pga,
1626                             struct obd_capa *ocapa)
1627 {
1628         struct ptlrpc_request *req;
1629         int                    rc;
1630         cfs_waitq_t            waitq;
1631         int                    resends = 0;
1632         struct l_wait_info     lwi;
1633
1634         ENTRY;
1635
1636         cfs_waitq_init(&waitq);
1637
1638 restart_bulk:
1639         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1640                                   page_count, pga, &req, ocapa, 0);
1641         if (rc != 0)
1642                 return (rc);
1643
1644         rc = ptlrpc_queue_wait(req);
1645
1646         if (rc == -ETIMEDOUT && req->rq_resend) {
1647                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1648                 ptlrpc_req_finished(req);
1649                 goto restart_bulk;
1650         }
1651
1652         rc = osc_brw_fini_request(req, rc);
1653
1654         ptlrpc_req_finished(req);
1655         if (osc_recoverable_error(rc)) {
1656                 resends++;
1657                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1658                         CERROR("too many resend retries, returning error\n");
1659                         RETURN(-EIO);
1660                 }
1661
1662                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1663                 l_wait_event(waitq, 0, &lwi);
1664
1665                 goto restart_bulk;
1666         }
1667
1668         RETURN (rc);
1669 }
1670
1671 int osc_brw_redo_request(struct ptlrpc_request *request,
1672                          struct osc_brw_async_args *aa)
1673 {
1674         struct ptlrpc_request *new_req;
1675         struct ptlrpc_request_set *set = request->rq_set;
1676         struct osc_brw_async_args *new_aa;
1677         struct osc_async_page *oap;
1678         int rc = 0;
1679         ENTRY;
1680
1681         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1682                 CERROR("too many resend retries, returning error\n");
1683                 RETURN(-EIO);
1684         }
1685
1686         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1687
1688         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1689                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1690                                   aa->aa_cli, aa->aa_oa,
1691                                   NULL /* lsm unused by osc currently */,
1692                                   aa->aa_page_count, aa->aa_ppga,
1693                                   &new_req, aa->aa_ocapa, 0);
1694         if (rc)
1695                 RETURN(rc);
1696
1697         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1698
1699         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1700                 if (oap->oap_request != NULL) {
1701                         LASSERTF(request == oap->oap_request,
1702                                  "request %p != oap_request %p\n",
1703                                  request, oap->oap_request);
1704                         if (oap->oap_interrupted) {
1705                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1706                                 ptlrpc_req_finished(new_req);
1707                                 RETURN(-EINTR);
1708                         }
1709                 }
1710         }
1711         /* New request takes over pga and oaps from old request.
1712          * Note that copying a list_head doesn't work, need to move it... */
1713         aa->aa_resends++;
1714         new_req->rq_interpret_reply = request->rq_interpret_reply;
1715         new_req->rq_async_args = request->rq_async_args;
1716         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1717
1718         new_aa = ptlrpc_req_async_args(new_req);
1719
1720         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1721         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1722         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1723
1724         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1725                 if (oap->oap_request) {
1726                         ptlrpc_req_finished(oap->oap_request);
1727                         oap->oap_request = ptlrpc_request_addref(new_req);
1728                 }
1729         }
1730
1731         new_aa->aa_ocapa = aa->aa_ocapa;
1732         aa->aa_ocapa = NULL;
1733
1734         /* Using ptlrpc_set_add_req() here is safe because interpret
1735          * functions run in check_set context.  The only path by which
1736          * another thread can reach this request is the -EINTR case above,
1737          * and that path is protected by cl_loi_list_lock. */
1738         ptlrpc_set_add_req(set, new_req);
1739
1740         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1741
1742         DEBUG_REQ(D_INFO, new_req, "new request");
1743         RETURN(0);
1744 }
1745
1746 /*
1747  * Ugh, we want disk allocation on the target to happen in offset order, so
1748  * we'll follow Sedgewick's advice and stick to the dead-simple shellsort --
1749  * it'll do fine for our small page arrays and doesn't require allocation.
1750  * It's an insertion sort that swaps elements that are strides apart,
1751  * shrinking the stride down until it's 1 and the array is sorted.
1752  */
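/* The 3x+1 loop below generates Knuth's gap sequence 1, 4, 13, 40, 121, ...
 * until it passes num; the do/while then walks it back down, e.g. for
 * num == 100 the passes use strides 40, 13, 4 and finally 1. */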
1753 static void sort_brw_pages(struct brw_page **array, int num)
1754 {
1755         int stride, i, j;
1756         struct brw_page *tmp;
1757
1758         if (num == 1)
1759                 return;
1760         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1761                 ;
1762
1763         do {
1764                 stride /= 3;
1765                 for (i = stride ; i < num ; i++) {
1766                         tmp = array[i];
1767                         j = i;
1768                         while (j >= stride && array[j - stride]->off > tmp->off) {
1769                                 array[j] = array[j - stride];
1770                                 j -= stride;
1771                         }
1772                         array[j] = tmp;
1773                 }
1774         } while (stride > 1);
1775 }
1776
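/* Count how many leading pages of pg[] form one contiguous, page-aligned
 * byte range, i.e. the largest prefix the network could move in a single
 * unfragmented RDMA.  E.g. with 4 KiB pages, extents [0-4095][4096-8191]
 * count as 2, while [0-2000] alone counts as 1 because it ends mid-page. */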
1777 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1778 {
1779         int count = 1;
1780         int offset;
1781         int i = 0;
1782
1783         LASSERT(pages > 0);
1784         offset = pg[i]->off & ~CFS_PAGE_MASK;
1785
1786         for (;;) {
1787                 pages--;
1788                 if (pages == 0)         /* that's all */
1789                         return count;
1790
1791                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1792                         return count;   /* doesn't end on page boundary */
1793
1794                 i++;
1795                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1796                 if (offset != 0)        /* doesn't start on page boundary */
1797                         return count;
1798
1799                 count++;
1800         }
1801 }
1802
1803 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1804 {
1805         struct brw_page **ppga;
1806         int i;
1807
1808         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1809         if (ppga == NULL)
1810                 return NULL;
1811
1812         for (i = 0; i < count; i++)
1813                 ppga[i] = pga + i;
1814         return ppga;
1815 }
1816
1817 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1818 {
1819         LASSERT(ppga != NULL);
1820         OBD_FREE(ppga, sizeof(*ppga) * count);
1821 }
1822
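/* Synchronous BRW entry point.  Builds a pointer array over the flat
 * brw_page array, sorts it by offset, and then issues osc_brw_internal()
 * calls of at most cl_max_pages_per_rpc unfragmented pages each until the
 * whole request has been sent or an error occurs. */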
1823 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1824                    obd_count page_count, struct brw_page *pga,
1825                    struct obd_trans_info *oti)
1826 {
1827         struct obdo *saved_oa = NULL;
1828         struct brw_page **ppga, **orig;
1829         struct obd_import *imp = class_exp2cliimp(exp);
1830         struct client_obd *cli;
1831         int rc, page_count_orig;
1832         ENTRY;
1833
1834         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1835         cli = &imp->imp_obd->u.cli;
1836
1837         if (cmd & OBD_BRW_CHECK) {
1838                 /* The caller just wants to know if there's a chance that this
1839                  * I/O can succeed */
1840
1841                 if (imp->imp_invalid)
1842                         RETURN(-EIO);
1843                 RETURN(0);
1844         }
1845
1846         /* test_brw with a failed create can trip this, maybe others. */
1847         LASSERT(cli->cl_max_pages_per_rpc);
1848
1849         rc = 0;
1850
1851         orig = ppga = osc_build_ppga(pga, page_count);
1852         if (ppga == NULL)
1853                 RETURN(-ENOMEM);
1854         page_count_orig = page_count;
1855
1856         sort_brw_pages(ppga, page_count);
1857         while (page_count) {
1858                 obd_count pages_per_brw;
1859
1860                 if (page_count > cli->cl_max_pages_per_rpc)
1861                         pages_per_brw = cli->cl_max_pages_per_rpc;
1862                 else
1863                         pages_per_brw = page_count;
1864
1865                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1866
1867                 if (saved_oa != NULL) {
1868                         /* restore previously saved oa */
1869                         *oinfo->oi_oa = *saved_oa;
1870                 } else if (page_count > pages_per_brw) {
1871                         /* save a copy of oa (brw will clobber it) */
1872                         OBDO_ALLOC(saved_oa);
1873                         if (saved_oa == NULL)
1874                                 GOTO(out, rc = -ENOMEM);
1875                         *saved_oa = *oinfo->oi_oa;
1876                 }
1877
1878                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1879                                       pages_per_brw, ppga, oinfo->oi_capa);
1880
1881                 if (rc != 0)
1882                         break;
1883
1884                 page_count -= pages_per_brw;
1885                 ppga += pages_per_brw;
1886         }
1887
1888 out:
1889         osc_release_ppga(orig, page_count_orig);
1890
1891         if (saved_oa != NULL)
1892                 OBDO_FREE(saved_oa);
1893
1894         RETURN(rc);
1895 }
1896
1897 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1898  * the dirty accounting.  Writeback completes or truncate happens before
1899  * writing starts.  Must be called with the loi lock held. */
1900 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1901                            int sent)
1902 {
1903         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1904 }
1905
1906
1907 /* This maintains the lists of pending pages to read/write for a given object
1908  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1909  * to quickly find objects that are ready to send an RPC. */
1910 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1911                          int cmd)
1912 {
1913         int optimal;
1914         ENTRY;
1915
1916         if (lop->lop_num_pending == 0)
1917                 RETURN(0);
1918
1919         /* if we have an invalid import we want to drain the queued pages
1920          * by forcing them through rpcs that immediately fail and complete
1921          * the pages.  recovery relies on this to empty the queued pages
1922          * before canceling the locks and evicting the llite pages */
1923         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1924                 RETURN(1);
1925
1926         /* stream rpcs in queue order as long as there is an urgent page
1927          * queued.  this is our cheap solution for good batching in the case
1928          * where writepage marks some random page in the middle of the file
1929          * as urgent because of, say, memory pressure */
1930         if (!list_empty(&lop->lop_urgent)) {
1931                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1932                 RETURN(1);
1933         }
1934         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1935         optimal = cli->cl_max_pages_per_rpc;
1936         if (cmd & OBD_BRW_WRITE) {
1937                 /* trigger a write rpc stream as long as there are dirtiers
1938                  * waiting for space.  as they're waiting, they're not going to
1939                  * create more pages to coalesce with what's waiting. */
1940                 if (!list_empty(&cli->cl_cache_waiters)) {
1941                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1942                         RETURN(1);
1943                 }
1944                 /* +16 to avoid triggering rpcs that would want to include pages
1945                  * that are being queued but which can't be made ready until
1946                  * the queuer finishes with the page. this is a wart for
1947                  * llite::commit_write() */
1948                 optimal += 16;
1949         }
1950         if (lop->lop_num_pending >= optimal)
1951                 RETURN(1);
1952
1953         RETURN(0);
1954 }
1955
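/* A high-priority RPC is called for when the first page on the urgent list
 * is flagged ASYNC_HP (e.g. it is covered by a lock being canceled). */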
1956 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1957 {
1958         struct osc_async_page *oap;
1959         ENTRY;
1960
1961         if (list_empty(&lop->lop_urgent))
1962                 RETURN(0);
1963
1964         oap = list_entry(lop->lop_urgent.next,
1965                          struct osc_async_page, oap_urgent_item);
1966
1967         if (oap->oap_async_flags & ASYNC_HP) {
1968                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1969                 RETURN(1);
1970         }
1971
1972         RETURN(0);
1973 }
1974
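/* Make @item's membership of @list match the boolean @should_be_on:
 * add it to the tail when it should be listed but isn't, and remove it
 * when it is listed but shouldn't be. */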
1975 static void on_list(struct list_head *item, struct list_head *list,
1976                     int should_be_on)
1977 {
1978         if (list_empty(item) && should_be_on)
1979                 list_add_tail(item, list);
1980         else if (!list_empty(item) && !should_be_on)
1981                 list_del_init(item);
1982 }
1983
1984 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1985  * can find pages to build into rpcs quickly */
1986 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1987 {
1988         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1989             lop_makes_hprpc(&loi->loi_read_lop)) {
1990                 /* HP rpc */
1991                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1992                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1993         } else {
1994                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1995                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1996                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1997                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1998         }
1999
2000         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2001                 loi->loi_write_lop.lop_num_pending);
2002
2003         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2004                 loi->loi_read_lop.lop_num_pending);
2005 }
2006
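/* Adjust the pending-page accounting by @delta: the per-lop count plus the
 * client-wide pending read or write page counter, selected by @cmd. */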
2007 static void lop_update_pending(struct client_obd *cli,
2008                                struct loi_oap_pages *lop, int cmd, int delta)
2009 {
2010         lop->lop_num_pending += delta;
2011         if (cmd & OBD_BRW_WRITE)
2012                 cli->cl_pending_w_pages += delta;
2013         else
2014                 cli->cl_pending_r_pages += delta;
2015 }
2016
2017 /**
2018  * this is called when a sync waiter receives an interruption.  Its job is to
2019  * get the caller woken as soon as possible.  If its page hasn't been put in an
2020  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2021  * desiring interruption which will forcefully complete the rpc once the rpc
2022  * has timed out.
2023  */
2024 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2025 {
2026         struct loi_oap_pages *lop;
2027         struct lov_oinfo *loi;
2028         int rc = -EBUSY;
2029         ENTRY;
2030
2031         LASSERT(!oap->oap_interrupted);
2032         oap->oap_interrupted = 1;
2033
2034         /* ok, it's been put in an rpc. only one oap gets a request reference */
2035         if (oap->oap_request != NULL) {
2036                 ptlrpc_mark_interrupted(oap->oap_request);
2037                 ptlrpcd_wake(oap->oap_request);
2038                 ptlrpc_req_finished(oap->oap_request);
2039                 oap->oap_request = NULL;
2040         }
2041
2042         /*
2043          * page completion may be called only if ->cpo_prep() method was
2044          * executed by osc_io_submit(), which also adds the page to the pending list
2045          */
2046         if (!list_empty(&oap->oap_pending_item)) {
2047                 list_del_init(&oap->oap_pending_item);
2048                 list_del_init(&oap->oap_urgent_item);
2049
2050                 loi = oap->oap_loi;
2051                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2052                         &loi->loi_write_lop : &loi->loi_read_lop;
2053                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2054                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2055                 rc = oap->oap_caller_ops->ap_completion(env,
2056                                           oap->oap_caller_data,
2057                                           oap->oap_cmd, NULL, -EINTR);
2058         }
2059
2060         RETURN(rc);
2061 }
2062
2063 /* this is trying to propagate async writeback errors back up to the
2064  * application.  As an async write fails we record the error code for later if
2065  * the app does an fsync.  As long as errors persist we force future rpcs to be
2066  * sync so that the app can get a sync error and break the cycle of queueing
2067  * pages for which writeback will fail. */
2068 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2069                            int rc)
2070 {
2071         if (rc) {
2072                 if (!ar->ar_rc)
2073                         ar->ar_rc = rc;
2074
2075                 ar->ar_force_sync = 1;
2076                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2077                 return;
2078
2079         }
2080
2081         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2082                 ar->ar_force_sync = 0;
2083 }
2084
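/* Queue an async page for I/O: ASYNC_HP pages go to the head of the urgent
 * list, plain ASYNC_URGENT pages to its tail, and every page is appended to
 * the pending list before the pending counts are updated. */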
2085 void osc_oap_to_pending(struct osc_async_page *oap)
2086 {
2087         struct loi_oap_pages *lop;
2088
2089         if (oap->oap_cmd & OBD_BRW_WRITE)
2090                 lop = &oap->oap_loi->loi_write_lop;
2091         else
2092                 lop = &oap->oap_loi->loi_read_lop;
2093
2094         if (oap->oap_async_flags & ASYNC_HP)
2095                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2096         else if (oap->oap_async_flags & ASYNC_URGENT)
2097                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2098         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2099         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2100 }
2101
2102 /* this must be called holding the loi list lock, which protects exit_cache,
2103  * async_flag maintenance, and oap_request */
2104 static void osc_ap_completion(const struct lu_env *env,
2105                               struct client_obd *cli, struct obdo *oa,
2106                               struct osc_async_page *oap, int sent, int rc)
2107 {
2108         __u64 xid = 0;
2109
2110         ENTRY;
2111         if (oap->oap_request != NULL) {
2112                 xid = ptlrpc_req_xid(oap->oap_request);
2113                 ptlrpc_req_finished(oap->oap_request);
2114                 oap->oap_request = NULL;
2115         }
2116
2117         spin_lock(&oap->oap_lock);
2118         oap->oap_async_flags = 0;
2119         spin_unlock(&oap->oap_lock);
2120         oap->oap_interrupted = 0;
2121
2122         if (oap->oap_cmd & OBD_BRW_WRITE) {
2123                 osc_process_ar(&cli->cl_ar, xid, rc);
2124                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2125         }
2126
2127         if (rc == 0 && oa != NULL) {
2128                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2129                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2130                 if (oa->o_valid & OBD_MD_FLMTIME)
2131                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2132                 if (oa->o_valid & OBD_MD_FLATIME)
2133                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2134                 if (oa->o_valid & OBD_MD_FLCTIME)
2135                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2136         }
2137
2138         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2139                                                 oap->oap_cmd, oa, rc);
2140
2141         /* ll_ap_completion (from llite) drops PG_locked, so a new
2142          * I/O on the page could start; but OSC calls it under the lock
2143          * and thus we can safely add the oap back to pending */
2144         if (rc)
2145                 /* upper layer wants to leave the page on pending queue */
2146                 osc_oap_to_pending(oap);
2147         else
2148                 osc_exit_cache(cli, oap, sent);
2149         EXIT;
2150 }
2151
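/* Reply-interpret callback for async BRW RPCs: finish the request, redo it
 * if the error is recoverable, then drop the read/write in-flight count and
 * either complete the oaps (osc_send_oap_rpc path) or just release their
 * write grants (async_internal path) before waking cache waiters and
 * kicking osc_check_rpcs() to keep the pipeline full. */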
2152 static int brw_interpret(const struct lu_env *env,
2153                          struct ptlrpc_request *req, void *data, int rc)
2154 {
2155         struct osc_brw_async_args *aa = data;
2156         struct client_obd *cli;
2157         int async;
2158         ENTRY;
2159
2160         rc = osc_brw_fini_request(req, rc);
2161         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2162         if (osc_recoverable_error(rc)) {
2163                 rc = osc_brw_redo_request(req, aa);
2164                 if (rc == 0)
2165                         RETURN(0);
2166         }
2167
2168         if (aa->aa_ocapa) {
2169                 capa_put(aa->aa_ocapa);
2170                 aa->aa_ocapa = NULL;
2171         }
2172
2173         cli = aa->aa_cli;
2174
2175         client_obd_list_lock(&cli->cl_loi_list_lock);
2176
2177         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2178          * is called so we know whether to go to sync BRWs or wait for more
2179          * RPCs to complete */
2180         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2181                 cli->cl_w_in_flight--;
2182         else
2183                 cli->cl_r_in_flight--;
2184
2185         async = list_empty(&aa->aa_oaps);
2186         if (!async) { /* from osc_send_oap_rpc() */
2187                 struct osc_async_page *oap, *tmp;
2188                 /* the caller may re-use the oap after the completion call so
2189                  * we need to clean it up a little */
2190                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2191                         list_del_init(&oap->oap_rpc_item);
2192                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2193                 }
2194                 OBDO_FREE(aa->aa_oa);
2195         } else { /* from async_internal() */
2196                 int i;
2197                 for (i = 0; i < aa->aa_page_count; i++)
2198                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2199
2200                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2201                         OBDO_FREE(aa->aa_oa);
2202         }
2203         osc_wake_cache_waiters(cli);
2204         osc_check_rpcs(env, cli);
2205         client_obd_list_unlock(&cli->cl_loi_list_lock);
2206         if (!async)
2207                 cl_req_completion(env, aa->aa_clerq, rc);
2208         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2209         RETURN(rc);
2210 }
2211
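/* Build a single BRW RPC from the oaps queued on @rpc_list: allocate the
 * brw_page array and obdo, attach every page to a freshly allocated cl_req,
 * sort the pages and hand them to osc_brw_prep_request().  On failure each
 * oap is completed with the error so no page is left stranded. */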
2212 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2213                                             struct client_obd *cli,
2214                                             struct list_head *rpc_list,
2215                                             int page_count, int cmd)
2216 {
2217         struct ptlrpc_request *req;
2218         struct brw_page **pga = NULL;
2219         struct osc_brw_async_args *aa;
2220         struct obdo *oa = NULL;
2221         const struct obd_async_page_ops *ops = NULL;
2222         void *caller_data = NULL;
2223         struct osc_async_page *oap;
2224         struct osc_async_page *tmp;
2225         struct ost_body *body;
2226         struct cl_req *clerq = NULL;
2227         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2228         struct ldlm_lock *lock = NULL;
2229         struct cl_req_attr crattr;
2230         int i, rc;
2231
2232         ENTRY;
2233         LASSERT(!list_empty(rpc_list));
2234
2235         memset(&crattr, 0, sizeof crattr);
2236         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2237         if (pga == NULL)
2238                 GOTO(out, req = ERR_PTR(-ENOMEM));
2239
2240         OBDO_ALLOC(oa);
2241         if (oa == NULL)
2242                 GOTO(out, req = ERR_PTR(-ENOMEM));
2243
2244         i = 0;
2245         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2246                 struct cl_page *page = osc_oap2cl_page(oap);
2247                 if (ops == NULL) {
2248                         ops = oap->oap_caller_ops;
2249                         caller_data = oap->oap_caller_data;
2250
2251                         clerq = cl_req_alloc(env, page, crt,
2252                                              1 /* only 1-object rpcs for
2253                                                 * now */);
2254                         if (IS_ERR(clerq))
2255                                 GOTO(out, req = (void *)clerq);
2256                         lock = oap->oap_ldlm_lock;
2257                 }
2258                 pga[i] = &oap->oap_brw_page;
2259                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2260                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2261                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2262                 i++;
2263                 cl_req_page_add(env, clerq, page);
2264         }
2265
2266         /* always get the data for the obdo for the rpc */
2267         LASSERT(ops != NULL);
2268         crattr.cra_oa = oa;
2269         crattr.cra_capa = NULL;
2270         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2271         if (lock) {
2272                 oa->o_handle = lock->l_remote_handle;
2273                 oa->o_valid |= OBD_MD_FLHANDLE;
2274         }
2275
2276         rc = cl_req_prep(env, clerq);
2277         if (rc != 0) {
2278                 CERROR("cl_req_prep failed: %d\n", rc);
2279                 GOTO(out, req = ERR_PTR(rc));
2280         }
2281
2282         sort_brw_pages(pga, page_count);
2283         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2284                                   pga, &req, crattr.cra_capa, 1);
2285         if (rc != 0) {
2286                 CERROR("prep_req failed: %d\n", rc);
2287                 GOTO(out, req = ERR_PTR(rc));
2288         }
2289
2290         /* Need to update the timestamps after the request is built in case
2291          * we race with setattr (locally or in queue at OST).  If OST gets
2292          * later setattr before earlier BRW (as determined by the request xid),
2293          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2294          * way to do this in a single call.  bug 10150 */
2295         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2296         cl_req_attr_set(env, clerq, &crattr,
2297                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2298
2299         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2300         aa = ptlrpc_req_async_args(req);
2301         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2302         list_splice(rpc_list, &aa->aa_oaps);
2303         CFS_INIT_LIST_HEAD(rpc_list);
2304         aa->aa_clerq = clerq;
2305 out:
2306         capa_put(crattr.cra_capa);
2307         if (IS_ERR(req)) {
2308                 if (oa)
2309                         OBDO_FREE(oa);
2310                 if (pga)
2311                         OBD_FREE(pga, sizeof(*pga) * page_count);
2312                 /* this should happen rarely and is pretty bad: it makes the
2313                  * pending list not follow the dirty order */
2314                 client_obd_list_lock(&cli->cl_loi_list_lock);
2315                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2316                         list_del_init(&oap->oap_rpc_item);
2317
2318                         /* queued sync pages can be torn down while the pages
2319                          * were between the pending list and the rpc */
2320                         if (oap->oap_interrupted) {
2321                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2322                                 osc_ap_completion(env, cli, NULL, oap, 0,
2323                                                   oap->oap_count);
2324                                 continue;
2325                         }
2326                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2327                 }
2328                 if (clerq && !IS_ERR(clerq))
2329                         cl_req_completion(env, clerq, PTR_ERR(req));
2330         }
2331         RETURN(req);
2332 }
2333
2334 /**
2335  * Prepare pages for ASYNC I/O and put them in the send queue.
2336  *
2337  * \param cmd OBD_BRW_* macros
2338  * \param lop pending pages
2339  *
2340  * \return zero if the pages were successfully added to the send queue.
2341  * \return nonzero if an error occurred.
2342  */
2343 static int
2344 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2345                  struct lov_oinfo *loi,
2346                  int cmd, struct loi_oap_pages *lop)
2347 {
2348         struct ptlrpc_request *req;
2349         obd_count page_count = 0;
2350         struct osc_async_page *oap = NULL, *tmp;
2351         struct osc_brw_async_args *aa;
2352         const struct obd_async_page_ops *ops;
2353         CFS_LIST_HEAD(rpc_list);
2354         CFS_LIST_HEAD(tmp_list);
2355         unsigned int ending_offset;
2356         unsigned  starting_offset = 0;
2357         int srvlock = 0;
2358         struct cl_object *clob = NULL;
2359         ENTRY;
2360
2361         /* ASYNC_HP pages first. At present, when the lock covering the
2362          * pages is to be canceled, the pages it covers will be sent out
2363          * with ASYNC_HP. We have to send them out as soon as possible. */
2364         list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2365                 if (oap->oap_async_flags & ASYNC_HP)
2366                         list_move(&oap->oap_pending_item, &tmp_list);
2367                 else
2368                         list_move_tail(&oap->oap_pending_item, &tmp_list);
2369                 if (++page_count >= cli->cl_max_pages_per_rpc)
2370                         break;
2371         }
2372
2373         list_splice(&tmp_list, &lop->lop_pending);
2374         page_count = 0;
2375
2376         /* first we find the pages we're allowed to work with */
2377         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2378                                  oap_pending_item) {
2379                 ops = oap->oap_caller_ops;
2380
2381                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2382                          "magic 0x%x\n", oap, oap->oap_magic);
2383
2384                 if (clob == NULL) {
2385                         /* pin object in memory, so that completion call-backs
2386                          * can be safely called under client_obd_list lock. */
2387                         clob = osc_oap2cl_page(oap)->cp_obj;
2388                         cl_object_get(clob);
2389                 }
2390
2391                 if (page_count != 0 &&
2392                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2393                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2394                                " oap %p, page %p, srvlock %u\n",
2395                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2396                         break;
2397                 }
2398
2399                 /* If there is a gap at the start of this page, it can't merge
2400                  * with any previous page, so we'll hand the network a
2401                  * "fragmented" page array that it can't transfer in 1 RDMA */
2402                 if (page_count != 0 && oap->oap_page_off != 0)
2403                         break;
2404
2405                 /* in llite being 'ready' equates to the page being locked
2406                  * until completion unlocks it.  commit_write submits a page
2407                  * as not ready because its unlock will happen unconditionally
2408                  * as the call returns.  if we race with commit_write giving
2409                  * us that page we don't want to create a hole in the page
2410                  * stream, so we stop and leave the rpc to be fired by
2411                  * another dirtier or kupdated interval (the not ready page
2412                  * will still be on the dirty list).  we could call in
2413                  * at the end of ll_file_write to process the queue again. */
2414                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2415                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2416                                                     cmd);
2417                         if (rc < 0)
2418                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2419                                                 "instead of ready\n", oap,
2420                                                 oap->oap_page, rc);
2421                         switch (rc) {
2422                         case -EAGAIN:
2423                                 /* llite is telling us that the page is still
2424                                  * in commit_write and that we should try
2425                                  * and put it in an rpc again later.  we
2426                                  * break out of the loop so we don't create
2427                                  * a hole in the sequence of pages in the rpc
2428                                  * stream.*/
2429                                 oap = NULL;
2430                                 break;
2431                         case -EINTR:
2432                                 /* the io isn't needed.. tell the checks
2433                                  * below to complete the rpc with EINTR */
2434                                 spin_lock(&oap->oap_lock);
2435                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2436                                 spin_unlock(&oap->oap_lock);
2437                                 oap->oap_count = -EINTR;
2438                                 break;
2439                         case 0:
2440                                 spin_lock(&oap->oap_lock);
2441                                 oap->oap_async_flags |= ASYNC_READY;
2442                                 spin_unlock(&oap->oap_lock);
2443                                 break;
2444                         default:
2445                                 LASSERTF(0, "oap %p page %p returned %d "
2446                                             "from make_ready\n", oap,
2447                                             oap->oap_page, rc);
2448                                 break;
2449                         }
2450                 }
2451                 if (oap == NULL)
2452                         break;
2453                 /*
2454                  * Page submitted for IO has to be locked. Either by
2455                  * ->ap_make_ready() or by higher layers.
2456                  */
2457 #if defined(__KERNEL__) && defined(__linux__)
2458                 {
2459                         struct cl_page *page;
2460
2461                         page = osc_oap2cl_page(oap);
2462
2463                         if (page->cp_type == CPT_CACHEABLE &&
2464                             !(PageLocked(oap->oap_page) &&
2465                               (CheckWriteback(oap->oap_page, cmd)))) {
2466                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2467                                        oap->oap_page,
2468                                        (long)oap->oap_page->flags,
2469                                        oap->oap_async_flags);
2470                                 LBUG();
2471                         }
2472                 }
2473 #endif
2474
2475                 /* take the page out of our book-keeping */
2476                 list_del_init(&oap->oap_pending_item);
2477                 lop_update_pending(cli, lop, cmd, -1);
2478                 list_del_init(&oap->oap_urgent_item);
2479
2480                 if (page_count == 0)
2481                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2482                                           (PTLRPC_MAX_BRW_SIZE - 1);
2483
2484                 /* ask the caller for the size of the io as the rpc leaves. */
2485                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2486                         oap->oap_count =
2487                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2488                                                       cmd);
2489                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2490                 }
2491                 if (oap->oap_count <= 0) {
2492                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2493                                oap->oap_count);
2494                         osc_ap_completion(env, cli, NULL,
2495                                           oap, 0, oap->oap_count);
2496                         continue;
2497                 }
2498
2499                 /* now put the page back in our accounting */
2500                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2501                 if (page_count == 0)
2502                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2503                 if (++page_count >= cli->cl_max_pages_per_rpc)
2504                         break;
2505
2506                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2507                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2508                  * have the same alignment as the initial writes that allocated
2509                  * extents on the server. */
2510                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2511                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2512                 if (ending_offset == 0)
2513                         break;
2514
2515                 /* If there is a gap at the end of this page, it can't merge
2516                  * with any subsequent pages, so we'll hand the network a
2517                  * "fragmented" page array that it can't transfer in 1 RDMA */
2518                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2519                         break;
2520         }
2521
2522         osc_wake_cache_waiters(cli);
2523
2524         loi_list_maint(cli, loi);
2525
2526         client_obd_list_unlock(&cli->cl_loi_list_lock);
2527
2528         if (clob != NULL)
2529                 cl_object_put(env, clob);
2530
2531         if (page_count == 0) {
2532                 client_obd_list_lock(&cli->cl_loi_list_lock);
2533                 RETURN(0);
2534         }
2535
2536         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2537         if (IS_ERR(req)) {
2538                 LASSERT(list_empty(&rpc_list));
2539                 loi_list_maint(cli, loi);
2540                 RETURN(PTR_ERR(req));
2541         }
2542
2543         aa = ptlrpc_req_async_args(req);
2544
2545         if (cmd == OBD_BRW_READ) {
2546                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2547                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2548                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2549                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2550         } else {
2551                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2552                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2553                                  cli->cl_w_in_flight);
2554                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2555                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2556         }
2557         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2558
2559         client_obd_list_lock(&cli->cl_loi_list_lock);
2560
2561         if (cmd == OBD_BRW_READ)
2562                 cli->cl_r_in_flight++;
2563         else
2564                 cli->cl_w_in_flight++;
2565
2566         /* queued sync pages can be torn down while the pages
2567          * were between the pending list and the rpc */
2568         tmp = NULL;
2569         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2570                 /* only one oap gets a request reference */
2571                 if (tmp == NULL)
2572                         tmp = oap;
2573                 if (oap->oap_interrupted && !req->rq_intr) {
2574                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2575                                oap, req);
2576                         ptlrpc_mark_interrupted(req);
2577                 }
2578         }
2579         if (tmp != NULL)
2580                 tmp->oap_request = ptlrpc_request_addref(req);
2581
2582         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2583                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2584
2585         req->rq_interpret_reply = brw_interpret;
2586         ptlrpcd_add_req(req, PSCOPE_BRW);
2587         RETURN(1);
2588 }
2589
2590 #define LOI_DEBUG(LOI, STR, args...)                                     \
2591         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2592                !list_empty(&(LOI)->loi_ready_item) ||                    \
2593                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2594                (LOI)->loi_write_lop.lop_num_pending,                     \
2595                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2596                (LOI)->loi_read_lop.lop_num_pending,                      \
2597                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2598                args)
2599
2600 /* This is called by osc_check_rpcs() to find which objects have pages that
2601  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2602 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2603 {
2604         ENTRY;
2605
2606         /* First return objects that have blocked locks so that they
2607          * will be flushed quickly and other clients can get the lock,
2608          * then objects which have pages ready to be stuffed into RPCs */
2609         if (!list_empty(&cli->cl_loi_hp_ready_list))
2610                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2611                                   struct lov_oinfo, loi_hp_ready_item));
2612         if (!list_empty(&cli->cl_loi_ready_list))
2613                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2614                                   struct lov_oinfo, loi_ready_item));
2615
2616         /* then if we have cache waiters, return all objects with queued
2617          * writes.  This is especially important when many small files
2618          * have filled up the cache and not been fired into rpcs because
2619          * they don't pass the nr_pending/object threshold */
2620         if (!list_empty(&cli->cl_cache_waiters) &&
2621             !list_empty(&cli->cl_loi_write_list))
2622                 RETURN(list_entry(cli->cl_loi_write_list.next,
2623                                   struct lov_oinfo, loi_write_item));
2624
2625         /* then return all queued objects when we have an invalid import
2626          * so that they get flushed */
2627         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2628                 if (!list_empty(&cli->cl_loi_write_list))
2629                         RETURN(list_entry(cli->cl_loi_write_list.next,
2630                                           struct lov_oinfo, loi_write_item));
2631                 if (!list_empty(&cli->cl_loi_read_list))
2632                         RETURN(list_entry(cli->cl_loi_read_list.next,
2633                                           struct lov_oinfo, loi_read_item));
2634         }
2635         RETURN(NULL);
2636 }
2637
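/* Check whether the client is at its RPC concurrency limit.  One extra
 * slot beyond cl_max_rpcs_in_flight is allowed when the next urgent page on
 * either list is ASYNC_HP, presumably so lock-cancel writeback is never
 * queued behind a full pipe. */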
2638 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2639 {
2640         struct osc_async_page *oap;
2641         int hprpc = 0;
2642
2643         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2644                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2645                                  struct osc_async_page, oap_urgent_item);
2646                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2647         }
2648
2649         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2650                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2651                                  struct osc_async_page, oap_urgent_item);
2652                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2653         }
2654
2655         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2656 }
2657
2658 /* called with the loi list lock held */
2659 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2660 {
2661         struct lov_oinfo *loi;
2662         int rc = 0, race_counter = 0;
2663         ENTRY;
2664
2665         while ((loi = osc_next_loi(cli)) != NULL) {
2666                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2667
2668                 if (osc_max_rpc_in_flight(cli, loi))
2669                         break;
2670
2671                 /* attempt some read/write balancing by alternating between
2672                  * reads and writes in an object.  The makes_rpc checks here
2673                  * would be redundant if we were getting read/write work items
2674                  * instead of objects.  We don't want send_oap_rpc to drain a
2675                  * partial read pending queue when we were given this object
2676                  * to do write io on while there are cache waiters. */
2677                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2678                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2679                                               &loi->loi_write_lop);
2680                         if (rc < 0) {
2681                                 CERROR("Write request failed with %d\n", rc);
2682
2683                                 /* osc_send_oap_rpc failed, mostly because of
2684                                  * memory pressure.
2685                                  *
2686                                  * We can't break out here, because if:
2687                                  *  - a page was submitted by osc_io_submit,
2688                                  *    so the page is locked;
2689                                  *  - no request is in flight; and
2690                                  *  - no subsequent request will be sent,
2691                                  * then the system would be live-locked:
2692                                  * there would be no further chance to call
2693                                  * osc_io_unplug() or osc_check_rpcs().
2694                                  * pdflush can't help in this case either,
2695                                  * because it might itself block grabbing
2696                                  * the page lock mentioned above.
2697                                  *
2698                                  * So, continue to drain pages anyway. */
2699                                 /* break; */
2700                         }
2701
2702                         if (rc > 0)
2703                                 race_counter = 0;
2704                         else
2705                                 race_counter++;
2706                 }
2707                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2708                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2709                                               &loi->loi_read_lop);
2710                         if (rc < 0)
2711                                 CERROR("Read request failed with %d\n", rc);
2712
2713                         if (rc > 0)
2714                                 race_counter = 0;
2715                         else
2716                                 race_counter++;
2717                 }
2718
2719                 /* attempt some inter-object balancing by issuing rpcs
2720                  * for each object in turn */
2721                 if (!list_empty(&loi->loi_hp_ready_item))
2722                         list_del_init(&loi->loi_hp_ready_item);
2723                 if (!list_empty(&loi->loi_ready_item))
2724                         list_del_init(&loi->loi_ready_item);
2725                 if (!list_empty(&loi->loi_write_item))
2726                         list_del_init(&loi->loi_write_item);
2727                 if (!list_empty(&loi->loi_read_item))
2728                         list_del_init(&loi->loi_read_item);
2729
2730                 loi_list_maint(cli, loi);
2731
2732                 /* send_oap_rpc fails with 0 when make_ready tells it to
2733                  * back off.  llite's make_ready does this when it tries
2734                  * to lock a page queued for write that is already locked.
2735                  * we want to try sending rpcs from many objects, but we
2736                  * don't want to spin failing with 0.  */
2737                 if (race_counter == 10)
2738                         break;
2739         }
2740         EXIT;
2741 }
2742
2743 /* we're trying to queue a page in the osc so we're subject to the
2744  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2745  * If the osc's queued pages are already at that limit, then we want to sleep
2746  * until there is space in the osc's queue for us.  We also may be waiting for
2747  * write credits from the OST if there are RPCs in flight that may return some
2748  * before we fall back to sync writes.
2749  *
2750  * We need this to know whether our allocation was granted in the presence of signals */
2751 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2752 {
2753         int rc;
2754         ENTRY;
2755         client_obd_list_lock(&cli->cl_loi_list_lock);
2756         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2757         client_obd_list_unlock(&cli->cl_loi_list_lock);
2758         RETURN(rc);
2759 }
2760
2761 /**
2762  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2763  * is available.
2764  */
2765 int osc_enter_cache_try(const struct lu_env *env,
2766                         struct client_obd *cli, struct lov_oinfo *loi,
2767                         struct osc_async_page *oap, int transient)
2768 {
2769         int has_grant;
2770
2771         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2772         if (has_grant) {
2773                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2774                 if (transient) {
2775                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2776                         atomic_inc(&obd_dirty_transit_pages);
2777                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2778                 }
2779         }
2780         return has_grant;
2781 }
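
/* A minimal caller sketch, assuming cl_loi_list_lock is held (the grant
 * check above, cl_avail_grant >= CFS_PAGE_SIZE, and the counters that
 * osc_consume_write_grant() adjusts are protected by it), as
 * osc_enter_cache() below demonstrates:
 *
 *      client_obd_list_lock(&cli->cl_loi_list_lock);
 *      if (osc_enter_cache_try(env, cli, loi, oap, 0))
 *              queued = 1;   (one page, CFS_PAGE_SIZE, of grant consumed)
 *      client_obd_list_unlock(&cli->cl_loi_list_lock);
 */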
2782
2783 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2784  * grant or cache space. */
2785 static int osc_enter_cache(const struct lu_env *env,
2786                            struct client_obd *cli, struct lov_oinfo *loi,
2787                            struct osc_async_page *oap)
2788 {
2789         struct osc_cache_waiter ocw;
2790         struct l_wait_info lwi = { 0 };
2791
2792         ENTRY;
2793
2794         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2795                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2796                cli->cl_dirty_max, obd_max_dirty_pages,
2797                cli->cl_lost_grant, cli->cl_avail_grant);
2798
2799         /* force the caller to try sync io.  this can jump ahead of the
2800          * list of queued writes and create a discontiguous rpc stream */
2801         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2802             loi->loi_ar.ar_force_sync)
2803                 RETURN(-EDQUOT);
2804
2805         /* Hopefully normal case - cache space and write credits available */
2806         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2807             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2808             osc_enter_cache_try(env, cli, loi, oap, 0))
2809                 RETURN(0);
2810
2811         /* Make sure that there are write rpcs in flight to wait for.  This
2812          * is a little silly as this object may not have any pending rpcs,
2813          * but other objects surely might. */
2814         if (cli->cl_w_in_flight) {
2815                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2816                 cfs_waitq_init(&ocw.ocw_waitq);
2817                 ocw.ocw_oap = oap;
2818                 ocw.ocw_rc = 0;
2819
2820                 loi_list_maint(cli, loi);
2821                 osc_check_rpcs(env, cli);
2822                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2823
2824                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2825                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2826
2827                 client_obd_list_lock(&cli->cl_loi_list_lock);
2828                 if (!list_empty(&ocw.ocw_entry)) {
2829                         list_del(&ocw.ocw_entry);
2830                         RETURN(-EINTR);
2831                 }
2832                 RETURN(ocw.ocw_rc);
2833         }
2834
2835         RETURN(-EDQUOT);
2836 }
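
/* The granting side is osc_wake_cache_waiters() (called e.g. from
 * osc_teardown_async_page() below): roughly, when cache space or grant is
 * freed it removes a waiter from cl_cache_waiters via
 * list_del_init(&ocw->ocw_entry), records the verdict in ocw->ocw_rc and
 * wakes ocw->ocw_waitq -- which is why an ocw_entry still on the list
 * after l_wait_event() above means we were interrupted rather than
 * granted. */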
2837
2838
2839 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2840                         struct lov_oinfo *loi, cfs_page_t *page,
2841                         obd_off offset, const struct obd_async_page_ops *ops,
2842                         void *data, void **res, int nocache,
2843                         struct lustre_handle *lockh)
2844 {
2845         struct osc_async_page *oap;
2846
2847         ENTRY;
2848
2849         if (!page)
2850                 RETURN(size_round(sizeof(*oap)));
2851
2852         oap = *res;
2853         oap->oap_magic = OAP_MAGIC;
2854         oap->oap_cli = &exp->exp_obd->u.cli;
2855         oap->oap_loi = loi;
2856
2857         oap->oap_caller_ops = ops;
2858         oap->oap_caller_data = data;
2859
2860         oap->oap_page = page;
2861         oap->oap_obj_off = offset;
2862         if (!client_is_remote(exp) &&
2863             cfs_capable(CFS_CAP_SYS_RESOURCE))
2864                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2865
2866         LASSERT(!(offset & ~CFS_PAGE_MASK));
2867
2868         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2869         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2870         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2871         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2872
2873         spin_lock_init(&oap->oap_lock);
2874         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2875         RETURN(0);
2876 }
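
/* osc_prep_async_page() is used in two phases: with page == NULL it only
 * reports how much space the osc needs for an oap, and a second call
 * initializes an oap the caller embedded in that space.  A rough sketch
 * (the 'space' buffer here is hypothetical, owned by the caller):
 *
 *      int size = osc_prep_async_page(exp, lsm, loi, NULL, 0, ops,
 *                                     data, NULL, 0, NULL);
 *      ... reserve 'size' bytes, point 'space' at them, then:
 *      rc = osc_prep_async_page(exp, lsm, loi, page, offset, ops,
 *                               data, &space, 0, NULL);
 */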
2877
2878 struct osc_async_page *oap_from_cookie(void *cookie)
2879 {
2880         struct osc_async_page *oap = cookie;
2881         if (oap->oap_magic != OAP_MAGIC)
2882                 return ERR_PTR(-EINVAL);
2883         return oap;
2884 }
2885
2886 int osc_queue_async_io(const struct lu_env *env,
2887                        struct obd_export *exp, struct lov_stripe_md *lsm,
2888                        struct lov_oinfo *loi, void *cookie,
2889                        int cmd, obd_off off, int count,
2890                        obd_flag brw_flags, enum async_flags async_flags)
2891 {
2892         struct client_obd *cli = &exp->exp_obd->u.cli;
2893         struct osc_async_page *oap;
2894         int rc = 0;
2895         ENTRY;
2896
2897         oap = oap_from_cookie(cookie);
2898         if (IS_ERR(oap))
2899                 RETURN(PTR_ERR(oap));
2900
2901         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2902                 RETURN(-EIO);
2903
2904         if (!list_empty(&oap->oap_pending_item) ||
2905             !list_empty(&oap->oap_urgent_item) ||
2906             !list_empty(&oap->oap_rpc_item))
2907                 RETURN(-EBUSY);
2908
2909         /* check if the file's owner/group is over quota */
2910         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2911                 struct cl_object *obj;
2912                 struct cl_attr    attr; /* XXX put attr into thread info */
2913                 unsigned int qid[MAXQUOTAS];
2914
2915                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2916
2917                 cl_object_attr_lock(obj);
2918                 rc = cl_object_attr_get(env, obj, &attr);
2919                 cl_object_attr_unlock(obj);
2920
2921                 qid[USRQUOTA] = attr.cat_uid;
2922                 qid[GRPQUOTA] = attr.cat_gid;
2923                 if (rc == 0 &&
2924                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2925                         rc = -EDQUOT;
2926                 if (rc)
2927                         RETURN(rc);
2928         }
2929
2930         if (loi == NULL)
2931                 loi = lsm->lsm_oinfo[0];
2932
2933         client_obd_list_lock(&cli->cl_loi_list_lock);
2934
2935         LASSERT(off + count <= CFS_PAGE_SIZE);
2936         oap->oap_cmd = cmd;
2937         oap->oap_page_off = off;
2938         oap->oap_count = count;
2939         oap->oap_brw_flags = brw_flags;
2940         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2941         if (libcfs_memory_pressure_get())
2942                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2943         spin_lock(&oap->oap_lock);
2944         oap->oap_async_flags = async_flags;
2945         spin_unlock(&oap->oap_lock);
2946
2947         if (cmd & OBD_BRW_WRITE) {
2948                 rc = osc_enter_cache(env, cli, loi, oap);
2949                 if (rc) {
2950                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2951                         RETURN(rc);
2952                 }
2953         }
2954
2955         osc_oap_to_pending(oap);
2956         loi_list_maint(cli, loi);
2957
2958         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2959                   cmd);
2960
2961         osc_check_rpcs(env, cli);
2962         client_obd_list_unlock(&cli->cl_loi_list_lock);
2963
2964         RETURN(0);
2965 }
2966
2967 /* aka (~was & now & flag), but this is more clear :) */
2968 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
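/* e.g. SETTING(ASYNC_READY, ASYNC_READY | ASYNC_URGENT, ASYNC_URGENT) is
 * true (ASYNC_URGENT is being newly set), while SETTING(was, was, flag)
 * is always false. */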
2969
2970 int osc_set_async_flags_base(struct client_obd *cli,
2971                              struct lov_oinfo *loi, struct osc_async_page *oap,
2972                              obd_flag async_flags)
2973 {
2974         struct loi_oap_pages *lop;
2975         int flags = 0;
2976         ENTRY;
2977
2978         LASSERT(!list_empty(&oap->oap_pending_item));
2979
2980         if (oap->oap_cmd & OBD_BRW_WRITE) {
2981                 lop = &loi->loi_write_lop;
2982         } else {
2983                 lop = &loi->loi_read_lop;
2984         }
2985
2986         if ((oap->oap_async_flags & async_flags) == async_flags)
2987                 RETURN(0);
2988
2989         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2990                 flags |= ASYNC_READY;
2991
2992         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2993             list_empty(&oap->oap_rpc_item)) {
2994                 if (oap->oap_async_flags & ASYNC_HP)
2995                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2996                 else
2997                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2998                 flags |= ASYNC_URGENT;
2999                 loi_list_maint(cli, loi);
3000         }
3001         spin_lock(&oap->oap_lock);
3002         oap->oap_async_flags |= flags;
3003         spin_unlock(&oap->oap_lock);
3004
3005         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3006                         oap->oap_async_flags);
3007         RETURN(0);
3008 }
3009
3010 int osc_teardown_async_page(struct obd_export *exp,
3011                             struct lov_stripe_md *lsm,
3012                             struct lov_oinfo *loi, void *cookie)
3013 {
3014         struct client_obd *cli = &exp->exp_obd->u.cli;
3015         struct loi_oap_pages *lop;
3016         struct osc_async_page *oap;
3017         int rc = 0;
3018         ENTRY;
3019
3020         oap = oap_from_cookie(cookie);
3021         if (IS_ERR(oap))
3022                 RETURN(PTR_ERR(oap));
3023
3024         if (loi == NULL)
3025                 loi = lsm->lsm_oinfo[0];
3026
3027         if (oap->oap_cmd & OBD_BRW_WRITE) {
3028                 lop = &loi->loi_write_lop;
3029         } else {
3030                 lop = &loi->loi_read_lop;
3031         }
3032
3033         client_obd_list_lock(&cli->cl_loi_list_lock);
3034
3035         if (!list_empty(&oap->oap_rpc_item))
3036                 GOTO(out, rc = -EBUSY);
3037
3038         osc_exit_cache(cli, oap, 0);
3039         osc_wake_cache_waiters(cli);
3040
3041         if (!list_empty(&oap->oap_urgent_item)) {
3042                 list_del_init(&oap->oap_urgent_item);
3043                 spin_lock(&oap->oap_lock);
3044                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3045                 spin_unlock(&oap->oap_lock);
3046         }
3047         if (!list_empty(&oap->oap_pending_item)) {
3048                 list_del_init(&oap->oap_pending_item);
3049                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3050         }
3051         loi_list_maint(cli, loi);
3052         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3053 out:
3054         client_obd_list_unlock(&cli->cl_loi_list_lock);
3055         RETURN(rc);
3056 }
3057
3058 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3059                                          struct ldlm_enqueue_info *einfo,
3060                                          int flags)
3061 {
3062         void *data = einfo->ei_cbdata;
3063
3064         LASSERT(lock != NULL);
3065         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3066         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3067         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3068         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3069
3070         lock_res_and_lock(lock);
3071         spin_lock(&osc_ast_guard);
3072         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3073         lock->l_ast_data = data;
3074         spin_unlock(&osc_ast_guard);
3075         unlock_res_and_lock(lock);
3076 }
3077
3078 static void osc_set_data_with_check(struct lustre_handle *lockh,
3079                                     struct ldlm_enqueue_info *einfo,
3080                                     int flags)
3081 {
3082         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3083
3084         if (lock != NULL) {
3085                 osc_set_lock_data_with_check(lock, einfo, flags);
3086                 LDLM_LOCK_PUT(lock);
3087         } else
3088                 CERROR("lockh %p, data %p - client evicted?\n",
3089                        lockh, einfo->ei_cbdata);
3090 }
3091
3092 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3093                              ldlm_iterator_t replace, void *data)
3094 {
3095         struct ldlm_res_id res_id;
3096         struct obd_device *obd = class_exp2obd(exp);
3097
3098         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3099         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3100         return 0;
3101 }
3102
3103 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3104                             obd_enqueue_update_f upcall, void *cookie,
3105                             int *flags, int rc)
3106 {
3107         int intent = *flags & LDLM_FL_HAS_INTENT;
3108         ENTRY;
3109
3110         if (intent) {
3111                 /* The request was created before ldlm_cli_enqueue call. */
3112                 if (rc == ELDLM_LOCK_ABORTED) {
3113                         struct ldlm_reply *rep;
3114                         rep = req_capsule_server_get(&req->rq_pill,
3115                                                      &RMF_DLM_REP);
3116
3117                         LASSERT(rep != NULL);
3118                         if (rep->lock_policy_res1)
3119                                 rc = rep->lock_policy_res1;
3120                 }
3121         }
3122
3123         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3124                 *flags |= LDLM_FL_LVB_READY;
3125                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3126                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3127         }
3128
3129         /* Call the update callback. */
3130         rc = (*upcall)(cookie, rc);
3131         RETURN(rc);
3132 }
3133
3134 static int osc_enqueue_interpret(const struct lu_env *env,
3135                                  struct ptlrpc_request *req,
3136                                  struct osc_enqueue_args *aa, int rc)
3137 {
3138         struct ldlm_lock *lock;
3139         struct lustre_handle handle;
3140         __u32 mode;
3141
3142         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3143          * might be freed anytime after lock upcall has been called. */
3144         lustre_handle_copy(&handle, aa->oa_lockh);
3145         mode = aa->oa_ei->ei_mode;
3146
3147         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3148          * be valid. */
3149         lock = ldlm_handle2lock(&handle);
3150
3151         /* Take an additional reference so that a blocking AST that
3152          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3153          * to arrive after an upcall has been executed by
3154          * osc_enqueue_fini(). */
3155         ldlm_lock_addref(&handle, mode);
3156
3157         /* Complete obtaining the lock procedure. */
3158         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3159                                    mode, aa->oa_flags, aa->oa_lvb,
3160                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3161                                    &handle, rc);
3162         /* Complete osc stuff. */
3163         rc = osc_enqueue_fini(req, aa->oa_lvb,
3164                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3165
3166         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3167
3168         /* Release the lock for async request. */
3169         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3170                 /*
3171                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3172                  * not already released by
3173                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3174                  */
3175                 ldlm_lock_decref(&handle, mode);
3176
3177         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3178                  aa->oa_lockh, req, aa);
3179         ldlm_lock_decref(&handle, mode);
3180         LDLM_LOCK_PUT(lock);
3181         return rc;
3182 }
3183
3184 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3185                         struct lov_oinfo *loi, int flags,
3186                         struct ost_lvb *lvb, __u32 mode, int rc)
3187 {
3188         if (rc == ELDLM_OK) {
3189                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3190                 __u64 tmp;
3191
3192                 LASSERT(lock != NULL);
3193                 loi->loi_lvb = *lvb;
3194                 tmp = loi->loi_lvb.lvb_size;
3195                 /* Extend KMS up to the end of this lock and no further
3196                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3197                 if (tmp > lock->l_policy_data.l_extent.end)
3198                         tmp = lock->l_policy_data.l_extent.end + 1;
3199                 if (tmp >= loi->loi_kms) {
3200                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3201                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3202                         loi_kms_set(loi, tmp);
3203                 } else {
3204                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3205                                    LPU64"; leaving kms="LPU64", end="LPU64,
3206                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3207                                    lock->l_policy_data.l_extent.end);
3208                 }
3209                 ldlm_lock_allow_match(lock);
3210                 LDLM_LOCK_PUT(lock);
3211         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3212                 loi->loi_lvb = *lvb;
3213                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3214                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3215                 rc = ELDLM_OK;
3216         }
3217 }
3218 EXPORT_SYMBOL(osc_update_enqueue);
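
/* Worked example of the kms update above: if the LVB reports
 * lvb_size == 10000 but the granted lock only covers [0, 4095], tmp is
 * clamped to 4096 (extent end + 1), so kms never claims knowledge past
 * the locked range; a lock reaching OBD_OBJECT_EOF would leave
 * tmp == 10000 and extend kms to the full known size. */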
3219
3220 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
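/* Magic rqset value: callers pass PTLRPCD_SET to osc_enqueue_base() to
 * have the request handed to the ptlrpcd daemon (see the
 * rqset == PTLRPCD_SET branch below) instead of a caller-owned set. */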
3221
3222 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3223  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3224  * other synchronous requests; however, holding some locks while trying to
3225  * obtain others may take a considerable amount of time in the case of OST
3226  * failure, and a client that does not release a lock other sync requests are
3227  * waiting on gets evicted from the cluster -- such scenarios make life
3228  * difficult, so release locks just after they are obtained. */
3229 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3230                      int *flags, ldlm_policy_data_t *policy,
3231                      struct ost_lvb *lvb, int kms_valid,
3232                      obd_enqueue_update_f upcall, void *cookie,
3233                      struct ldlm_enqueue_info *einfo,
3234                      struct lustre_handle *lockh,
3235                      struct ptlrpc_request_set *rqset, int async)
3236 {
3237         struct obd_device *obd = exp->exp_obd;
3238         struct ptlrpc_request *req = NULL;
3239         int intent = *flags & LDLM_FL_HAS_INTENT;
3240         ldlm_mode_t mode;
3241         int rc;
3242         ENTRY;
3243
3244         /* Filesystem lock extents are extended to page boundaries so that
3245          * dealing with the page cache is a little smoother.  */
3246         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3247         policy->l_extent.end |= ~CFS_PAGE_MASK;
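        /* e.g. with 4096-byte pages a request for bytes [5000, 6000]
         * becomes [4096, 8191]: start is rounded down to its page
         * boundary, end up to the last byte of its page. */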
3248
3249         /*
3250          * kms is not valid when either object is completely fresh (so that no
3251          * locks are cached), or object was evicted. In the latter case cached
3252          * lock cannot be used, because it would prime inode state with
3253          * potentially stale LVB.
3254          */
3255         if (!kms_valid)
3256                 goto no_match;
3257
3258         /* Next, search for already existing extent locks that will cover us */
3259         /* If we're trying to read, we also search for an existing PW lock.  The
3260          * VFS and page cache already protect us locally, so lots of readers/
3261          * writers can share a single PW lock.
3262          *
3263          * There are problems with conversion deadlocks, so instead of
3264          * converting a read lock to a write lock, we'll just enqueue a new
3265          * one.
3266          *
3267          * At some point we should cancel the read lock instead of making them
3268          * send us a blocking callback, but there are problems with canceling
3269          * locks out from other users right now, too. */
3270         mode = einfo->ei_mode;
3271         if (einfo->ei_mode == LCK_PR)
3272                 mode |= LCK_PW;
3273         mode = ldlm_lock_match(obd->obd_namespace,
3274                                *flags | LDLM_FL_LVB_READY, res_id,
3275                                einfo->ei_type, policy, mode, lockh, 0);
3276         if (mode) {
3277                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3278
3279                 if (matched->l_ast_data == NULL ||
3280                     matched->l_ast_data == einfo->ei_cbdata) {
3281                         /* addref the lock only for non-async requests when
3282                          * a PW lock is matched but we asked for PR. */
3283                         if (!rqset && einfo->ei_mode != mode)
3284                                 ldlm_lock_addref(lockh, LCK_PR);
3285                         osc_set_lock_data_with_check(matched, einfo, *flags);
3286                         if (intent) {
3287                                 /* I would like to be able to ASSERT here that
3288                                  * rss <= kms, but I can't, for reasons which
3289                                  * are explained in lov_enqueue() */
3290                         }
3291
3292                         /* We already have a lock, and it's referenced */
3293                         (*upcall)(cookie, ELDLM_OK);
3294
3295                         /* For async requests, decref the lock. */
3296                         if (einfo->ei_mode != mode)
3297                                 ldlm_lock_decref(lockh, LCK_PW);
3298                         else if (rqset)
3299                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3300                         LDLM_LOCK_PUT(matched);
3301                         RETURN(ELDLM_OK);
3302                 } else
3303                         ldlm_lock_decref(lockh, mode);
3304                 LDLM_LOCK_PUT(matched);
3305         }
3306
3307  no_match:
3308         if (intent) {
3309                 CFS_LIST_HEAD(cancels);
3310                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3311                                            &RQF_LDLM_ENQUEUE_LVB);
3312                 if (req == NULL)
3313                         RETURN(-ENOMEM);
3314
3315                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3316                 if (rc)
3317                         RETURN(rc);
3318
3319                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3320                                      sizeof(*lvb));
3321                 ptlrpc_request_set_replen(req);
3322         }
3323
3324         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3325         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3326
3327         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3328                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3329         if (rqset) {
3330                 if (!rc) {
3331                         struct osc_enqueue_args *aa;
3332                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3333                         aa = ptlrpc_req_async_args(req);
3334                         aa->oa_ei = einfo;
3335                         aa->oa_exp = exp;
3336                         aa->oa_flags  = flags;
3337                         aa->oa_upcall = upcall;
3338                         aa->oa_cookie = cookie;
3339                         aa->oa_lvb    = lvb;
3340                         aa->oa_lockh  = lockh;
3341
3342                         req->rq_interpret_reply =
3343                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3344                         if (rqset == PTLRPCD_SET)
3345                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3346                         else
3347                                 ptlrpc_set_add_req(rqset, req);
3348                 } else if (intent) {
3349                         ptlrpc_req_finished(req);
3350                 }
3351                 RETURN(rc);
3352         }
3353
3354         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3355         if (intent)
3356                 ptlrpc_req_finished(req);
3357
3358         RETURN(rc);
3359 }
3360
3361 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3362                        struct ldlm_enqueue_info *einfo,
3363                        struct ptlrpc_request_set *rqset)
3364 {
3365         struct ldlm_res_id res_id;
3366         int rc;
3367         ENTRY;
3368
3369         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3370                            oinfo->oi_md->lsm_object_gr, &res_id);
3371
3372         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3373                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3374                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3375                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3376                               rqset, rqset != NULL);
3377         RETURN(rc);
3378 }
3379
3380 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3381                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3382                    int *flags, void *data, struct lustre_handle *lockh,
3383                    int unref)
3384 {
3385         struct obd_device *obd = exp->exp_obd;
3386         int lflags = *flags;
3387         ldlm_mode_t rc;
3388         ENTRY;
3389
3390         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3391                 RETURN(-EIO);
3392
3393         /* Filesystem lock extents are extended to page boundaries so that
3394          * dealing with the page cache is a little smoother */
3395         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3396         policy->l_extent.end |= ~CFS_PAGE_MASK;
3397
3398         /* Next, search for already existing extent locks that will cover us */
3399         /* If we're trying to read, we also search for an existing PW lock.  The
3400          * VFS and page cache already protect us locally, so lots of readers/
3401          * writers can share a single PW lock. */
3402         rc = mode;
3403         if (mode == LCK_PR)
3404                 rc |= LCK_PW;
3405         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3406                              res_id, type, policy, rc, lockh, unref);
3407         if (rc) {
3408                 if (data != NULL)
3409                         osc_set_data_with_check(lockh, data, lflags);
3410                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3411                         ldlm_lock_addref(lockh, LCK_PR);
3412                         ldlm_lock_decref(lockh, LCK_PW);
3413                 }
3414                 RETURN(rc);
3415         }
3416         RETURN(rc);
3417 }
3418
3419 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3420 {
3421         ENTRY;
3422
3423         if (unlikely(mode == LCK_GROUP))
3424                 ldlm_lock_decref_and_cancel(lockh, mode);
3425         else
3426                 ldlm_lock_decref(lockh, mode);
3427
3428         RETURN(0);
3429 }
3430
3431 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3432                       __u32 mode, struct lustre_handle *lockh)
3433 {
3434         ENTRY;
3435         RETURN(osc_cancel_base(lockh, mode));
3436 }
3437
3438 static int osc_cancel_unused(struct obd_export *exp,
3439                              struct lov_stripe_md *lsm, int flags,
3440                              void *opaque)
3441 {
3442         struct obd_device *obd = class_exp2obd(exp);
3443         struct ldlm_res_id res_id, *resp = NULL;
3444
3445         if (lsm != NULL) {
3446                 resp = osc_build_res_name(lsm->lsm_object_id,
3447                                           lsm->lsm_object_gr, &res_id);
3448         }
3449
3450         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3451 }
3452
3453 static int osc_statfs_interpret(const struct lu_env *env,
3454                                 struct ptlrpc_request *req,
3455                                 struct osc_async_args *aa, int rc)
3456 {
3457         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3458         struct obd_statfs *msfs;
3459         __u64 used;
3460         ENTRY;
3461
3462         if (rc == -EBADR)
3463                 /* The request has in fact never been sent
3464                  * due to issues at a higher level (LOV).
3465                  * Exit immediately since the caller is
3466                  * aware of the problem and takes care
3467                  * of the cleanup. */
3468                 RETURN(rc);
3469
3470         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3471             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3472                 GOTO(out, rc = 0);
3473
3474         if (rc != 0)
3475                 GOTO(out, rc);
3476
3477         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3478         if (msfs == NULL) {
3479                 GOTO(out, rc = -EPROTO);
3480         }
3481
3482         /* Reinitialize the RDONLY and DEGRADED flags at the client
3483          * on each statfs, so they don't stay set permanently. */
3484         spin_lock(&cli->cl_oscc.oscc_lock);
3485
3486         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3487                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3488         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3489                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3490
3491         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3492                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3493         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3494                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3495
3496         /* Add a bit of hysteresis so this flag isn't continually flapping,
3497          * and ensure that new files don't get extremely fragmented due to
3498          * only a small amount of available space in the filesystem.
3499          * We want to set the NOSPC flag when there is less than ~0.1% free
3500          * and clear it when there is at least ~0.2% free space, so:
3501          *                   avail < ~0.1% max          max = avail + used
3502          *            1025 * avail < avail + used       used = blocks - free
3503          *            1024 * avail < used
3504          *            1024 * avail < blocks - free
3505          *                   avail < ((blocks - free) >> 10)
3506          *
3507          * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want
3508          * to lose that much space, so in those cases we report no space left
3509          * if there is less than 1 GB left. */
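        /* For instance: os_blocks == 1000000 and os_bfree == 2000 give
         * used == min(998000 >> 10, 1ULL << 30) == 974, so NOSPC is set
         * while os_bavail < 974 (or os_ffree < 32) and only cleared once
         * os_bavail > 1948 and os_ffree > 64. */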
3510         used = min((msfs->os_blocks - msfs->os_bfree) >> 10, 1ULL << 30);
3511         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3512                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3513                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3514         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3515                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3516                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3517
3518         spin_unlock(&cli->cl_oscc.oscc_lock);
3519
3520         *aa->aa_oi->oi_osfs = *msfs;
3521 out:
3522         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3523         RETURN(rc);
3524 }
3525
3526 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3527                             __u64 max_age, struct ptlrpc_request_set *rqset)
3528 {
3529         struct ptlrpc_request *req;
3530         struct osc_async_args *aa;
3531         int                    rc;
3532         ENTRY;
3533
3534         /* We could possibly pass max_age in the request (as an absolute
3535          * timestamp or a "seconds.usec ago") so the target can avoid doing
3536          * extra calls into the filesystem if that isn't necessary (e.g.
3537          * during mount that would help a bit).  Having relative timestamps
3538          * is not so great if request processing is slow, while absolute
3539          * timestamps are not ideal because they need time synchronization. */
3540         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3541         if (req == NULL)
3542                 RETURN(-ENOMEM);
3543
3544         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3545         if (rc) {
3546                 ptlrpc_request_free(req);
3547                 RETURN(rc);
3548         }
3549         ptlrpc_request_set_replen(req);
3550         req->rq_request_portal = OST_CREATE_PORTAL;
3551         ptlrpc_at_set_req_timeout(req);
3552
3553         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3554                 /* procfs requests must not wait for recovery, to avoid deadlock */
3555                 req->rq_no_resend = 1;
3556                 req->rq_no_delay = 1;
3557         }
3558
3559         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3560         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3561         aa = ptlrpc_req_async_args(req);
3562         aa->aa_oi = oinfo;
3563
3564         ptlrpc_set_add_req(rqset, req);
3565         RETURN(0);
3566 }
3567
3568 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3569                       __u64 max_age, __u32 flags)
3570 {
3571         struct obd_statfs     *msfs;
3572         struct ptlrpc_request *req;
3573         struct obd_import     *imp = NULL;
3574         int rc;
3575         ENTRY;
3576
3577         /* Since the request might also come from lprocfs, we need to
3578          * sync this with client_disconnect_export() (bug 15684). */
3579         down_read(&obd->u.cli.cl_sem);
3580         if (obd->u.cli.cl_import)
3581                 imp = class_import_get(obd->u.cli.cl_import);
3582         up_read(&obd->u.cli.cl_sem);
3583         if (!imp)
3584                 RETURN(-ENODEV);
3585
3586         /* We could possibly pass max_age in the request (as an absolute
3587          * timestamp or a "seconds.usec ago") so the target can avoid doing
3588          * extra calls into the filesystem if that isn't necessary (e.g.
3589          * during mount that would help a bit).  Having relative timestamps
3590          * is not so great if request processing is slow, while absolute
3591          * timestamps are not ideal because they need time synchronization. */
3592         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3593
3594         class_import_put(imp);
3595
3596         if (req == NULL)
3597                 RETURN(-ENOMEM);
3598
3599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3600         if (rc) {
3601                 ptlrpc_request_free(req);
3602                 RETURN(rc);
3603         }
3604         ptlrpc_request_set_replen(req);
3605         req->rq_request_portal = OST_CREATE_PORTAL;
3606         ptlrpc_at_set_req_timeout(req);
3607
3608         if (flags & OBD_STATFS_NODELAY) {
3609                 /* procfs requests must not wait for recovery, to avoid deadlock */
3610                 req->rq_no_resend = 1;
3611                 req->rq_no_delay = 1;
3612         }
3613
3614         rc = ptlrpc_queue_wait(req);
3615         if (rc)
3616                 GOTO(out, rc);
3617
3618         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3619         if (msfs == NULL) {
3620                 GOTO(out, rc = -EPROTO);
3621         }
3622
3623         *osfs = *msfs;
3624
3625         EXIT;
3626  out:
3627         ptlrpc_req_finished(req);
3628         return rc;
3629 }
3630
3631 /* Retrieve object striping information.
3632  *
3633  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3634  * the maximum number of OST indices which will fit in the user buffer.
3635  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use one slot here).
3636  */
3637 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3638 {
3639         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3640         struct lov_user_md_v3 lum, *lumk;
3641         struct lov_user_ost_data_v1 *lmm_objects;
3642         int rc = 0, lum_size;
3643         ENTRY;
3644
3645         if (!lsm)
3646                 RETURN(-ENODATA);
3647
3648         /* we only need the header part from user space to get lmm_magic and
3649          * lmm_stripe_count, (the header part is common to v1 and v3) */
3650         lum_size = sizeof(struct lov_user_md_v1);
3651         if (copy_from_user(&lum, lump, lum_size))
3652                 RETURN(-EFAULT);
3653
3654         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3655             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3656                 RETURN(-EINVAL);
3657
3658         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3659         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3660         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3661         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3662
3663         /* we can use lov_mds_md_size() to compute lum_size
3664          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3665         if (lum.lmm_stripe_count > 0) {
3666                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3667                 OBD_ALLOC(lumk, lum_size);
3668                 if (!lumk)
3669                         RETURN(-ENOMEM);
3670
3671                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3672                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3673                 else
3674                         lmm_objects = &(lumk->lmm_objects[0]);
3675                 lmm_objects->l_object_id = lsm->lsm_object_id;
3676         } else {
3677                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3678                 lumk = &lum;
3679         }
3680
3681         lumk->lmm_object_id = lsm->lsm_object_id;
3682         lumk->lmm_object_gr = lsm->lsm_object_gr;
3683         lumk->lmm_stripe_count = 1;
3684
3685         if (copy_to_user(lump, lumk, lum_size))
3686                 rc = -EFAULT;
3687
3688         if (lumk != &lum)
3689                 OBD_FREE(lumk, lum_size);
3690
3691         RETURN(rc);
3692 }
3693
3694
3695 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3696                          void *karg, void *uarg)
3697 {
3698         struct obd_device *obd = exp->exp_obd;
3699         struct obd_ioctl_data *data = karg;
3700         int err = 0;
3701         ENTRY;
3702
3703         if (!try_module_get(THIS_MODULE)) {
3704                 CERROR("Can't get module. Is it alive?\n");
3705                 return -EINVAL;
3706         }
3707         switch (cmd) {
3708         case OBD_IOC_LOV_GET_CONFIG: {
3709                 char *buf;
3710                 struct lov_desc *desc;
3711                 struct obd_uuid uuid;
3712
3713                 buf = NULL;
3714                 len = 0;
3715                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3716                         GOTO(out, err = -EINVAL);
3717
3718                 data = (struct obd_ioctl_data *)buf;
3719
3720                 if (sizeof(*desc) > data->ioc_inllen1) {
3721                         obd_ioctl_freedata(buf, len);
3722                         GOTO(out, err = -EINVAL);
3723                 }
3724
3725                 if (data->ioc_inllen2 < sizeof(uuid)) {
3726                         obd_ioctl_freedata(buf, len);
3727                         GOTO(out, err = -EINVAL);
3728                 }
3729
3730                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3731                 desc->ld_tgt_count = 1;
3732                 desc->ld_active_tgt_count = 1;
3733                 desc->ld_default_stripe_count = 1;
3734                 desc->ld_default_stripe_size = 0;
3735                 desc->ld_default_stripe_offset = 0;
3736                 desc->ld_pattern = 0;
3737                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3738
3739                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3740
3741                 err = copy_to_user((void *)uarg, buf, len);
3742                 if (err)
3743                         err = -EFAULT;
3744                 obd_ioctl_freedata(buf, len);
3745                 GOTO(out, err);
3746         }
3747         case LL_IOC_LOV_SETSTRIPE:
3748                 err = obd_alloc_memmd(exp, karg);
3749                 if (err > 0)
3750                         err = 0;
3751                 GOTO(out, err);
3752         case LL_IOC_LOV_GETSTRIPE:
3753                 err = osc_getstripe(karg, uarg);
3754                 GOTO(out, err);
3755         case OBD_IOC_CLIENT_RECOVER:
3756                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3757                                             data->ioc_inlbuf1);
3758                 if (err > 0)
3759                         err = 0;
3760                 GOTO(out, err);
3761         case IOC_OSC_SET_ACTIVE:
3762                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3763                                                data->ioc_offset);
3764                 GOTO(out, err);
3765         case OBD_IOC_POLL_QUOTACHECK:
3766                 err = lquota_poll_check(quota_interface, exp,
3767                                         (struct if_quotacheck *)karg);
3768                 GOTO(out, err);
3769         case OBD_IOC_PING_TARGET:
3770                 err = ptlrpc_obd_ping(obd);
3771                 GOTO(out, err);
3772         default:
3773                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3774                        cmd, cfs_curproc_comm());
3775                 GOTO(out, err = -ENOTTY);
3776         }
3777 out:
3778         module_put(THIS_MODULE);
3779         return err;
3780 }
3781
3782 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3783                         void *key, __u32 *vallen, void *val,
3784                         struct lov_stripe_md *lsm)
3785 {
3786         ENTRY;
3787         if (!vallen || !val)
3788                 RETURN(-EFAULT);
3789
3790         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3791                 __u32 *stripe = val;
3792                 *vallen = sizeof(*stripe);
3793                 *stripe = 0;
3794                 RETURN(0);
3795         } else if (KEY_IS(KEY_LAST_ID)) {
3796                 struct ptlrpc_request *req;
3797                 obd_id                *reply;
3798                 char                  *tmp;
3799                 int                    rc;
3800
3801                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3802                                            &RQF_OST_GET_INFO_LAST_ID);
3803                 if (req == NULL)
3804                         RETURN(-ENOMEM);
3805
3806                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3807                                      RCL_CLIENT, keylen);
3808                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3809                 if (rc) {
3810                         ptlrpc_request_free(req);
3811                         RETURN(rc);
3812                 }
3813
3814                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3815                 memcpy(tmp, key, keylen);
3816
3817                 req->rq_no_delay = req->rq_no_resend = 1;
3818                 ptlrpc_request_set_replen(req);
3819                 rc = ptlrpc_queue_wait(req);
3820                 if (rc)
3821                         GOTO(out, rc);
3822
3823                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3824                 if (reply == NULL)
3825                         GOTO(out, rc = -EPROTO);
3826
3827                 *((obd_id *)val) = *reply;
3828         out:
3829                 ptlrpc_req_finished(req);
3830                 RETURN(rc);
3831         } else if (KEY_IS(KEY_FIEMAP)) {
3832                 struct ptlrpc_request *req;
3833                 struct ll_user_fiemap *reply;
3834                 char *tmp;
3835                 int rc;
3836
3837                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3838                                            &RQF_OST_GET_INFO_FIEMAP);
3839                 if (req == NULL)
3840                         RETURN(-ENOMEM);
3841
3842                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3843                                      RCL_CLIENT, keylen);
3844                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3845                                      RCL_CLIENT, *vallen);
3846                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3847                                      RCL_SERVER, *vallen);
3848
3849                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3850                 if (rc) {
3851                         ptlrpc_request_free(req);
3852                         RETURN(rc);
3853                 }
3854
3855                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3856                 memcpy(tmp, key, keylen);
3857                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3858                 memcpy(tmp, val, *vallen);
3859
3860                 ptlrpc_request_set_replen(req);
3861                 rc = ptlrpc_queue_wait(req);
3862                 if (rc)
3863                         GOTO(out1, rc);
3864
3865                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3866                 if (reply == NULL)
3867                         GOTO(out1, rc = -EPROTO);
3868
3869                 memcpy(val, reply, *vallen);
3870         out1:
3871                 ptlrpc_req_finished(req);
3872
3873                 RETURN(rc);
3874         }
3875
3876         RETURN(-EINVAL);
3877 }
3878
3879 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3880 {
3881         struct llog_ctxt *ctxt;
3882         int rc = 0;
3883         ENTRY;
3884
3885         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3886         if (ctxt) {
3887                 rc = llog_initiator_connect(ctxt);
3888                 llog_ctxt_put(ctxt);
3889         } else {
3890                 /* XXX return an error? skip setting below flags? */
3891         }
3892
3893         spin_lock(&imp->imp_lock);
3894         imp->imp_server_timeout = 1;
3895         imp->imp_pingable = 1;
3896         spin_unlock(&imp->imp_lock);
3897         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3898
3899         RETURN(rc);
3900 }
3901
3902 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3903                                           struct ptlrpc_request *req,
3904                                           void *aa, int rc)
3905 {
3906         ENTRY;
3907         if (rc != 0)
3908                 RETURN(rc);
3909
3910         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3911 }
3912
3913 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3914                               void *key, obd_count vallen, void *val,
3915                               struct ptlrpc_request_set *set)
3916 {
3917         struct ptlrpc_request *req;
3918         struct obd_device     *obd = exp->exp_obd;
3919         struct obd_import     *imp = class_exp2cliimp(exp);
3920         char                  *tmp;
3921         int                    rc;
3922         ENTRY;
3923
3924         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3925
3926         if (KEY_IS(KEY_NEXT_ID)) {
3927                 obd_id new_val;
3928                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3929
3930                 if (vallen != sizeof(obd_id))
3931                         RETURN(-ERANGE);
3932                 if (val == NULL)
3933                         RETURN(-EINVAL);
3934
3938                 /* avoid a race between allocating a new object and setting
3939                  * the next id from the ll_sync thread */
3940                 spin_lock(&oscc->oscc_lock);
3941                 new_val = *((obd_id*)val) + 1;
3942                 if (new_val > oscc->oscc_next_id)
3943                         oscc->oscc_next_id = new_val;
3944                 spin_unlock(&oscc->oscc_lock);
3945                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3946                        exp->exp_obd->obd_name,
3947                        obd->u.cli.cl_oscc.oscc_next_id);
3948
3949                 RETURN(0);
3950         }
3951
3952         if (KEY_IS(KEY_INIT_RECOV)) {
3953                 if (vallen != sizeof(int))
3954                         RETURN(-EINVAL);
3955                 spin_lock(&imp->imp_lock);
3956                 imp->imp_initial_recov = *(int *)val;
3957                 spin_unlock(&imp->imp_lock);
3958                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3959                        exp->exp_obd->obd_name,
3960                        imp->imp_initial_recov);
3961                 RETURN(0);
3962         }
3963
3964         if (KEY_IS(KEY_CHECKSUM)) {
3965                 if (vallen != sizeof(int))
3966                         RETURN(-EINVAL);
3967                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3968                 RETURN(0);
3969         }
3970
3971         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3972                 sptlrpc_conf_client_adapt(obd);
3973                 RETURN(0);
3974         }
3975
3976         if (KEY_IS(KEY_FLUSH_CTX)) {
3977                 sptlrpc_import_flush_my_ctx(imp);
3978                 RETURN(0);
3979         }
3980
3981         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3982                 RETURN(-EINVAL);
3983
        /* We pass all other commands directly to the OST. Since nobody calls
           osc methods directly and everybody is supposed to go through LOV,
           we assume LOV has checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad gets through, we'd get -EINVAL from the OST
           anyway. */
3990
3991         if (KEY_IS(KEY_GRANT_SHRINK))
3992                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3993         else
3994                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3995
3996         if (req == NULL)
3997                 RETURN(-ENOMEM);
3998
3999         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4000                              RCL_CLIENT, keylen);
4001         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4002                              RCL_CLIENT, vallen);
4003         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4004         if (rc) {
4005                 ptlrpc_request_free(req);
4006                 RETURN(rc);
4007         }
4008
4009         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4010         memcpy(tmp, key, keylen);
4011         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4012         memcpy(tmp, val, vallen);
4013
4014         if (KEY_IS(KEY_MDS_CONN)) {
4015                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4016
4017                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4018                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4019                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4020                 req->rq_no_delay = req->rq_no_resend = 1;
4021                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4022         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4023                 struct osc_grant_args *aa;
4024                 struct obdo *oa;
4025
4026                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4027                 aa = ptlrpc_req_async_args(req);
4028                 OBD_ALLOC_PTR(oa);
4029                 if (!oa) {
4030                         ptlrpc_req_finished(req);
4031                         RETURN(-ENOMEM);
4032                 }
4033                 *oa = ((struct ost_body *)val)->oa;
4034                 aa->aa_oa = oa;
4035                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4036         }
4037
4038         ptlrpc_request_set_replen(req);
4039         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4040                 LASSERT(set != NULL);
4041                 ptlrpc_set_add_req(set, req);
4042                 ptlrpc_check_set(NULL, set);
4043         } else
4044                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4045
4046         RETURN(0);
4047 }
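
/*
 * Hypothetical caller sketch for osc_set_info_async() above (exp, on and
 * rc are illustrative locals).  Locally handled keys such as KEY_CHECKSUM
 * return without touching the set; keys forwarded to the OST require one,
 * and a caller needing synchronous behaviour waits on it:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      int on = 1, rc = -ENOMEM;
 *
 *      if (set != NULL) {
 *              rc = obd_set_info_async(exp, sizeof(KEY_CHECKSUM),
 *                                      KEY_CHECKSUM, sizeof(on), &on, set);
 *              if (rc == 0)
 *                      rc = ptlrpc_set_wait(set);
 *              ptlrpc_set_destroy(set);
 *      }
 */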
4048
4050 static struct llog_operations osc_size_repl_logops = {
4051         lop_cancel: llog_obd_repl_cancel
4052 };
4053
4054 static struct llog_operations osc_mds_ost_orig_logops;
4055
4056 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4057                            struct obd_device *tgt, struct llog_catid *catid)
4058 {
4059         int rc;
4060         ENTRY;
4061
4062         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4063                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4064         if (rc) {
                CERROR("failed to setup LLOG_MDS_OST_ORIG_CTXT\n");
4066                 GOTO(out, rc);
4067         }
4068
4069         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4070                         NULL, &osc_size_repl_logops);
4071         if (rc) {
4072                 struct llog_ctxt *ctxt =
4073                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4074                 if (ctxt)
4075                         llog_cleanup(ctxt);
                CERROR("failed to setup LLOG_SIZE_REPL_CTXT\n");
4077         }
4078         GOTO(out, rc);
4079 out:
4080         if (rc) {
4081                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4082                        obd->obd_name, tgt->obd_name, catid, rc);
4083                 CERROR("logid "LPX64":0x%x\n",
4084                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4085         }
4086         return rc;
4087 }
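
/*
 * A note on the error path above: llog_get_context() takes a reference on
 * the context and llog_cleanup() consumes it, so when the second
 * llog_setup() fails the LLOG_MDS_OST_ORIG_CTXT context set up first must
 * be torn down here, or the obd would leak the context at cleanup time.
 */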
4088
4089 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4090                          struct obd_device *disk_obd, int *index)
4091 {
4092         struct llog_catid catid;
4093         static char name[32] = CATLIST;
4094         int rc;
4095         ENTRY;
4096
4097         LASSERT(olg == &obd->obd_olg);
4098
4099         mutex_down(&olg->olg_cat_processing);
4100         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4101         if (rc) {
                CERROR("llog_get_cat_list failed: rc = %d\n", rc);
4103                 GOTO(out, rc);
4104         }
4105
4106         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4107                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4108                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4109
4110         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4111         if (rc) {
                CERROR("__osc_llog_init failed: rc = %d\n", rc);
4113                 GOTO(out, rc);
4114         }
4115
4116         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4117         if (rc) {
                CERROR("llog_put_cat_list failed: rc = %d\n", rc);
4119                 GOTO(out, rc);
4120         }
4121
4122  out:
4123         mutex_up(&olg->olg_cat_processing);
4124
4125         return rc;
4126 }
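
/*
 * The olg_cat_processing semaphore taken above serializes access to the
 * catalog list, so two threads initializing llogs for different indices
 * cannot interleave llog_get_cat_list() and llog_put_cat_list() updates
 * to the same CATLIST file.
 */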
4127
4128 static int osc_llog_finish(struct obd_device *obd, int count)
4129 {
4130         struct llog_ctxt *ctxt;
4131         int rc = 0, rc2 = 0;
4132         ENTRY;
4133
4134         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4135         if (ctxt)
4136                 rc = llog_cleanup(ctxt);
4137
4138         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4139         if (ctxt)
4140                 rc2 = llog_cleanup(ctxt);
4141         if (!rc)
4142                 rc = rc2;
4143
4144         RETURN(rc);
4145 }
4146
4147 static int osc_reconnect(const struct lu_env *env,
4148                          struct obd_export *exp, struct obd_device *obd,
4149                          struct obd_uuid *cluuid,
4150                          struct obd_connect_data *data,
4151                          void *localdata)
4152 {
4153         struct client_obd *cli = &obd->u.cli;
4154
4155         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4156                 long lost_grant;
4157
4158                 client_obd_list_lock(&cli->cl_loi_list_lock);
4159                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4160                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4161                 lost_grant = cli->cl_lost_grant;
4162                 cli->cl_lost_grant = 0;
4163                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4164
4165                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4166                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4167                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4168                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4169                        " ocd_grant: %d\n", data->ocd_connect_flags,
4170                        data->ocd_version, data->ocd_grant);
4171         }
4172
4173         RETURN(0);
4174 }
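
/*
 * Worked example for the grant request above, with hypothetical numbers:
 * if no grant survived the disconnect (cl_avail_grant + cl_dirty == 0),
 * the ?: fallback asks for two RPCs worth of grant.  With 256 pages per
 * RPC and 4K pages that is (2 * 256) << 12 == 2MB, enough to restart
 * cached writes immediately after reconnect.
 */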
4175
4176 static int osc_disconnect(struct obd_export *exp)
4177 {
4178         struct obd_device *obd = class_exp2obd(exp);
4179         struct llog_ctxt  *ctxt;
4180         int rc;
4181
4182         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4183         if (ctxt) {
4184                 if (obd->u.cli.cl_conn_count == 1) {
4185                         /* Flush any remaining cancel messages out to the
4186                          * target */
4187                         llog_sync(ctxt, exp);
4188                 }
4189                 llog_ctxt_put(ctxt);
4190         } else {
4191                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4192                        obd);
4193         }
4194
4195         rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interpret
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger then triggers a shrink on a freed client.
         * So the osc is removed from the shrink list only after we are
         * sure the import has been destroyed. BUG18662
         */
4213         if (obd->u.cli.cl_import == NULL)
4214                 osc_del_shrink_grant(&obd->u.cli);
4215         return rc;
4216 }
4217
4218 static int osc_import_event(struct obd_device *obd,
4219                             struct obd_import *imp,
4220                             enum obd_import_event event)
4221 {
4222         struct client_obd *cli;
4223         int rc = 0;
4224
4225         ENTRY;
4226         LASSERT(imp->imp_obd == obd);
4227
4228         switch (event) {
4229         case IMP_EVENT_DISCON: {
                /* Only do this on the OSCs used by the MDS */
4231                 if (imp->imp_server_timeout) {
4232                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4233
4234                         spin_lock(&oscc->oscc_lock);
4235                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4236                         spin_unlock(&oscc->oscc_lock);
4237                 }
4238                 cli = &obd->u.cli;
4239                 client_obd_list_lock(&cli->cl_loi_list_lock);
4240                 cli->cl_avail_grant = 0;
4241                 cli->cl_lost_grant = 0;
4242                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4243                 break;
4244         }
4245         case IMP_EVENT_INACTIVE: {
4246                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4247                 break;
4248         }
4249         case IMP_EVENT_INVALIDATE: {
4250                 struct ldlm_namespace *ns = obd->obd_namespace;
4251                 struct lu_env         *env;
4252                 int                    refcheck;
4253
4254                 env = cl_env_get(&refcheck);
4255                 if (!IS_ERR(env)) {
4256                         /* Reset grants */
4257                         cli = &obd->u.cli;
4258                         client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* with the import invalid, all queued pages are
                         * completed through failing rpcs */
4261                         osc_check_rpcs(env, cli);
4262                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4263
4264                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4265                         cl_env_put(env, &refcheck);
4266                 } else
4267                         rc = PTR_ERR(env);
4268                 break;
4269         }
4270         case IMP_EVENT_ACTIVE: {
                /* Only do this on the OSCs used by the MDS */
4272                 if (imp->imp_server_timeout) {
4273                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4274
4275                         spin_lock(&oscc->oscc_lock);
4276                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4277                         spin_unlock(&oscc->oscc_lock);
4278                 }
4279                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4280                 break;
4281         }
4282         case IMP_EVENT_OCD: {
4283                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4284
4285                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4286                         osc_init_grant(&obd->u.cli, ocd);
4287
4288                 /* See bug 7198 */
4289                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                                OST_REQUEST_PORTAL;
4291
4292                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4293                 break;
4294         }
4295         default:
4296                 CERROR("Unknown import event %d\n", event);
4297                 LBUG();
4298         }
4299         RETURN(rc);
4300 }
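
/*
 * Each availability change above is also forwarded to the OBD observer
 * (normally the LOV) via obd_notify_observer(), so the striping layer can
 * mark this target active or inactive without polling import state itself.
 */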
4301
4302 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4303 {
4304         int rc;
4305         ENTRY;
4306
4308         rc = ptlrpcd_addref();
4309         if (rc)
4310                 RETURN(rc);
4311
4312         rc = client_obd_setup(obd, lcfg);
4313         if (rc) {
4314                 ptlrpcd_decref();
4315         } else {
4316                 struct lprocfs_static_vars lvars = { 0 };
4317                 struct client_obd *cli = &obd->u.cli;
4318
4319                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4320                 lprocfs_osc_init_vars(&lvars);
4321                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4322                         lproc_osc_attach_seqstat(obd);
4323                         sptlrpc_lprocfs_cliobd_attach(obd);
4324                         ptlrpc_lprocfs_register_obd(obd);
4325                 }
4326
4327                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   the previous ones. Ideally we would reserve
                   2x max_rpcs_in_flight, but that might waste too much RAM,
                   so reserving 2 extra is a guess that should still work. */
4333                 cli->cl_import->imp_rq_pool =
4334                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4335                                             OST_MAXREQSIZE,
4336                                             ptlrpc_add_rqs_to_pool);
4337
4338                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4339                 sema_init(&cli->cl_grant_sem, 1);
4340         }
4341
4342         RETURN(rc);
4343 }
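
/*
 * Sizing note for the request pool above: with the default
 * cl_max_rpcs_in_flight of 8 the pool pre-allocates 10 requests of
 * OST_MAXREQSIZE bytes each, so brw_interpret() can always start a
 * replacement RPC from the pool even when regular allocations fail.
 */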
4344
4345 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4346 {
4347         int rc = 0;
4348         ENTRY;
4349
4350         switch (stage) {
4351         case OBD_CLEANUP_EARLY: {
4352                 struct obd_import *imp;
4353                 imp = obd->u.cli.cl_import;
4354                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4355                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4356                 ptlrpc_deactivate_import(imp);
4357                 spin_lock(&imp->imp_lock);
4358                 imp->imp_pingable = 0;
4359                 spin_unlock(&imp->imp_lock);
4360                 break;
4361         }
4362         case OBD_CLEANUP_EXPORTS: {
4363                 /* If we set up but never connected, the
4364                    client import will not have been cleaned. */
4365                 if (obd->u.cli.cl_import) {
4366                         struct obd_import *imp;
4367                         down_write(&obd->u.cli.cl_sem);
4368                         imp = obd->u.cli.cl_import;
4369                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4370                                obd->obd_name);
4371                         ptlrpc_invalidate_import(imp);
4372                         if (imp->imp_rq_pool) {
4373                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4374                                 imp->imp_rq_pool = NULL;
4375                         }
4376                         class_destroy_import(imp);
4377                         up_write(&obd->u.cli.cl_sem);
4378                         obd->u.cli.cl_import = NULL;
4379                 }
4380                 rc = obd_llog_finish(obd, 0);
4381                 if (rc != 0)
4382                         CERROR("failed to cleanup llogging subsystems\n");
4383                 break;
        }
4385         }
4386         RETURN(rc);
4387 }
4388
4389 int osc_cleanup(struct obd_device *obd)
4390 {
4391         int rc;
4392
4393         ENTRY;
4394         ptlrpc_lprocfs_unregister_obd(obd);
4395         lprocfs_obd_cleanup(obd);
4396
4397         /* free memory of osc quota cache */
4398         lquota_cleanup(quota_interface, obd);
4399
4400         rc = client_obd_cleanup(obd);
4401
4402         ptlrpcd_decref();
4403         RETURN(rc);
4404 }
4405
4406 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4407 {
4408         struct lprocfs_static_vars lvars = { 0 };
4409         int rc = 0;
4410
4411         lprocfs_osc_init_vars(&lvars);
4412
4413         switch (lcfg->lcfg_command) {
4414         default:
4415                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4416                                               lcfg, obd);
4417                 if (rc > 0)
4418                         rc = 0;
4419                 break;
4420         }
4421
4422         return(rc);
4423 }
4424
4425 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4426 {
4427         return osc_process_config_base(obd, buf);
4428 }
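
/*
 * Hypothetical example of a record handled above: a tunable set with
 *
 *      lctl conf_param testfs-OST0000.osc.checksums=1
 *
 * arrives as a generic lustre_cfg, is matched against lvars.obd_vars by
 * class_process_proc_param(), and drives the same handler as a write to
 * the corresponding file under /proc/fs/lustre/osc/.
 */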
4429
4430 struct obd_ops osc_obd_ops = {
4431         .o_owner                = THIS_MODULE,
4432         .o_setup                = osc_setup,
4433         .o_precleanup           = osc_precleanup,
4434         .o_cleanup              = osc_cleanup,
4435         .o_add_conn             = client_import_add_conn,
4436         .o_del_conn             = client_import_del_conn,
4437         .o_connect              = client_connect_import,
4438         .o_reconnect            = osc_reconnect,
4439         .o_disconnect           = osc_disconnect,
4440         .o_statfs               = osc_statfs,
4441         .o_statfs_async         = osc_statfs_async,
4442         .o_packmd               = osc_packmd,
4443         .o_unpackmd             = osc_unpackmd,
4444         .o_precreate            = osc_precreate,
4445         .o_create               = osc_create,
4446         .o_create_async         = osc_create_async,
4447         .o_destroy              = osc_destroy,
4448         .o_getattr              = osc_getattr,
4449         .o_getattr_async        = osc_getattr_async,
4450         .o_setattr              = osc_setattr,
4451         .o_setattr_async        = osc_setattr_async,
4452         .o_brw                  = osc_brw,
4453         .o_punch                = osc_punch,
4454         .o_sync                 = osc_sync,
4455         .o_enqueue              = osc_enqueue,
4456         .o_change_cbdata        = osc_change_cbdata,
4457         .o_cancel               = osc_cancel,
4458         .o_cancel_unused        = osc_cancel_unused,
4459         .o_iocontrol            = osc_iocontrol,
4460         .o_get_info             = osc_get_info,
4461         .o_set_info_async       = osc_set_info_async,
4462         .o_import_event         = osc_import_event,
4463         .o_llog_init            = osc_llog_init,
4464         .o_llog_finish          = osc_llog_finish,
4465         .o_process_config       = osc_process_config,
4466 };
4467
4468 extern struct lu_kmem_descr  osc_caches[];
4469 extern spinlock_t            osc_ast_guard;
4470 extern struct lock_class_key osc_ast_guard_class;
4471
4472 int __init osc_init(void)
4473 {
4474         struct lprocfs_static_vars lvars = { 0 };
4475         int rc;
4476         ENTRY;
4477
        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
4481         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4482
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

4485         lprocfs_osc_init_vars(&lvars);
4486
4487         request_module("lquota");
4488         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4489         lquota_init(quota_interface);
4490         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4491
4492         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4493                                  LUSTRE_OSC_NAME, &osc_device_type);
4494         if (rc) {
4495                 if (quota_interface)
4496                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4497                 lu_kmem_fini(osc_caches);
4498                 RETURN(rc);
4499         }
4500
4501         spin_lock_init(&osc_ast_guard);
4502         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4503
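        /* Start from the generic lvfs llog operations and override only
         * the origin-side handlers used for the MDS->OST log context. */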
4504         osc_mds_ost_orig_logops = llog_lvfs_ops;
4505         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4506         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4507         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4508         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4509
4510         RETURN(rc);
4511 }
4512
4513 #ifdef __KERNEL__
4514 static void /*__exit*/ osc_exit(void)
4515 {
4516         lu_device_type_fini(&osc_device_type);
4517
4518         lquota_exit(quota_interface);
4519         if (quota_interface)
4520                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4521
4522         class_unregister_type(LUSTRE_OSC_NAME);
4523         lu_kmem_fini(osc_caches);
4524 }
4525
4526 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4527 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4528 MODULE_LICENSE("GPL");
4529
4530 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4531 #endif