fs/lustre-release.git: lustre/osc/osc_request.c (branch HEAD)
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

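/* Copy the capability @capa into the request capsule and flag it as
 * valid in the request body; a no-op if no capability was supplied. */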
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

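/* Pack the obdo and capability from @oinfo into the request body. */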
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

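/* Request a zero-length CAPA1 buffer when no capability is being sent;
 * otherwise the preset capability size is left alone. */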
static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

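/* Reply callback for async getattr: unpack the reply into oi_oa and
 * pass the result to the caller's oi_cb_up upcall. */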
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

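/* Queue an OST_GETATTR RPC on @set without waiting; the reply is
 * handled by osc_getattr_interpret(). */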
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

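/* Synchronous OST_GETATTR: send the request and wait for the reply. */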
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

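/* Synchronous OST_SETATTR: send the attributes in @oinfo and wait. */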
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

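/* Asynchronous OST_SETATTR: fire-and-forget via ptlrpcd if @rqset is
 * NULL, otherwise queue on @rqset and let osc_setattr_interpret()
 * unpack the reply. */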
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

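/* Create an object on the OST: allocate a memmd if the caller did not
 * supply one, send OST_CREATE, and copy the returned object id/group
 * back into the lsm. */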
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

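/* Send an OST_PUNCH (truncate) request; completion is reported to the
 * caller through @upcall with @cookie. */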
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

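/* Truncate entry point: encode the punch extent in o_size/o_blocks and
 * hand off to osc_punch_base(). */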
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

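/* Ask the OST to flush the [start, end] range of an object to disk. */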
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally the locks matched by @mode in the resource
 * found by @objid. Found locks are added to the @cancels list. Returns
 * the number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

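/* A destroy RPC finished: drop the in-flight count and wake anyone
 * throttled in osc_can_send_destroy(). */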
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

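/* Non-blocking check whether another destroy RPC may be sent without
 * exceeding cl_max_rpcs_in_flight; reserves a slot when it returns 1. */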
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be asynchronous on the client, and we don't
 * even really care about the return code, since the client cannot do
 * anything at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST
 * reports they were destroyed and sync'd to disk (i.e. transaction
 * committed). If the client dies, or the OST is down when the object
 * should be destroyed, the records are not cancelled, and when the OST
 * reconnects to the MDS next, it will retrieve the llog unlink logs and
 * then send the log cancellation cookies to the MDS after committing
 * destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

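/* Fill in the dirty/undirty/grant accounting that is piggy-backed on
 * each request so the OST can track how much cache the client uses. */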
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages) {
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc <<
                                      CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "
               LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped,
               oa->o_grant);
}

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has
 * completed. Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant, wait for pending RPCs
                 * that may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

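/* Reply handler for a grant shrink request: on failure give the grant
 * back to the client, on success apply whatever the OST returned. */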
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

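/* Shrink the available grant to @target, announcing the difference to
 * the OST through a KEY_GRANT_SHRINK set_info RPC. */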
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already at or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
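/* It is time to shrink grant only if the shrink interval has expired,
 * the import is fully connected, and the available grant still exceeds
 * GRANT_SHRINK_LIMIT. */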
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

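/* Initialize grant accounting from the amount offered at connect time
 * and register the periodic grant shrink callback if both sides
 * support it. */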
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

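/* Validate the per-niobuf return codes of a BRW_WRITE reply and verify
 * that the expected number of bytes was transferred. */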
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }
        if (ptlrpc_rep_need_swab(req))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

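/* Two brw_pages can share a single niobuf only if they are contiguous
 * in the file and carry identical flags. */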
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

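/* Checksum the first @nob bytes of the bulk pages, with fault-injection
 * hooks that simulate checksum corruption in either direction. */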
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead of
         * corrupting the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

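/* Build a bulk read/write (OST_BRW) request: merge contiguous pages
 * into niobufs, attach the bulk descriptor, and checksum the outgoing
 * data when checksums are enabled. */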
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
                                struct obdo *oa, struct lov_stripe_md *lsm,
                                obd_count page_count, struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

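/* The OST reported a write checksum mismatch: recompute the checksum
 * locally to guess where the corruption happened and log a detailed
 * message.  Returns 0 if the checksums actually match, 1 otherwise. */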
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

1456 /* Note: rc enters this function as the number of bytes transferred */
1457 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1458 {
1459         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1460         const lnet_process_id_t *peer =
1461                         &req->rq_import->imp_connection->c_peer;
1462         struct client_obd *cli = aa->aa_cli;
1463         struct ost_body *body;
1464         __u32 client_cksum = 0;
1465         ENTRY;
1466
1467         if (rc < 0 && rc != -EDQUOT)
1468                 RETURN(rc);
1469
1470         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1471         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1472                                   lustre_swab_ost_body);
1473         if (body == NULL) {
1474                 CDEBUG(D_INFO, "Can't unpack body\n");
1475                 RETURN(-EPROTO);
1476         }
1477
1478         /* set/clear over quota flag for a uid/gid */
1479         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1480             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1481                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1482
1483                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1484                              body->oa.o_flags);
1485         }
1486
1487         if (rc < 0)
1488                 RETURN(rc);
1489
1490         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1491                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1492
1493         osc_update_grant(cli, body);
1494
1495         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1496                 if (rc > 0) {
1497                         CERROR("Unexpected +ve rc %d\n", rc);
1498                         RETURN(-EPROTO);
1499                 }
1500                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1501
1502                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1503                         RETURN(-EAGAIN);
1504
1505                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1506                     check_write_checksum(&body->oa, peer, client_cksum,
1507                                          body->oa.o_cksum, aa->aa_requested_nob,
1508                                          aa->aa_page_count, aa->aa_ppga,
1509                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1510                         RETURN(-EAGAIN);
1511
1512                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1513                                      aa->aa_page_count, aa->aa_ppga);
1514                 GOTO(out, rc);
1515         }
1516
1517         /* The rest of this function executes only for OST_READs */
1518
1519         /* if unwrap_bulk failed, return -EAGAIN to retry */
1520         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1521         if (rc < 0)
1522                 GOTO(out, rc = -EAGAIN);
1523
1524         if (rc > aa->aa_requested_nob) {
1525                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1526                        aa->aa_requested_nob);
1527                 RETURN(-EPROTO);
1528         }
1529
1530         if (rc != req->rq_bulk->bd_nob_transferred) {
1531                 CERROR("Unexpected rc %d (%d transferred)\n",
1532                        rc, req->rq_bulk->bd_nob_transferred);
1533                 RETURN(-EPROTO);
1534         }
1535
1536         if (rc < aa->aa_requested_nob)
1537                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1538
1539         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1540                 static int cksum_counter;
1541                 __u32      server_cksum = body->oa.o_cksum;
1542                 char      *via;
1543                 char      *router;
1544                 cksum_type_t cksum_type;
1545
1546                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1547                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1548                 else
1549                         cksum_type = OBD_CKSUM_CRC32;
1550                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1551                                                  aa->aa_ppga, OST_READ,
1552                                                  cksum_type);
1553
1554                 if (peer->nid == req->rq_bulk->bd_sender) {
1555                         via = router = "";
1556                 } else {
1557                         via = " via ";
1558                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1559                 }
1560
1561                 if (server_cksum == ~0 && rc > 0) {
1562                         CERROR("Protocol error: server %s set the 'checksum' "
1563                                "bit, but didn't send a checksum.  Not fatal, "
1564                                "but please notify on http://bugzilla.lustre.org/\n",
1565                                libcfs_nid2str(peer->nid));
1566                 } else if (server_cksum != client_cksum) {
1567                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1568                                            "%s%s%s inum "LPU64"/"LPU64" object "
1569                                            LPU64"/"LPU64" extent "
1570                                            "["LPU64"-"LPU64"]\n",
1571                                            req->rq_import->imp_obd->obd_name,
1572                                            libcfs_nid2str(peer->nid),
1573                                            via, router,
1574                                            body->oa.o_valid & OBD_MD_FLFID ?
1575                                                 body->oa.o_fid : (__u64)0,
1576                                            body->oa.o_valid & OBD_MD_FLFID ?
1577                                                 body->oa.o_generation :(__u64)0,
1578                                            body->oa.o_id,
1579                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1580                                                 body->oa.o_gr : (__u64)0,
1581                                            aa->aa_ppga[0]->off,
1582                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1583                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1584                                                                         1);
1585                         CERROR("client %x, server %x, cksum_type %x\n",
1586                                client_cksum, server_cksum, cksum_type);
1587                         cksum_counter = 0;
1588                         aa->aa_oa->o_cksum = client_cksum;
1589                         rc = -EAGAIN;
1590                 } else {
1591                         cksum_counter++;
1592                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1593                         rc = 0;
1594                 }
1595         } else if (unlikely(client_cksum)) {
1596                 static int cksum_missed;
1597
1598                 cksum_missed++;
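                /* x & -x keeps only the lowest set bit, so the test below
                 * is true exactly when cksum_missed is a power of two:
                 * log the 1st, 2nd, 4th, 8th... miss to throttle noise */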
1599                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1600                         CERROR("Checksum %u requested from %s but not sent\n",
1601                                cksum_missed, libcfs_nid2str(peer->nid));
1602         } else {
1603                 rc = 0;
1604         }
1605 out:
1606         if (rc >= 0)
1607                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1608
1609         RETURN(rc);
1610 }
1611
1612 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1613                             struct lov_stripe_md *lsm,
1614                             obd_count page_count, struct brw_page **pga,
1615                             struct obd_capa *ocapa)
1616 {
1617         struct ptlrpc_request *req;
1618         int                    rc;
1619         cfs_waitq_t            waitq;
1620         int                    resends = 0;
1621         struct l_wait_info     lwi;
1622
1623         ENTRY;
1624
1625         cfs_waitq_init(&waitq);
1626
1627 restart_bulk:
1628         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1629                                   page_count, pga, &req, ocapa, 0);
1630         if (rc != 0)
1631                 return (rc);
1632
1633         rc = ptlrpc_queue_wait(req);
1634
1635         if (rc == -ETIMEDOUT && req->rq_resend) {
1636                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1637                 ptlrpc_req_finished(req);
1638                 goto restart_bulk;
1639         }
1640
1641         rc = osc_brw_fini_request(req, rc);
1642
1643         ptlrpc_req_finished(req);
1644         if (osc_recoverable_error(rc)) {
1645                 resends++;
1646                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1647                         CERROR("too many resend retries, returning error\n");
1648                         RETURN(-EIO);
1649                 }
1650
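                /* back off linearly: sleep 'resends' seconds before retrying */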
1651                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1652                 l_wait_event(waitq, 0, &lwi);
1653
1654                 goto restart_bulk;
1655         }
1656
1657         RETURN(rc);
1658 }
1659
1660 int osc_brw_redo_request(struct ptlrpc_request *request,
1661                          struct osc_brw_async_args *aa)
1662 {
1663         struct ptlrpc_request *new_req;
1664         struct ptlrpc_request_set *set = request->rq_set;
1665         struct osc_brw_async_args *new_aa;
1666         struct osc_async_page *oap;
1667         int rc = 0;
1668         ENTRY;
1669
1670         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1671                 CERROR("too many resend retries, returning error\n");
1672                 RETURN(-EIO);
1673         }
1674
1675         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1676
1677         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1678                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1679                                   aa->aa_cli, aa->aa_oa,
1680                                   NULL /* lsm unused by osc currently */,
1681                                   aa->aa_page_count, aa->aa_ppga,
1682                                   &new_req, aa->aa_ocapa, 0);
1683         if (rc)
1684                 RETURN(rc);
1685
1686         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1687
1688         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1689                 if (oap->oap_request != NULL) {
1690                         LASSERTF(request == oap->oap_request,
1691                                  "request %p != oap_request %p\n",
1692                                  request, oap->oap_request);
1693                         if (oap->oap_interrupted) {
1694                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1695                                 ptlrpc_req_finished(new_req);
1696                                 RETURN(-EINTR);
1697                         }
1698                 }
1699         }
1700         /* New request takes over pga and oaps from old request.
1701          * Note that copying a list_head doesn't work, need to move it... */
1702         aa->aa_resends++;
1703         new_req->rq_interpret_reply = request->rq_interpret_reply;
1704         new_req->rq_async_args = request->rq_async_args;
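        /* push the resend out by one extra second per prior attempt */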
1705         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1706
1707         new_aa = ptlrpc_req_async_args(new_req);
1708
1709         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1710         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1711         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1712
1713         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1714                 if (oap->oap_request) {
1715                         ptlrpc_req_finished(oap->oap_request);
1716                         oap->oap_request = ptlrpc_request_addref(new_req);
1717                 }
1718         }
1719
1720         new_aa->aa_ocapa = aa->aa_ocapa;
1721         aa->aa_ocapa = NULL;
1722
1723         /* using ptlrpc_set_add_req() is safe here because interpret
1724          * functions run in check_set context.  the only path by which
1725          * another thread can reach this request is the -EINTR one, and
1726          * that is protected by cl_loi_list_lock */
1727         ptlrpc_set_add_req(set, new_req);
1728
1729         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1730
1731         DEBUG_REQ(D_INFO, new_req, "new request");
1732         RETURN(0);
1733 }
1734
1735 /*
1736  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1737  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1738  * fine for our small page arrays and doesn't require allocation.  it's an
1739  * insertion sort that swaps elements that are strides apart, shrinking the
1740  * stride down until it's 1 and the array is sorted.
1741  */
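/* For example, with num == 100 the strides visited are 40, 13, 4, 1
 * (Knuth's 3h+1 gap sequence). */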
1742 static void sort_brw_pages(struct brw_page **array, int num)
1743 {
1744         int stride, i, j;
1745         struct brw_page *tmp;
1746
1747         if (num == 1)
1748                 return;
1749         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1750                 ;
1751
1752         do {
1753                 stride /= 3;
1754                 for (i = stride ; i < num ; i++) {
1755                         tmp = array[i];
1756                         j = i;
1757                         while (j >= stride && array[j - stride]->off > tmp->off) {
1758                                 array[j] = array[j - stride];
1759                                 j -= stride;
1760                         }
1761                         array[j] = tmp;
1762                 }
1763         } while (stride > 1);
1764 }
1765
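/* Count how many leading pages of 'pg' form one contiguous byte range:
 * the run ends at the first page that doesn't end on a page boundary, or
 * at the first later page that doesn't start on one.  Callers use this to
 * trim a BRW so its bulk can move as a single unfragmented RDMA. */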
1766 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1767 {
1768         int count = 1;
1769         int offset;
1770         int i = 0;
1771
1772         LASSERT(pages > 0);
1773         offset = pg[i]->off & ~CFS_PAGE_MASK;
1774
1775         for (;;) {
1776                 pages--;
1777                 if (pages == 0)         /* that's all */
1778                         return count;
1779
1780                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1781                         return count;   /* doesn't end on page boundary */
1782
1783                 i++;
1784                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1785                 if (offset != 0)        /* doesn't start on page boundary */
1786                         return count;
1787
1788                 count++;
1789         }
1790 }
1791
1792 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1793 {
1794         struct brw_page **ppga;
1795         int i;
1796
1797         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1798         if (ppga == NULL)
1799                 return NULL;
1800
1801         for (i = 0; i < count; i++)
1802                 ppga[i] = pga + i;
1803         return ppga;
1804 }
1805
1806 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1807 {
1808         LASSERT(ppga != NULL);
1809         OBD_FREE(ppga, sizeof(*ppga) * count);
1810 }
1811
1812 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1813                    obd_count page_count, struct brw_page *pga,
1814                    struct obd_trans_info *oti)
1815 {
1816         struct obdo *saved_oa = NULL;
1817         struct brw_page **ppga, **orig;
1818         struct obd_import *imp = class_exp2cliimp(exp);
1819         struct client_obd *cli;
1820         int rc, page_count_orig;
1821         ENTRY;
1822
1823         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1824         cli = &imp->imp_obd->u.cli;
1825
1826         if (cmd & OBD_BRW_CHECK) {
1827                 /* The caller just wants to know if there's a chance that this
1828                  * I/O can succeed */
1829
1830                 if (imp->imp_invalid)
1831                         RETURN(-EIO);
1832                 RETURN(0);
1833         }
1834
1835         /* test_brw with a failed create can trip this, maybe others. */
1836         LASSERT(cli->cl_max_pages_per_rpc);
1837
1838         rc = 0;
1839
1840         orig = ppga = osc_build_ppga(pga, page_count);
1841         if (ppga == NULL)
1842                 RETURN(-ENOMEM);
1843         page_count_orig = page_count;
1844
1845         sort_brw_pages(ppga, page_count);
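        /* issue the I/O in chunks of at most cl_max_pages_per_rpc pages,
         * shrinking a chunk further if max_unfragmented_pages() finds a
         * gap.  e.g. with a 256-page limit and no gaps, a 1000-page
         * request goes out as 256+256+256+232-page BRWs */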
1846         while (page_count) {
1847                 obd_count pages_per_brw;
1848
1849                 if (page_count > cli->cl_max_pages_per_rpc)
1850                         pages_per_brw = cli->cl_max_pages_per_rpc;
1851                 else
1852                         pages_per_brw = page_count;
1853
1854                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1855
1856                 if (saved_oa != NULL) {
1857                         /* restore previously saved oa */
1858                         *oinfo->oi_oa = *saved_oa;
1859                 } else if (page_count > pages_per_brw) {
1860                         /* save a copy of oa (brw will clobber it) */
1861                         OBDO_ALLOC(saved_oa);
1862                         if (saved_oa == NULL)
1863                                 GOTO(out, rc = -ENOMEM);
1864                         *saved_oa = *oinfo->oi_oa;
1865                 }
1866
1867                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1868                                       pages_per_brw, ppga, oinfo->oi_capa);
1869
1870                 if (rc != 0)
1871                         break;
1872
1873                 page_count -= pages_per_brw;
1874                 ppga += pages_per_brw;
1875         }
1876
1877 out:
1878         osc_release_ppga(orig, page_count_orig);
1879
1880         if (saved_oa != NULL)
1881                 OBDO_FREE(saved_oa);
1882
1883         RETURN(rc);
1884 }
1885
1886 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1887  * the dirty accounting: either writeback completed, or truncate happened
1888  * before writing started.  Must be called with the loi lock held. */
1889 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1890                            int sent)
1891 {
1892         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1893 }
1894
1895
1896 /* This maintains the lists of pending pages to read/write for a given object
1897  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1898  * to quickly find objects that are ready to send an RPC. */
1899 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1900                          int cmd)
1901 {
1902         int optimal;
1903         ENTRY;
1904
1905         if (lop->lop_num_pending == 0)
1906                 RETURN(0);
1907
1908         /* if we have an invalid import we want to drain the queued pages
1909          * by forcing them through rpcs that immediately fail and complete
1910          * the pages.  recovery relies on this to empty the queued pages
1911          * before canceling the locks and evicting the llite pages */
1912         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1913                 RETURN(1);
1914
1915         /* stream rpcs in queue order as long as there is an urgent page
1916          * queued.  this is our cheap solution for good batching in the case
1917          * where writepage marks some random page in the middle of the file
1918          * as urgent because of, say, memory pressure */
1919         if (!list_empty(&lop->lop_urgent)) {
1920                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1921                 RETURN(1);
1922         }
1923         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1924         optimal = cli->cl_max_pages_per_rpc;
1925         if (cmd & OBD_BRW_WRITE) {
1926                 /* trigger a write rpc stream as long as there are dirtiers
1927                  * waiting for space.  as they're waiting, they're not going to
1928                  * create more pages to coalesce with what's waiting. */
1929                 if (!list_empty(&cli->cl_cache_waiters)) {
1930                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1931                         RETURN(1);
1932                 }
1933                 /* +16 to avoid triggering rpcs that would want to include pages
1934                  * that are being queued but which can't be made ready until
1935                  * the queuer finishes with the page. this is a wart for
1936                  * llite::commit_write() */
1937                 optimal += 16;
1938         }
1939         if (lop->lop_num_pending >= optimal)
1940                 RETURN(1);
1941
1942         RETURN(0);
1943 }
1944
1945 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1946 {
1947         struct osc_async_page *oap;
1948         ENTRY;
1949
1950         if (list_empty(&lop->lop_urgent))
1951                 RETURN(0);
1952
1953         oap = list_entry(lop->lop_urgent.next,
1954                          struct osc_async_page, oap_urgent_item);
1955
1956         if (oap->oap_async_flags & ASYNC_HP) {
1957                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1958                 RETURN(1);
1959         }
1960
1961         RETURN(0);
1962 }
1963
1964 static void on_list(struct list_head *item, struct list_head *list,
1965                     int should_be_on)
1966 {
1967         if (list_empty(item) && should_be_on)
1968                 list_add_tail(item, list);
1969         else if (!list_empty(item) && !should_be_on)
1970                 list_del_init(item);
1971 }
1972
1973 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1974  * can find pages to build into rpcs quickly */
1975 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1976 {
1977         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1978             lop_makes_hprpc(&loi->loi_read_lop)) {
1979                 /* HP rpc */
1980                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1981                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1982         } else {
1983                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1984                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1985                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1986                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1987         }
1988
1989         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1990                 loi->loi_write_lop.lop_num_pending);
1991
1992         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1993                 loi->loi_read_lop.lop_num_pending);
1994 }
1995
1996 static void lop_update_pending(struct client_obd *cli,
1997                                struct loi_oap_pages *lop, int cmd, int delta)
1998 {
1999         lop->lop_num_pending += delta;
2000         if (cmd & OBD_BRW_WRITE)
2001                 cli->cl_pending_w_pages += delta;
2002         else
2003                 cli->cl_pending_r_pages += delta;
2004 }
2005
2006 /**
2007  * this is called when a sync waiter receives an interruption.  Its job is to
2008  * get the caller woken as soon as possible.  If its page hasn't been put in an
2009  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2010  * desiring interruption which will forcefully complete the rpc once the rpc
2011  * has timed out.
2012  */
2013 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2014 {
2015         struct loi_oap_pages *lop;
2016         struct lov_oinfo *loi;
2017         int rc = -EBUSY;
2018         ENTRY;
2019
2020         LASSERT(!oap->oap_interrupted);
2021         oap->oap_interrupted = 1;
2022
2023         /* if it's been put in an rpc: only one oap gets a request reference */
2024         if (oap->oap_request != NULL) {
2025                 ptlrpc_mark_interrupted(oap->oap_request);
2026                 ptlrpcd_wake(oap->oap_request);
2027                 ptlrpc_req_finished(oap->oap_request);
2028                 oap->oap_request = NULL;
2029         }
2030
2031         /*
2032          * page completion may be called only if ->cpo_prep() method was
2033          * executed by osc_io_submit(), which also adds the page to the pending list
2034          */
2035         if (!list_empty(&oap->oap_pending_item)) {
2036                 list_del_init(&oap->oap_pending_item);
2037                 list_del_init(&oap->oap_urgent_item);
2038
2039                 loi = oap->oap_loi;
2040                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2041                         &loi->loi_write_lop : &loi->loi_read_lop;
2042                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2043                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2044                 rc = oap->oap_caller_ops->ap_completion(env,
2045                                           oap->oap_caller_data,
2046                                           oap->oap_cmd, NULL, -EINTR);
2047         }
2048
2049         RETURN(rc);
2050 }
2051
2052 /* this is trying to propagate async writeback errors back up to the
2053  * application.  As an async write fails we record the error code for later if
2054  * the app does an fsync.  As long as errors persist we force future rpcs to be
2055  * sync so that the app can get a sync error and break the cycle of queueing
2056  * pages for which writeback will fail. */
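/* Example: if a write with xid 42 fails with -EIO, ar_rc records -EIO and
 * all later BRWs are forced sync until some request whose xid is at or
 * beyond the xid sampled here completes cleanly; ar_rc itself stays set
 * for fsync to report. */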
2057 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2058                            int rc)
2059 {
2060         if (rc) {
2061                 if (!ar->ar_rc)
2062                         ar->ar_rc = rc;
2063
2064                 ar->ar_force_sync = 1;
2065                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2066                 return;
2067         }
2069
2070         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2071                 ar->ar_force_sync = 0;
2072 }
2073
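/* Queue @oap on its object's pending list.  ASYNC_HP pages jump to the
 * head of the urgent list and plain ASYNC_URGENT pages go to its tail, so
 * pages under a lock being cancelled are picked up first. */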
2074 void osc_oap_to_pending(struct osc_async_page *oap)
2075 {
2076         struct loi_oap_pages *lop;
2077
2078         if (oap->oap_cmd & OBD_BRW_WRITE)
2079                 lop = &oap->oap_loi->loi_write_lop;
2080         else
2081                 lop = &oap->oap_loi->loi_read_lop;
2082
2083         if (oap->oap_async_flags & ASYNC_HP)
2084                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2085         else if (oap->oap_async_flags & ASYNC_URGENT)
2086                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2087         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2088         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2089 }
2090
2091 /* this must be called holding the loi list lock, which protects exit_cache,
2092  * async_flag maintenance, and oap_request */
2093 static void osc_ap_completion(const struct lu_env *env,
2094                               struct client_obd *cli, struct obdo *oa,
2095                               struct osc_async_page *oap, int sent, int rc)
2096 {
2097         __u64 xid = 0;
2098
2099         ENTRY;
2100         if (oap->oap_request != NULL) {
2101                 xid = ptlrpc_req_xid(oap->oap_request);
2102                 ptlrpc_req_finished(oap->oap_request);
2103                 oap->oap_request = NULL;
2104         }
2105
2106         spin_lock(&oap->oap_lock);
2107         oap->oap_async_flags = 0;
2108         spin_unlock(&oap->oap_lock);
2109         oap->oap_interrupted = 0;
2110
2111         if (oap->oap_cmd & OBD_BRW_WRITE) {
2112                 osc_process_ar(&cli->cl_ar, xid, rc);
2113                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2114         }
2115
2116         if (rc == 0 && oa != NULL) {
2117                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2118                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2119                 if (oa->o_valid & OBD_MD_FLMTIME)
2120                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2121                 if (oa->o_valid & OBD_MD_FLATIME)
2122                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2123                 if (oa->o_valid & OBD_MD_FLCTIME)
2124                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2125         }
2126
2127         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2128                                                 oap->oap_cmd, oa, rc);
2129
2130         /* ll_ap_completion (from llite) drops PG_locked, so new I/O on
2131          * the page could start; but OSC calls it under the loi list lock
2132          * and thus we can safely add the oap back to pending */
2133         if (rc)
2134                 /* upper layer wants to leave the page on pending queue */
2135                 osc_oap_to_pending(oap);
2136         else
2137                 osc_exit_cache(cli, oap, sent);
2138         EXIT;
2139 }
2140
2141 static int brw_interpret(const struct lu_env *env,
2142                          struct ptlrpc_request *req, void *data, int rc)
2143 {
2144         struct osc_brw_async_args *aa = data;
2145         struct client_obd *cli;
2146         int async;
2147         ENTRY;
2148
2149         rc = osc_brw_fini_request(req, rc);
2150         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2151         if (osc_recoverable_error(rc)) {
2152                 rc = osc_brw_redo_request(req, aa);
2153                 if (rc == 0)
2154                         RETURN(0);
2155         }
2156
2157         if (aa->aa_ocapa) {
2158                 capa_put(aa->aa_ocapa);
2159                 aa->aa_ocapa = NULL;
2160         }
2161
2162         cli = aa->aa_cli;
2163
2164         client_obd_list_lock(&cli->cl_loi_list_lock);
2165
2166         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2167          * is called so we know whether to go to sync BRWs or wait for more
2168          * RPCs to complete */
2169         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2170                 cli->cl_w_in_flight--;
2171         else
2172                 cli->cl_r_in_flight--;
2173
2174         async = list_empty(&aa->aa_oaps);
2175         if (!async) { /* from osc_send_oap_rpc() */
2176                 struct osc_async_page *oap, *tmp;
2177                 /* the caller may re-use the oap after the completion call so
2178                  * we need to clean it up a little */
2179                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2180                         list_del_init(&oap->oap_rpc_item);
2181                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2182                 }
2183                 OBDO_FREE(aa->aa_oa);
2184         } else { /* from async_internal() */
2185                 int i;
2186                 for (i = 0; i < aa->aa_page_count; i++)
2187                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2188
2189                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2190                         OBDO_FREE(aa->aa_oa);
2191         }
2192         osc_wake_cache_waiters(cli);
2193         osc_check_rpcs(env, cli);
2194         client_obd_list_unlock(&cli->cl_loi_list_lock);
2195         if (!async)
2196                 cl_req_completion(env, aa->aa_clerq, rc);
2197         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2198         RETURN(rc);
2199 }
2200
2201 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2202                                             struct client_obd *cli,
2203                                             struct list_head *rpc_list,
2204                                             int page_count, int cmd)
2205 {
2206         struct ptlrpc_request *req;
2207         struct brw_page **pga = NULL;
2208         struct osc_brw_async_args *aa;
2209         struct obdo *oa = NULL;
2210         const struct obd_async_page_ops *ops = NULL;
2211         void *caller_data = NULL;
2212         struct osc_async_page *oap;
2213         struct osc_async_page *tmp;
2214         struct ost_body *body;
2215         struct cl_req *clerq = NULL;
2216         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2217         struct ldlm_lock *lock = NULL;
2218         struct cl_req_attr crattr;
2219         int i, rc;
2220
2221         ENTRY;
2222         LASSERT(!list_empty(rpc_list));
2223
2224         memset(&crattr, 0, sizeof crattr);
2225         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2226         if (pga == NULL)
2227                 GOTO(out, req = ERR_PTR(-ENOMEM));
2228
2229         OBDO_ALLOC(oa);
2230         if (oa == NULL)
2231                 GOTO(out, req = ERR_PTR(-ENOMEM));
2232
2233         i = 0;
2234         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2235                 struct cl_page *page = osc_oap2cl_page(oap);
2236                 if (ops == NULL) {
2237                         ops = oap->oap_caller_ops;
2238                         caller_data = oap->oap_caller_data;
2239
2240                         clerq = cl_req_alloc(env, page, crt,
2241                                              1 /* only 1-object rpcs for
2242                                                 * now */);
2243                         if (IS_ERR(clerq))
2244                                 GOTO(out, req = (void *)clerq);
2245                         lock = oap->oap_ldlm_lock;
2246                 }
2247                 pga[i] = &oap->oap_brw_page;
2248                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2249                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2250                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2251                 i++;
2252                 cl_req_page_add(env, clerq, page);
2253         }
2254
2255         /* always get the data for the obdo for the rpc */
2256         LASSERT(ops != NULL);
2257         crattr.cra_oa = oa;
2258         crattr.cra_capa = NULL;
2259         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2260         if (lock) {
2261                 oa->o_handle = lock->l_remote_handle;
2262                 oa->o_valid |= OBD_MD_FLHANDLE;
2263         }
2264
2265         rc = cl_req_prep(env, clerq);
2266         if (rc != 0) {
2267                 CERROR("cl_req_prep failed: %d\n", rc);
2268                 GOTO(out, req = ERR_PTR(rc));
2269         }
2270
2271         sort_brw_pages(pga, page_count);
2272         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2273                                   pga, &req, crattr.cra_capa, 1);
2274         if (rc != 0) {
2275                 CERROR("prep_req failed: %d\n", rc);
2276                 GOTO(out, req = ERR_PTR(rc));
2277         }
2278
2279         /* Need to update the timestamps after the request is built in case
2280          * we race with setattr (locally or in queue at OST).  If OST gets
2281          * later setattr before earlier BRW (as determined by the request xid),
2282          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2283          * way to do this in a single call.  bug 10150 */
2284         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2285         cl_req_attr_set(env, clerq, &crattr,
2286                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2287
2288         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2289         aa = ptlrpc_req_async_args(req);
2290         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2291         list_splice(rpc_list, &aa->aa_oaps);
2292         CFS_INIT_LIST_HEAD(rpc_list);
2293         aa->aa_clerq = clerq;
2294 out:
2295         capa_put(crattr.cra_capa);
2296         if (IS_ERR(req)) {
2297                 if (oa)
2298                         OBDO_FREE(oa);
2299                 if (pga)
2300                         OBD_FREE(pga, sizeof(*pga) * page_count);
2301                 /* this should happen rarely and is pretty bad; it makes
2302                  * the pending list not follow the dirty order */
2303                 client_obd_list_lock(&cli->cl_loi_list_lock);
2304                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2305                         list_del_init(&oap->oap_rpc_item);
2306
2307                         /* queued sync pages can be torn down while in
2308                          * transit between the pending list and the rpc */
2309                         if (oap->oap_interrupted) {
2310                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2311                                 osc_ap_completion(env, cli, NULL, oap, 0,
2312                                                   oap->oap_count);
2313                                 continue;
2314                         }
2315                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2316                 }
2317                 if (clerq && !IS_ERR(clerq))
2318                         cl_req_completion(env, clerq, PTR_ERR(req));
2319         }
2320         RETURN(req);
2321 }
2322
2323 /**
2324  * prepare pages for ASYNC io and put pages in send queue.
2325  *
2326  * \param cmd OBD_BRW_* macros
2327  * \param lop pending pages
2328  *
2329  * \return 1 if an rpc was sent, 0 if there was nothing to send.
2330  * \return negative errno if an error occurred.
2331  */
2332 static int
2333 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2334                  struct lov_oinfo *loi,
2335                  int cmd, struct loi_oap_pages *lop)
2336 {
2337         struct ptlrpc_request *req;
2338         obd_count page_count = 0;
2339         struct osc_async_page *oap = NULL, *tmp;
2340         struct osc_brw_async_args *aa;
2341         const struct obd_async_page_ops *ops;
2342         CFS_LIST_HEAD(rpc_list);
2343         CFS_LIST_HEAD(tmp_list);
2344         unsigned int ending_offset;
2345         unsigned  starting_offset = 0;
2346         int srvlock = 0;
2347         struct cl_object *clob = NULL;
2348         ENTRY;
2349
2350         /* ASYNC_HP pages first.  At present, when the lock covering the
2351          * pages is to be canceled, the pages under it will be sent out
2352          * with ASYNC_HP.  We have to send them out as soon as possible. */
2353         list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2354                 if (oap->oap_async_flags & ASYNC_HP) 
2355                         list_move(&oap->oap_pending_item, &tmp_list);
2356                 else
2357                         list_move_tail(&oap->oap_pending_item, &tmp_list);
2358                 if (++page_count >= cli->cl_max_pages_per_rpc)
2359                         break;
2360         }
2361
2362         list_splice(&tmp_list, &lop->lop_pending);
2363         page_count = 0;
2364
2365         /* first we find the pages we're allowed to work with */
2366         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2367                                  oap_pending_item) {
2368                 ops = oap->oap_caller_ops;
2369
2370                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2371                          "magic 0x%x\n", oap, oap->oap_magic);
2372
2373                 if (clob == NULL) {
2374                         /* pin object in memory, so that completion call-backs
2375                          * can be safely called under client_obd_list lock. */
2376                         clob = osc_oap2cl_page(oap)->cp_obj;
2377                         cl_object_get(clob);
2378                 }
2379
2380                 if (page_count != 0 &&
2381                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2382                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2383                                " oap %p, page %p, srvlock %u\n",
2384                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2385                         break;
2386                 }
2387
2388                 /* If there is a gap at the start of this page, it can't merge
2389                  * with any previous page, so we'll hand the network a
2390                  * "fragmented" page array that it can't transfer in 1 RDMA */
2391                 if (page_count != 0 && oap->oap_page_off != 0)
2392                         break;
2393
2394                 /* in llite being 'ready' equates to the page being locked
2395                  * until completion unlocks it.  commit_write submits a page
2396                  * as not ready because its unlock will happen unconditionally
2397                  * as the call returns.  if we race with commit_write giving
2398                  * us that page we don't want to create a hole in the page
2399                  * stream, so we stop and leave the rpc to be fired by
2400                  * another dirtier or kupdated interval (the not ready page
2401                  * will still be on the dirty list).  we could call in
2402                  * at the end of ll_file_write to process the queue again. */
2403                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2404                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2405                                                     cmd);
2406                         if (rc < 0)
2407                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2408                                                 "instead of ready\n", oap,
2409                                                 oap->oap_page, rc);
2410                         switch (rc) {
2411                         case -EAGAIN:
2412                                 /* llite is telling us that the page is still
2413                                  * in commit_write and that we should try
2414                                  * and put it in an rpc again later.  we
2415                                  * break out of the loop so we don't create
2416                                  * a hole in the sequence of pages in the rpc
2417                                  * stream.*/
2418                                 oap = NULL;
2419                                 break;
2420                         case -EINTR:
2421                                 /* the io isn't needed; tell the checks
2422                                  * below to complete the rpc with -EINTR */
2423                                 spin_lock(&oap->oap_lock);
2424                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2425                                 spin_unlock(&oap->oap_lock);
2426                                 oap->oap_count = -EINTR;
2427                                 break;
2428                         case 0:
2429                                 spin_lock(&oap->oap_lock);
2430                                 oap->oap_async_flags |= ASYNC_READY;
2431                                 spin_unlock(&oap->oap_lock);
2432                                 break;
2433                         default:
2434                                 LASSERTF(0, "oap %p page %p returned %d "
2435                                             "from make_ready\n", oap,
2436                                             oap->oap_page, rc);
2437                                 break;
2438                         }
2439                 }
2440                 if (oap == NULL)
2441                         break;
2442                 /*
2443                  * Page submitted for IO has to be locked. Either by
2444                  * ->ap_make_ready() or by higher layers.
2445                  */
2446 #if defined(__KERNEL__) && defined(__linux__)
2447                 {
2448                         struct cl_page *page;
2449
2450                         page = osc_oap2cl_page(oap);
2451
2452                         if (page->cp_type == CPT_CACHEABLE &&
2453                             !(PageLocked(oap->oap_page) &&
2454                               (CheckWriteback(oap->oap_page, cmd)))) {
2455                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2456                                        oap->oap_page,
2457                                        (long)oap->oap_page->flags,
2458                                        oap->oap_async_flags);
2459                                 LBUG();
2460                         }
2461                 }
2462 #endif
2463
2464                 /* take the page out of our book-keeping */
2465                 list_del_init(&oap->oap_pending_item);
2466                 lop_update_pending(cli, lop, cmd, -1);
2467                 list_del_init(&oap->oap_urgent_item);
2468
2469                 if (page_count == 0)
2470                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2471                                           (PTLRPC_MAX_BRW_SIZE - 1);
2472
2473                 /* ask the caller for the size of the io as the rpc leaves. */
2474                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2475                         oap->oap_count =
2476                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2477                                                       cmd);
2478                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2479                 }
2480                 if (oap->oap_count <= 0) {
2481                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2482                                oap->oap_count);
2483                         osc_ap_completion(env, cli, NULL,
2484                                           oap, 0, oap->oap_count);
2485                         continue;
2486                 }
2487
2488                 /* now put the page back in our accounting */
2489                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2490                 if (page_count == 0)
2491                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2492                 if (++page_count >= cli->cl_max_pages_per_rpc)
2493                         break;
2494
2495                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2496                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2497                  * have the same alignment as the initial writes that allocated
2498                  * extents on the server. */
2499                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2500                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2501                 if (ending_offset == 0)
2502                         break;
2503
2504                 /* If there is a gap at the end of this page, it can't merge
2505                  * with any subsequent pages, so we'll hand the network a
2506                  * "fragmented" page array that it can't transfer in 1 RDMA */
2507                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2508                         break;
2509         }
2510
2511         osc_wake_cache_waiters(cli);
2512
2513         loi_list_maint(cli, loi);
2514
2515         client_obd_list_unlock(&cli->cl_loi_list_lock);
2516
2517         if (clob != NULL)
2518                 cl_object_put(env, clob);
2519
2520         if (page_count == 0) {
2521                 client_obd_list_lock(&cli->cl_loi_list_lock);
2522                 RETURN(0);
2523         }
2524
2525         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2526         if (IS_ERR(req)) {
2527                 LASSERT(list_empty(&rpc_list));
2528                 loi_list_maint(cli, loi);
2529                 RETURN(PTR_ERR(req));
2530         }
2531
2532         aa = ptlrpc_req_async_args(req);
2533
2534         if (cmd == OBD_BRW_READ) {
2535                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2536                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2537                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2538                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2539         } else {
2540                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2541                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2542                                  cli->cl_w_in_flight);
2543                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2544                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2545         }
2546         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2547
2548         client_obd_list_lock(&cli->cl_loi_list_lock);
2549
2550         if (cmd == OBD_BRW_READ)
2551                 cli->cl_r_in_flight++;
2552         else
2553                 cli->cl_w_in_flight++;
2554
2555         /* queued sync pages can be torn down while in transit
2556          * between the pending list and the rpc */
2557         tmp = NULL;
2558         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2559                 /* only one oap gets a request reference */
2560                 if (tmp == NULL)
2561                         tmp = oap;
2562                 if (oap->oap_interrupted && !req->rq_intr) {
2563                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2564                                oap, req);
2565                         ptlrpc_mark_interrupted(req);
2566                 }
2567         }
2568         if (tmp != NULL)
2569                 tmp->oap_request = ptlrpc_request_addref(req);
2570
2571         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2572                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2573
2574         req->rq_interpret_reply = brw_interpret;
2575         ptlrpcd_add_req(req, PSCOPE_BRW);
2576         RETURN(1);
2577 }
2578
2579 #define LOI_DEBUG(LOI, STR, args...)                                     \
2580         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2581                !list_empty(&(LOI)->loi_ready_item) ||                    \
2582                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2583                (LOI)->loi_write_lop.lop_num_pending,                     \
2584                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2585                (LOI)->loi_read_lop.lop_num_pending,                      \
2586                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2587                args)                                                     \
2588
2589 /* This is called by osc_check_rpcs() to find which objects have pages that
2590  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2591 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2592 {
2593         ENTRY;
2594
2595         /* First return objects that have blocked locks so that they
2596          * will be flushed quickly and other clients can get the lock,
2597          * then objects which have pages ready to be stuffed into RPCs */
2598         if (!list_empty(&cli->cl_loi_hp_ready_list))
2599                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2600                                   struct lov_oinfo, loi_hp_ready_item));
2601         if (!list_empty(&cli->cl_loi_ready_list))
2602                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2603                                   struct lov_oinfo, loi_ready_item));
2604
2605         /* then if we have cache waiters, return all objects with queued
2606          * writes.  This is especially important when many small files
2607          * have filled up the cache and not been fired into rpcs because
2608          * they don't pass the nr_pending/object threshold */
2609         if (!list_empty(&cli->cl_cache_waiters) &&
2610             !list_empty(&cli->cl_loi_write_list))
2611                 RETURN(list_entry(cli->cl_loi_write_list.next,
2612                                   struct lov_oinfo, loi_write_item));
2613
2614         /* then return all queued objects when we have an invalid import
2615          * so that they get flushed */
2616         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2617                 if (!list_empty(&cli->cl_loi_write_list))
2618                         RETURN(list_entry(cli->cl_loi_write_list.next,
2619                                           struct lov_oinfo, loi_write_item));
2620                 if (!list_empty(&cli->cl_loi_read_list))
2621                         RETURN(list_entry(cli->cl_loi_read_list.next,
2622                                           struct lov_oinfo, loi_read_item));
2623         }
2624         RETURN(NULL);
2625 }
2626
2627 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2628 {
2629         struct osc_async_page *oap;
2630         int hprpc = 0;
2631
2632         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2633                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2634                                  struct osc_async_page, oap_urgent_item);
2635                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2636         }
2637
2638         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2639                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2640                                  struct osc_async_page, oap_urgent_item);
2641                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2642         }
2643
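        /* a queued high-priority page may exceed cl_max_rpcs_in_flight by
         * one, so lock cancellation never stalls behind the normal window */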
2644         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2645 }
2646
2647 /* called with the loi list lock held */
2648 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2649 {
2650         struct lov_oinfo *loi;
2651         int rc = 0, race_counter = 0;
2652         ENTRY;
2653
2654         while ((loi = osc_next_loi(cli)) != NULL) {
2655                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2656
2657                 if (osc_max_rpc_in_flight(cli, loi))
2658                         break;
2659
2660                 /* attempt some read/write balancing by alternating between
2661                  * reads and writes in an object.  The makes_rpc checks here
2662                  * would be redundant if we were getting read/write work items
2663                  * instead of objects.  we don't want send_oap_rpc to drain a
2664                  * partially-filled read pending queue when we're handed
2665                  * this object to do write io while there are cache waiters */
2666                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2667                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2668                                               &loi->loi_write_lop);
2669                         if (rc < 0) {
2670                                 CERROR("Write request failed with %d\n", rc);
2671
2672                                 /* osc_send_oap_rpc failed, mostly because of
2673                                  * memory pressure.
2674                                  *
2675                                  * We can't break out here, because if:
2676                                  *  - a page was submitted by osc_io_submit and
2677                                  *    is therefore locked;
2678                                  *  - no request is in flight; and
2679                                  *  - no subsequent request will be issued,
2680                                  * then the system ends up live-locked: there
2681                                  * is no further chance to call osc_io_unplug()
2682                                  * and osc_check_rpcs() any more, and pdflush
2683                                  * can't help either, because it might itself
2684                                  * be blocked grabbing the page lock as
2685                                  * mentioned above.
2686                                  *
2687                                  * So continue to drain pages instead. */
2688                                 /* break; */
2689                         }
2690
2691                         if (rc > 0)
2692                                 race_counter = 0;
2693                         else
2694                                 race_counter++;
2695                 }
2696                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2697                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2698                                               &loi->loi_read_lop);
2699                         if (rc < 0)
2700                                 CERROR("Read request failed with %d\n", rc);
2701
2702                         if (rc > 0)
2703                                 race_counter = 0;
2704                         else
2705                                 race_counter++;
2706                 }
2707
2708                 /* attempt some inter-object balancing by issuing rpcs
2709                  * for each object in turn */
2710                 if (!list_empty(&loi->loi_hp_ready_item))
2711                         list_del_init(&loi->loi_hp_ready_item);
2712                 if (!list_empty(&loi->loi_ready_item))
2713                         list_del_init(&loi->loi_ready_item);
2714                 if (!list_empty(&loi->loi_write_item))
2715                         list_del_init(&loi->loi_write_item);
2716                 if (!list_empty(&loi->loi_read_item))
2717                         list_del_init(&loi->loi_read_item);
2718
2719                 loi_list_maint(cli, loi);
2720
2721                 /* osc_send_oap_rpc() returns 0 when make_ready tells it to
2722                  * back off.  llite's make_ready does this when it tries
2723                  * to lock a page queued for write that is already locked.
2724                  * We want to try sending rpcs from many objects, but we
2725                  * don't want to spin failing with 0.  */
2726                 if (race_counter == 10)
2727                         break;
2728         }
2729         EXIT;
2730 }
2731
2732 /* we're trying to queue a page in the osc so we're subject to the
2733  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2734  * If the osc's queued pages are already at that limit, then we want to sleep
2735  * until there is space in the osc's queue for us.  We also may be waiting for
2736  * write credits from the OST if there are RPCs in flight that may return some
2737  * before we fall back to sync writes.
2738  *
2739  * We need this to know whether our allocation was granted in the presence
2740  * of signals */
2740 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2741 {
2742         int rc;
2743         ENTRY;
2744         client_obd_list_lock(&cli->cl_loi_list_lock);
2745         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2746         client_obd_list_unlock(&cli->cl_loi_list_lock);
2747         RETURN(rc);
2748 }
2749
2750 /**
2751  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2752  * is available.
2753  */
2754 int osc_enter_cache_try(const struct lu_env *env,
2755                         struct client_obd *cli, struct lov_oinfo *loi,
2756                         struct osc_async_page *oap, int transient)
2757 {
2758         int has_grant;
2759
2760         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2761         if (has_grant) {
2762                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2763                 if (transient) {
2764                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2765                         atomic_inc(&obd_dirty_transit_pages);
2766                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2767                 }
2768         }
2769         return has_grant;
2770 }
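/* Usage sketch (illustrative only; it mirrors the caller in
 * osc_enter_cache() below).  The caller holds cl_loi_list_lock and
 * checks the dirty limits before consuming grant:
 *
 *      if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
 *          atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
 *          osc_enter_cache_try(env, cli, loi, oap, 0))
 *              return 0;       (grant consumed, the page may be cached)
 *
 * Otherwise the page must wait for cache space or fall back to
 * synchronous I/O. */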
2771
2772 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2773  * grant or cache space. */
2774 static int osc_enter_cache(const struct lu_env *env,
2775                            struct client_obd *cli, struct lov_oinfo *loi,
2776                            struct osc_async_page *oap)
2777 {
2778         struct osc_cache_waiter ocw;
2779         struct l_wait_info lwi = { 0 };
2780
2781         ENTRY;
2782
2783         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2784                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2785                cli->cl_dirty_max, obd_max_dirty_pages,
2786                cli->cl_lost_grant, cli->cl_avail_grant);
2787
2788         /* force the caller to try sync io.  this can jump the list
2789          * of queued writes and create a discontiguous rpc stream */
2790         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2791             loi->loi_ar.ar_force_sync)
2792                 RETURN(-EDQUOT);
2793
2794         /* Hopefully normal case - cache space and write credits available */
2795         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2796             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2797             osc_enter_cache_try(env, cli, loi, oap, 0))
2798                 RETURN(0);
2799
2800         /* Make sure that there are write rpcs in flight to wait for.  This
2801          * is a little silly as this object may not have any pending but
2802          * other objects sure might. */
2803         if (cli->cl_w_in_flight) {
2804                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2805                 cfs_waitq_init(&ocw.ocw_waitq);
2806                 ocw.ocw_oap = oap;
2807                 ocw.ocw_rc = 0;
2808
2809                 loi_list_maint(cli, loi);
2810                 osc_check_rpcs(env, cli);
2811                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2812
2813                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2814                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2815
2816                 client_obd_list_lock(&cli->cl_loi_list_lock);
2817                 if (!list_empty(&ocw.ocw_entry)) {
2818                         list_del(&ocw.ocw_entry);
2819                         RETURN(-EINTR);
2820                 }
2821                 RETURN(ocw.ocw_rc);
2822         }
2823
2824         RETURN(-EDQUOT);
2825 }
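/* Note (added commentary): a waiter is released either when another
 * thread removes ocw_entry from cl_cache_waiters and wakes ocw_waitq
 * (osc_wake_cache_waiters(), as called from osc_teardown_async_page()
 * below), or when no write RPCs remain in flight -- ocw_granted()
 * checks exactly these two conditions.  A still-linked ocw_entry after
 * wakeup therefore means we were interrupted rather than granted,
 * hence the -EINTR return above. */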
2826
2827
2828 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2829                         struct lov_oinfo *loi, cfs_page_t *page,
2830                         obd_off offset, const struct obd_async_page_ops *ops,
2831                         void *data, void **res, int nocache,
2832                         struct lustre_handle *lockh)
2833 {
2834         struct osc_async_page *oap;
2835
2836         ENTRY;
2837
2838         if (!page)
2839                 return size_round(sizeof(*oap));
2840
2841         oap = *res;
2842         oap->oap_magic = OAP_MAGIC;
2843         oap->oap_cli = &exp->exp_obd->u.cli;
2844         oap->oap_loi = loi;
2845
2846         oap->oap_caller_ops = ops;
2847         oap->oap_caller_data = data;
2848
2849         oap->oap_page = page;
2850         oap->oap_obj_off = offset;
2851         if (!client_is_remote(exp) &&
2852             cfs_capable(CFS_CAP_SYS_RESOURCE))
2853                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2854
2855         LASSERT(!(offset & ~CFS_PAGE_MASK));
2856
2857         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2858         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2859         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2860         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2861
2862         spin_lock_init(&oap->oap_lock);
2863         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2864         RETURN(0);
2865 }
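/* Usage sketch (illustrative only): this function doubles as a size
 * query.  Called with page == NULL it returns the rounded size of an
 * osc_async_page so the caller can reserve room for one, e.g.:
 *
 *      int oap_size = osc_prep_async_page(exp, NULL, NULL, NULL, 0,
 *                                         NULL, NULL, NULL, 0, NULL);
 *      (allocate oap_size bytes, point *res at them, then call:)
 *      osc_prep_async_page(exp, lsm, loi, page, offset, ops, data,
 *                          &res, nocache, lockh);
 */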
2866
2867 struct osc_async_page *oap_from_cookie(void *cookie)
2868 {
2869         struct osc_async_page *oap = cookie;
2870         if (oap->oap_magic != OAP_MAGIC)
2871                 return ERR_PTR(-EINVAL);
2872         return oap;
2873 }
2874
2875 int osc_queue_async_io(const struct lu_env *env,
2876                        struct obd_export *exp, struct lov_stripe_md *lsm,
2877                        struct lov_oinfo *loi, void *cookie,
2878                        int cmd, obd_off off, int count,
2879                        obd_flag brw_flags, enum async_flags async_flags)
2880 {
2881         struct client_obd *cli = &exp->exp_obd->u.cli;
2882         struct osc_async_page *oap;
2883         int rc = 0;
2884         ENTRY;
2885
2886         oap = oap_from_cookie(cookie);
2887         if (IS_ERR(oap))
2888                 RETURN(PTR_ERR(oap));
2889
2890         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2891                 RETURN(-EIO);
2892
2893         if (!list_empty(&oap->oap_pending_item) ||
2894             !list_empty(&oap->oap_urgent_item) ||
2895             !list_empty(&oap->oap_rpc_item))
2896                 RETURN(-EBUSY);
2897
2898         /* check if the file's owner/group is over quota */
2899         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2900                 struct cl_object *obj;
2901                 struct cl_attr    attr; /* XXX put attr into thread info */
2902                 unsigned int qid[MAXQUOTAS];
2903
2904                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2905
2906                 cl_object_attr_lock(obj);
2907                 rc = cl_object_attr_get(env, obj, &attr);
2908                 cl_object_attr_unlock(obj);
2909
2910                 qid[USRQUOTA] = attr.cat_uid;
2911                 qid[GRPQUOTA] = attr.cat_gid;
2912                 if (rc == 0 &&
2913                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2914                         rc = -EDQUOT;
2915                 if (rc)
2916                         RETURN(rc);
2917         }
2918
2919         if (loi == NULL)
2920                 loi = lsm->lsm_oinfo[0];
2921
2922         client_obd_list_lock(&cli->cl_loi_list_lock);
2923
2924         LASSERT(off + count <= CFS_PAGE_SIZE);
2925         oap->oap_cmd = cmd;
2926         oap->oap_page_off = off;
2927         oap->oap_count = count;
2928         oap->oap_brw_flags = brw_flags;
2929         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2930         if (libcfs_memory_pressure_get())
2931                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2932         spin_lock(&oap->oap_lock);
2933         oap->oap_async_flags = async_flags;
2934         spin_unlock(&oap->oap_lock);
2935
2936         if (cmd & OBD_BRW_WRITE) {
2937                 rc = osc_enter_cache(env, cli, loi, oap);
2938                 if (rc) {
2939                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2940                         RETURN(rc);
2941                 }
2942         }
2943
2944         osc_oap_to_pending(oap);
2945         loi_list_maint(cli, loi);
2946
2947         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2948                   cmd);
2949
2950         osc_check_rpcs(env, cli);
2951         client_obd_list_unlock(&cli->cl_loi_list_lock);
2952
2953         RETURN(0);
2954 }
2955
2956 /* aka (~was & now & flag), but this is more clear :) */
2957 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
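/* Example (added): SETTING(0x1, 0x3, 0x2) is true -- flag 0x2 was clear
 * in `was' (0x1) and is set in `now' (0x3) -- while SETTING(0x3, 0x3, 0x2)
 * is false, because the flag was already set before. */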
2958
2959 int osc_set_async_flags_base(struct client_obd *cli,
2960                              struct lov_oinfo *loi, struct osc_async_page *oap,
2961                              obd_flag async_flags)
2962 {
2963         struct loi_oap_pages *lop;
2964         int flags = 0;
2965         ENTRY;
2966
2967         LASSERT(!list_empty(&oap->oap_pending_item));
2968
2969         if (oap->oap_cmd & OBD_BRW_WRITE) {
2970                 lop = &loi->loi_write_lop;
2971         } else {
2972                 lop = &loi->loi_read_lop;
2973         }
2974
2975         if ((oap->oap_async_flags & async_flags) == async_flags)
2976                 RETURN(0);
2977
2978         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2979                 flags |= ASYNC_READY;
2980
2981         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2982             list_empty(&oap->oap_rpc_item)) {
2983                 if (oap->oap_async_flags & ASYNC_HP)
2984                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2985                 else
2986                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2987                 flags |= ASYNC_URGENT;
2988                 loi_list_maint(cli, loi);
2989         }
2990         spin_lock(&oap->oap_lock);
2991         oap->oap_async_flags |= flags;
2992         spin_unlock(&oap->oap_lock);
2993
2994         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2995                         oap->oap_async_flags);
2996         RETURN(0);
2997 }
2998
2999 int osc_teardown_async_page(struct obd_export *exp,
3000                             struct lov_stripe_md *lsm,
3001                             struct lov_oinfo *loi, void *cookie)
3002 {
3003         struct client_obd *cli = &exp->exp_obd->u.cli;
3004         struct loi_oap_pages *lop;
3005         struct osc_async_page *oap;
3006         int rc = 0;
3007         ENTRY;
3008
3009         oap = oap_from_cookie(cookie);
3010         if (IS_ERR(oap))
3011                 RETURN(PTR_ERR(oap));
3012
3013         if (loi == NULL)
3014                 loi = lsm->lsm_oinfo[0];
3015
3016         if (oap->oap_cmd & OBD_BRW_WRITE) {
3017                 lop = &loi->loi_write_lop;
3018         } else {
3019                 lop = &loi->loi_read_lop;
3020         }
3021
3022         client_obd_list_lock(&cli->cl_loi_list_lock);
3023
3024         if (!list_empty(&oap->oap_rpc_item))
3025                 GOTO(out, rc = -EBUSY);
3026
3027         osc_exit_cache(cli, oap, 0);
3028         osc_wake_cache_waiters(cli);
3029
3030         if (!list_empty(&oap->oap_urgent_item)) {
3031                 list_del_init(&oap->oap_urgent_item);
3032                 spin_lock(&oap->oap_lock);
3033                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3034                 spin_unlock(&oap->oap_lock);
3035         }
3036         if (!list_empty(&oap->oap_pending_item)) {
3037                 list_del_init(&oap->oap_pending_item);
3038                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3039         }
3040         loi_list_maint(cli, loi);
3041         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3042 out:
3043         client_obd_list_unlock(&cli->cl_loi_list_lock);
3044         RETURN(rc);
3045 }
3046
3047 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3048                                          struct ldlm_enqueue_info *einfo,
3049                                          int flags)
3050 {
3051         void *data = einfo->ei_cbdata;
3052
3053         LASSERT(lock != NULL);
3054         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3055         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3056         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3057         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3058
3059         lock_res_and_lock(lock);
3060         spin_lock(&osc_ast_guard);
3061         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3062         lock->l_ast_data = data;
3063         spin_unlock(&osc_ast_guard);
3064         unlock_res_and_lock(lock);
3065 }
3066
3067 static void osc_set_data_with_check(struct lustre_handle *lockh,
3068                                     struct ldlm_enqueue_info *einfo,
3069                                     int flags)
3070 {
3071         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3072
3073         if (lock != NULL) {
3074                 osc_set_lock_data_with_check(lock, einfo, flags);
3075                 LDLM_LOCK_PUT(lock);
3076         } else
3077                 CERROR("lockh %p, data %p - client evicted?\n",
3078                        lockh, einfo->ei_cbdata);
3079 }
3080
3081 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3082                              ldlm_iterator_t replace, void *data)
3083 {
3084         struct ldlm_res_id res_id;
3085         struct obd_device *obd = class_exp2obd(exp);
3086
3087         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3088         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3089         return 0;
3090 }
3091
3092 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3093                             obd_enqueue_update_f upcall, void *cookie,
3094                             int *flags, int rc)
3095 {
3096         int intent = *flags & LDLM_FL_HAS_INTENT;
3097         ENTRY;
3098
3099         if (intent) {
3100                 /* The request was created before the ldlm_cli_enqueue call. */
3101                 if (rc == ELDLM_LOCK_ABORTED) {
3102                         struct ldlm_reply *rep;
3103                         rep = req_capsule_server_get(&req->rq_pill,
3104                                                      &RMF_DLM_REP);
3105
3106                         LASSERT(rep != NULL);
3107                         if (rep->lock_policy_res1)
3108                                 rc = rep->lock_policy_res1;
3109                 }
3110         }
3111
3112         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3113                 *flags |= LDLM_FL_LVB_READY;
3114                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3115                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3116         }
3117
3118         /* Call the update callback. */
3119         rc = (*upcall)(cookie, rc);
3120         RETURN(rc);
3121 }
3122
3123 static int osc_enqueue_interpret(const struct lu_env *env,
3124                                  struct ptlrpc_request *req,
3125                                  struct osc_enqueue_args *aa, int rc)
3126 {
3127         struct ldlm_lock *lock;
3128         struct lustre_handle handle;
3129         __u32 mode;
3130
3131         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3132          * might be freed anytime after lock upcall has been called. */
3133         lustre_handle_copy(&handle, aa->oa_lockh);
3134         mode = aa->oa_ei->ei_mode;
3135
3136         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3137          * be valid. */
3138         lock = ldlm_handle2lock(&handle);
3139
3140         /* Take an additional reference so that a blocking AST that
3141          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3142          * to arrive after an upcall has been executed by
3143          * osc_enqueue_fini(). */
3144         ldlm_lock_addref(&handle, mode);
3145
3146         /* Complete obtaining the lock procedure. */
3147         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3148                                    mode, aa->oa_flags, aa->oa_lvb,
3149                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3150                                    &handle, rc);
3151         /* Complete osc stuff. */
3152         rc = osc_enqueue_fini(req, aa->oa_lvb,
3153                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3154
3155         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3156
3157         /* Release the lock for async request. */
3158         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3159                 /*
3160                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3161                  * not already released by
3162                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3163                  */
3164                 ldlm_lock_decref(&handle, mode);
3165
3166         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3167                  aa->oa_lockh, req, aa);
3168         ldlm_lock_decref(&handle, mode);
3169         LDLM_LOCK_PUT(lock);
3170         return rc;
3171 }
3172
3173 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3174                         struct lov_oinfo *loi, int flags,
3175                         struct ost_lvb *lvb, __u32 mode, int rc)
3176 {
3177         if (rc == ELDLM_OK) {
3178                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3179                 __u64 tmp;
3180
3181                 LASSERT(lock != NULL);
3182                 loi->loi_lvb = *lvb;
3183                 tmp = loi->loi_lvb.lvb_size;
3184                 /* Extend KMS up to the end of this lock and no further.
3185                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3186                 if (tmp > lock->l_policy_data.l_extent.end)
3187                         tmp = lock->l_policy_data.l_extent.end + 1;
3188                 if (tmp >= loi->loi_kms) {
3189                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3190                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3191                         loi_kms_set(loi, tmp);
3192                 } else {
3193                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3194                                    LPU64"; leaving kms="LPU64", end="LPU64,
3195                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3196                                    lock->l_policy_data.l_extent.end);
3197                 }
3198                 ldlm_lock_allow_match(lock);
3199                 LDLM_LOCK_PUT(lock);
3200         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3201                 loi->loi_lvb = *lvb;
3202                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3203                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3204                 rc = ELDLM_OK;
3205         }
3206 }
3207 EXPORT_SYMBOL(osc_update_enqueue);
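/* Worked example (added commentary): if the granted lock covers the
 * extent [0, 4095] and the OST returned lvb_size = 10000, tmp is
 * clamped to l_extent.end + 1 = 4096, so the KMS is set to 4096 -- the
 * lock only guarantees knowledge of the size up to the end of its own
 * extent. */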
3208
3209 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3210
3211 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3212  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3213  * other synchronous requests; however, keeping some locks while trying to
3214  * obtain others may take a considerable amount of time in the case of an OST
3215  * failure, and when sync requests waiting on a lock are not released by a
3216  * client, that client is excluded from the cluster -- such scenarios make
3217  * life difficult, so release locks just after they are obtained. */
3218 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3219                      int *flags, ldlm_policy_data_t *policy,
3220                      struct ost_lvb *lvb, int kms_valid,
3221                      obd_enqueue_update_f upcall, void *cookie,
3222                      struct ldlm_enqueue_info *einfo,
3223                      struct lustre_handle *lockh,
3224                      struct ptlrpc_request_set *rqset, int async)
3225 {
3226         struct obd_device *obd = exp->exp_obd;
3227         struct ptlrpc_request *req = NULL;
3228         int intent = *flags & LDLM_FL_HAS_INTENT;
3229         ldlm_mode_t mode;
3230         int rc;
3231         ENTRY;
3232
3233         /* Filesystem lock extents are extended to page boundaries so that
3234          * dealing with the page cache is a little smoother.  */
3235         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3236         policy->l_extent.end |= ~CFS_PAGE_MASK;
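        /* Worked example (added): with CFS_PAGE_SIZE = 4096, i.e.
         * CFS_PAGE_MASK = ~4095, an extent [5000, 9000] is widened to
         * [4096, 12287]: the start drops its in-page offset
         * (5000 & 4095 = 904) and the end is extended to the last byte
         * of its page. */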
3237
3238         /*
3239          * kms is not valid when either object is completely fresh (so that no
3240          * locks are cached), or object was evicted. In the latter case cached
3241          * lock cannot be used, because it would prime inode state with
3242          * potentially stale LVB.
3243          */
3244         if (!kms_valid)
3245                 goto no_match;
3246
3247         /* Next, search for already existing extent locks that will cover us */
3248         /* If we're trying to read, we also search for an existing PW lock.  The
3249          * VFS and page cache already protect us locally, so lots of readers/
3250          * writers can share a single PW lock.
3251          *
3252          * There are problems with conversion deadlocks, so instead of
3253          * converting a read lock to a write lock, we'll just enqueue a new
3254          * one.
3255          *
3256          * At some point we should cancel the read lock instead of making them
3257          * send us a blocking callback, but there are problems with canceling
3258          * locks out from other users right now, too. */
3259         mode = einfo->ei_mode;
3260         if (einfo->ei_mode == LCK_PR)
3261                 mode |= LCK_PW;
3262         mode = ldlm_lock_match(obd->obd_namespace,
3263                                *flags | LDLM_FL_LVB_READY, res_id,
3264                                einfo->ei_type, policy, mode, lockh, 0);
3265         if (mode) {
3266                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3267
3268                 if (matched->l_ast_data == NULL ||
3269                     matched->l_ast_data == einfo->ei_cbdata) {
3270                         /* addref the lock only if not async requests and PW
3271                          * lock is matched whereas we asked for PR. */
3272                         if (!rqset && einfo->ei_mode != mode)
3273                                 ldlm_lock_addref(lockh, LCK_PR);
3274                         osc_set_lock_data_with_check(matched, einfo, *flags);
3275                         if (intent) {
3276                                 /* I would like to be able to ASSERT here that
3277                                  * rss <= kms, but I can't, for reasons which
3278                                  * are explained in lov_enqueue() */
3279                         }
3280
3281                         /* We already have a lock, and it's referenced */
3282                         (*upcall)(cookie, ELDLM_OK);
3283
3284                         /* For async requests, decref the lock. */
3285                         if (einfo->ei_mode != mode)
3286                                 ldlm_lock_decref(lockh, LCK_PW);
3287                         else if (rqset)
3288                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3289                         LDLM_LOCK_PUT(matched);
3290                         RETURN(ELDLM_OK);
3291                 } else
3292                         ldlm_lock_decref(lockh, mode);
3293                 LDLM_LOCK_PUT(matched);
3294         }
3295
3296  no_match:
3297         if (intent) {
3298                 CFS_LIST_HEAD(cancels);
3299                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3300                                            &RQF_LDLM_ENQUEUE_LVB);
3301                 if (req == NULL)
3302                         RETURN(-ENOMEM);
3303
3304                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3305                 if (rc)
3306                         RETURN(rc);
3307
3308                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3309                                      sizeof(*lvb));
3310                 ptlrpc_request_set_replen(req);
3311         }
3312
3313         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3314         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3315
3316         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3317                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3318         if (rqset) {
3319                 if (!rc) {
3320                         struct osc_enqueue_args *aa;
3321                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3322                         aa = ptlrpc_req_async_args(req);
3323                         aa->oa_ei = einfo;
3324                         aa->oa_exp = exp;
3325                         aa->oa_flags  = flags;
3326                         aa->oa_upcall = upcall;
3327                         aa->oa_cookie = cookie;
3328                         aa->oa_lvb    = lvb;
3329                         aa->oa_lockh  = lockh;
3330
3331                         req->rq_interpret_reply =
3332                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3333                         if (rqset == PTLRPCD_SET)
3334                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3335                         else
3336                                 ptlrpc_set_add_req(rqset, req);
3337                 } else if (intent) {
3338                         ptlrpc_req_finished(req);
3339                 }
3340                 RETURN(rc);
3341         }
3342
3343         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3344         if (intent)
3345                 ptlrpc_req_finished(req);
3346
3347         RETURN(rc);
3348 }
3349
3350 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3351                        struct ldlm_enqueue_info *einfo,
3352                        struct ptlrpc_request_set *rqset)
3353 {
3354         struct ldlm_res_id res_id;
3355         int rc;
3356         ENTRY;
3357
3358         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3359                            oinfo->oi_md->lsm_object_gr, &res_id);
3360
3361         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3362                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3363                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3364                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3365                               rqset, rqset != NULL);
3366         RETURN(rc);
3367 }
3368
3369 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3370                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3371                    int *flags, void *data, struct lustre_handle *lockh,
3372                    int unref)
3373 {
3374         struct obd_device *obd = exp->exp_obd;
3375         int lflags = *flags;
3376         ldlm_mode_t rc;
3377         ENTRY;
3378
3379         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3380                 RETURN(-EIO);
3381
3382         /* Filesystem lock extents are extended to page boundaries so that
3383          * dealing with the page cache is a little smoother */
3384         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3385         policy->l_extent.end |= ~CFS_PAGE_MASK;
3386
3387         /* Next, search for already existing extent locks that will cover us */
3388         /* If we're trying to read, we also search for an existing PW lock.  The
3389          * VFS and page cache already protect us locally, so lots of readers/
3390          * writers can share a single PW lock. */
3391         rc = mode;
3392         if (mode == LCK_PR)
3393                 rc |= LCK_PW;
3394         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3395                              res_id, type, policy, rc, lockh, unref);
3396         if (rc) {
3397                 if (data != NULL)
3398                         osc_set_data_with_check(lockh, data, lflags);
3399                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3400                         ldlm_lock_addref(lockh, LCK_PR);
3401                         ldlm_lock_decref(lockh, LCK_PW);
3402                 }
3403                 RETURN(rc);
3404         }
3405         RETURN(rc);
3406 }
3407
3408 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3409 {
3410         ENTRY;
3411
3412         if (unlikely(mode == LCK_GROUP))
3413                 ldlm_lock_decref_and_cancel(lockh, mode);
3414         else
3415                 ldlm_lock_decref(lockh, mode);
3416
3417         RETURN(0);
3418 }
3419
3420 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3421                       __u32 mode, struct lustre_handle *lockh)
3422 {
3423         ENTRY;
3424         RETURN(osc_cancel_base(lockh, mode));
3425 }
3426
3427 static int osc_cancel_unused(struct obd_export *exp,
3428                              struct lov_stripe_md *lsm, int flags,
3429                              void *opaque)
3430 {
3431         struct obd_device *obd = class_exp2obd(exp);
3432         struct ldlm_res_id res_id, *resp = NULL;
3433
3434         if (lsm != NULL) {
3435                 resp = osc_build_res_name(lsm->lsm_object_id,
3436                                           lsm->lsm_object_gr, &res_id);
3437         }
3438
3439         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3440 }
3441
3442 static int osc_statfs_interpret(const struct lu_env *env,
3443                                 struct ptlrpc_request *req,
3444                                 struct osc_async_args *aa, int rc)
3445 {
3446         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3447         struct obd_statfs *msfs;
3448         __u64 used;
3449         ENTRY;
3450
3451         if (rc == -EBADR)
3452                 /* The request has in fact never been sent
3453                  * due to issues at a higher level (LOV).
3454                  * Exit immediately since the caller is
3455                  * aware of the problem and takes care
3456                  * of the clean up */
3457                 RETURN(rc);
3458
3459         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3460             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3461                 GOTO(out, rc = 0);
3462
3463         if (rc != 0)
3464                 GOTO(out, rc);
3465
3466         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3467         if (msfs == NULL) {
3468                 GOTO(out, rc = -EPROTO);
3469         }
3470
3471         /* Reinitialize the RDONLY and DEGRADED flags at the client
3472          * on each statfs, so they don't stay set permanently. */
3473         spin_lock(&cli->cl_oscc.oscc_lock);
3474
3475         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3476                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3477         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3478                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3479
3480         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3481                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3482         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3483                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3484
3485         /* Add a bit of hysteresis so this flag isn't continually flapping,
3486          * and ensure that new files don't get extremely fragmented due to
3487          * only a small amount of available space in the filesystem.
3488          * We want to set the NOSPC flag when there is less than ~0.1% free
3489          * and clear it when there is at least ~0.2% free space, so:
3490          *                   avail < ~0.1% max          max = avail + used
3491          *            1025 * avail < avail + used       used = blocks - free
3492          *            1024 * avail < used
3493          *            1024 * avail < blocks - free
3494          *                   avail < ((blocks - free) >> 10)
3495          *
3496          * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't want
3497          * to lose that amount of space, so in those cases we report no space
3498          * left if there is less than 1 GB left. */
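        /* Worked example (added): with os_blocks = 2^32 and
         * os_bfree = 2^22, used = min((2^32 - 2^22) >> 10, 1 << 30)
         * ~= 2^22 blocks, so NOSPC is set once os_bavail drops below
         * ~2^22 blocks (~0.1% of the device) and is cleared only above
         * twice that (used << 1) -- the gap is the hysteresis. */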
3499         used = min((msfs->os_blocks - msfs->os_bfree) >> 10, 1ULL << 30);
3500         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3501                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3502                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3503         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3504                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3505                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3506
3507         spin_unlock(&cli->cl_oscc.oscc_lock);
3508
3509         *aa->aa_oi->oi_osfs = *msfs;
3510 out:
3511         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3512         RETURN(rc);
3513 }
3514
3515 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3516                             __u64 max_age, struct ptlrpc_request_set *rqset)
3517 {
3518         struct ptlrpc_request *req;
3519         struct osc_async_args *aa;
3520         int                    rc;
3521         ENTRY;
3522
3523         /* We could possibly pass max_age in the request (as an absolute
3524          * timestamp or a "seconds.usec ago") so the target can avoid doing
3525          * extra calls into the filesystem if that isn't necessary (e.g.
3526          * during mount that would help a bit).  Having relative timestamps
3527          * is not so great if request processing is slow, while absolute
3528          * timestamps are not ideal because they need time synchronization. */
3529         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3530         if (req == NULL)
3531                 RETURN(-ENOMEM);
3532
3533         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3534         if (rc) {
3535                 ptlrpc_request_free(req);
3536                 RETURN(rc);
3537         }
3538         ptlrpc_request_set_replen(req);
3539         req->rq_request_portal = OST_CREATE_PORTAL;
3540         ptlrpc_at_set_req_timeout(req);
3541
3542         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3543                 /* procfs requests should not wait on statfs, to avoid deadlock */
3544                 req->rq_no_resend = 1;
3545                 req->rq_no_delay = 1;
3546         }
3547
3548         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3549         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3550         aa = ptlrpc_req_async_args(req);
3551         aa->aa_oi = oinfo;
3552
3553         ptlrpc_set_add_req(rqset, req);
3554         RETURN(0);
3555 }
3556
3557 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3558                       __u64 max_age, __u32 flags)
3559 {
3560         struct obd_statfs     *msfs;
3561         struct ptlrpc_request *req;
3562         struct obd_import     *imp = NULL;
3563         int rc;
3564         ENTRY;
3565
3566         /* Since the request might also come from lprocfs, we need to
3567          * sync this with client_disconnect_export (bug 15684). */
3568         down_read(&obd->u.cli.cl_sem);
3569         if (obd->u.cli.cl_import)
3570                 imp = class_import_get(obd->u.cli.cl_import);
3571         up_read(&obd->u.cli.cl_sem);
3572         if (!imp)
3573                 RETURN(-ENODEV);
3574
3575         /* We could possibly pass max_age in the request (as an absolute
3576          * timestamp or a "seconds.usec ago") so the target can avoid doing
3577          * extra calls into the filesystem if that isn't necessary (e.g.
3578          * during mount that would help a bit).  Having relative timestamps
3579          * is not so great if request processing is slow, while absolute
3580          * timestamps are not ideal because they need time synchronization. */
3581         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3582
3583         class_import_put(imp);
3584
3585         if (req == NULL)
3586                 RETURN(-ENOMEM);
3587
3588         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3589         if (rc) {
3590                 ptlrpc_request_free(req);
3591                 RETURN(rc);
3592         }
3593         ptlrpc_request_set_replen(req);
3594         req->rq_request_portal = OST_CREATE_PORTAL;
3595         ptlrpc_at_set_req_timeout(req);
3596
3597         if (flags & OBD_STATFS_NODELAY) {
3598                 /* procfs requests should not wait on statfs, to avoid deadlock */
3599                 req->rq_no_resend = 1;
3600                 req->rq_no_delay = 1;
3601         }
3602
3603         rc = ptlrpc_queue_wait(req);
3604         if (rc)
3605                 GOTO(out, rc);
3606
3607         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3608         if (msfs == NULL) {
3609                 GOTO(out, rc = -EPROTO);
3610         }
3611
3612         *osfs = *msfs;
3613
3614         EXIT;
3615  out:
3616         ptlrpc_req_finished(req);
3617         return rc;
3618 }
3619
3620 /* Retrieve object striping information.
3621  *
3622  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3623  * the maximum number of OST indices which will fit in the user buffer.
3624  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3625  */
3626 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3627 {
3628         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3629         struct lov_user_md_v3 lum, *lumk;
3630         struct lov_user_ost_data_v1 *lmm_objects;
3631         int rc = 0, lum_size;
3632         ENTRY;
3633
3634         if (!lsm)
3635                 RETURN(-ENODATA);
3636
3637         /* we only need the header part from user space to get lmm_magic and
3638          * lmm_stripe_count (the header part is common to v1 and v3) */
3639         lum_size = sizeof(struct lov_user_md_v1);
3640         if (copy_from_user(&lum, lump, lum_size))
3641                 RETURN(-EFAULT);
3642
3643         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3644             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3645                 RETURN(-EINVAL);
3646
3647         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3648         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3649         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3650         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3651
3652         /* we can use lov_mds_md_size() to compute lum_size
3653          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3654         if (lum.lmm_stripe_count > 0) {
3655                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3656                 OBD_ALLOC(lumk, lum_size);
3657                 if (!lumk)
3658                         RETURN(-ENOMEM);
3659
3660                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3661                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3662                 else
3663                         lmm_objects = &(lumk->lmm_objects[0]);
3664                 lmm_objects->l_object_id = lsm->lsm_object_id;
3665         } else {
3666                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3667                 lumk = &lum;
3668         }
3669
3670         lumk->lmm_object_id = lsm->lsm_object_id;
3671         lumk->lmm_object_gr = lsm->lsm_object_gr;
3672         lumk->lmm_stripe_count = 1;
3673
3674         if (copy_to_user(lump, lumk, lum_size))
3675                 rc = -EFAULT;
3676
3677         if (lumk != &lum)
3678                 OBD_FREE(lumk, lum_size);
3679
3680         RETURN(rc);
3681 }
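/* Usage sketch (illustrative, user space; assumes an open fd on a
 * Lustre file and headers providing the lov_user_md types):
 *
 *      char buf[sizeof(struct lov_user_md_v3) +
 *               sizeof(struct lov_user_ost_data_v1)];
 *      struct lov_user_md_v1 *lum = (struct lov_user_md_v1 *)buf;
 *
 *      lum->lmm_magic = LOV_USER_MAGIC_V1;
 *      lum->lmm_stripe_count = 1;
 *      if (ioctl(fd, LL_IOC_LOV_GETSTRIPE, lum) == 0)
 *              (use lum->lmm_object_id / lum->lmm_objects[0])
 *
 * Only the header is copied in from user space; the reply always
 * carries a single stripe, since this handler sets
 * lmm_stripe_count = 1. */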
3682
3683
3684 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3685                          void *karg, void *uarg)
3686 {
3687         struct obd_device *obd = exp->exp_obd;
3688         struct obd_ioctl_data *data = karg;
3689         int err = 0;
3690         ENTRY;
3691
3692         if (!try_module_get(THIS_MODULE)) {
3693                 CERROR("Can't get module. Is it alive?\n");
3694                 return -EINVAL;
3695         }
3696         switch (cmd) {
3697         case OBD_IOC_LOV_GET_CONFIG: {
3698                 char *buf;
3699                 struct lov_desc *desc;
3700                 struct obd_uuid uuid;
3701
3702                 buf = NULL;
3703                 len = 0;
3704                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3705                         GOTO(out, err = -EINVAL);
3706
3707                 data = (struct obd_ioctl_data *)buf;
3708
3709                 if (sizeof(*desc) > data->ioc_inllen1) {
3710                         obd_ioctl_freedata(buf, len);
3711                         GOTO(out, err = -EINVAL);
3712                 }
3713
3714                 if (data->ioc_inllen2 < sizeof(uuid)) {
3715                         obd_ioctl_freedata(buf, len);
3716                         GOTO(out, err = -EINVAL);
3717                 }
3718
3719                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3720                 desc->ld_tgt_count = 1;
3721                 desc->ld_active_tgt_count = 1;
3722                 desc->ld_default_stripe_count = 1;
3723                 desc->ld_default_stripe_size = 0;
3724                 desc->ld_default_stripe_offset = 0;
3725                 desc->ld_pattern = 0;
3726                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3727
3728                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3729
3730                 err = copy_to_user((void *)uarg, buf, len);
3731                 if (err)
3732                         err = -EFAULT;
3733                 obd_ioctl_freedata(buf, len);
3734                 GOTO(out, err);
3735         }
3736         case LL_IOC_LOV_SETSTRIPE:
3737                 err = obd_alloc_memmd(exp, karg);
3738                 if (err > 0)
3739                         err = 0;
3740                 GOTO(out, err);
3741         case LL_IOC_LOV_GETSTRIPE:
3742                 err = osc_getstripe(karg, uarg);
3743                 GOTO(out, err);
3744         case OBD_IOC_CLIENT_RECOVER:
3745                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3746                                             data->ioc_inlbuf1);
3747                 if (err > 0)
3748                         err = 0;
3749                 GOTO(out, err);
3750         case IOC_OSC_SET_ACTIVE:
3751                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3752                                                data->ioc_offset);
3753                 GOTO(out, err);
3754         case OBD_IOC_POLL_QUOTACHECK:
3755                 err = lquota_poll_check(quota_interface, exp,
3756                                         (struct if_quotacheck *)karg);
3757                 GOTO(out, err);
3758         case OBD_IOC_PING_TARGET:
3759                 err = ptlrpc_obd_ping(obd);
3760                 GOTO(out, err);
3761         default:
3762                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3763                        cmd, cfs_curproc_comm());
3764                 GOTO(out, err = -ENOTTY);
3765         }
3766 out:
3767         module_put(THIS_MODULE);
3768         return err;
3769 }
3770
3771 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3772                         void *key, __u32 *vallen, void *val,
3773                         struct lov_stripe_md *lsm)
3774 {
3775         ENTRY;
3776         if (!vallen || !val)
3777                 RETURN(-EFAULT);
3778
3779         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3780                 __u32 *stripe = val;
3781                 *vallen = sizeof(*stripe);
3782                 *stripe = 0;
3783                 RETURN(0);
3784         } else if (KEY_IS(KEY_LAST_ID)) {
3785                 struct ptlrpc_request *req;
3786                 obd_id                *reply;
3787                 char                  *tmp;
3788                 int                    rc;
3789
3790                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3791                                            &RQF_OST_GET_INFO_LAST_ID);
3792                 if (req == NULL)
3793                         RETURN(-ENOMEM);
3794
3795                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3796                                      RCL_CLIENT, keylen);
3797                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3798                 if (rc) {
3799                         ptlrpc_request_free(req);
3800                         RETURN(rc);
3801                 }
3802
3803                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3804                 memcpy(tmp, key, keylen);
3805
3806                 req->rq_no_delay = req->rq_no_resend = 1;
3807                 ptlrpc_request_set_replen(req);
3808                 rc = ptlrpc_queue_wait(req);
3809                 if (rc)
3810                         GOTO(out, rc);
3811
3812                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3813                 if (reply == NULL)
3814                         GOTO(out, rc = -EPROTO);
3815
3816                 *((obd_id *)val) = *reply;
3817         out:
3818                 ptlrpc_req_finished(req);
3819                 RETURN(rc);
3820         } else if (KEY_IS(KEY_FIEMAP)) {
3821                 struct ptlrpc_request *req;
3822                 struct ll_user_fiemap *reply;
3823                 char *tmp;
3824                 int rc;
3825
3826                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3827                                            &RQF_OST_GET_INFO_FIEMAP);
3828                 if (req == NULL)
3829                         RETURN(-ENOMEM);
3830
3831                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3832                                      RCL_CLIENT, keylen);
3833                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3834                                      RCL_CLIENT, *vallen);
3835                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3836                                      RCL_SERVER, *vallen);
3837
3838                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3839                 if (rc) {
3840                         ptlrpc_request_free(req);
3841                         RETURN(rc);
3842                 }
3843
3844                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3845                 memcpy(tmp, key, keylen);
3846                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3847                 memcpy(tmp, val, *vallen);
3848
3849                 ptlrpc_request_set_replen(req);
3850                 rc = ptlrpc_queue_wait(req);
3851                 if (rc)
3852                         GOTO(out1, rc);
3853
3854                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3855                 if (reply == NULL)
3856                         GOTO(out1, rc = -EPROTO);
3857
3858                 memcpy(val, reply, *vallen);
3859         out1:
3860                 ptlrpc_req_finished(req);
3861
3862                 RETURN(rc);
3863         }
3864
3865         RETURN(-EINVAL);
3866 }
3867
3868 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3869 {
3870         struct llog_ctxt *ctxt;
3871         int rc = 0;
3872         ENTRY;
3873
3874         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3875         if (ctxt) {
3876                 rc = llog_initiator_connect(ctxt);
3877                 llog_ctxt_put(ctxt);
3878         } else {
3879                 /* XXX return an error? skip setting below flags? */
3880         }
3881
3882         spin_lock(&imp->imp_lock);
3883         imp->imp_server_timeout = 1;
3884         imp->imp_pingable = 1;
3885         spin_unlock(&imp->imp_lock);
3886         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3887
3888         RETURN(rc);
3889 }
3890
3891 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3892                                           struct ptlrpc_request *req,
3893                                           void *aa, int rc)
3894 {
3895         ENTRY;
3896         if (rc != 0)
3897                 RETURN(rc);
3898
3899         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3900 }
3901
3902 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3903                               void *key, obd_count vallen, void *val,
3904                               struct ptlrpc_request_set *set)
3905 {
3906         struct ptlrpc_request *req;
3907         struct obd_device     *obd = exp->exp_obd;
3908         struct obd_import     *imp = class_exp2cliimp(exp);
3909         char                  *tmp;
3910         int                    rc;
3911         ENTRY;
3912
3913         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3914
3915         if (KEY_IS(KEY_NEXT_ID)) {
3916                 obd_id new_val;
3917                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3918
3919                 if (vallen != sizeof(obd_id))
3920                         RETURN(-ERANGE);
3921                 if (val == NULL)
3922                         RETURN(-EINVAL);
3923
3926
3927                 /* avoid a race between allocating a new object and setting
3928                  * the next id from the ll_sync thread */
3929                 spin_lock(&oscc->oscc_lock);
3930                 new_val = *((obd_id*)val) + 1;
3931                 if (new_val > oscc->oscc_next_id)
3932                         oscc->oscc_next_id = new_val;
3933                 spin_unlock(&oscc->oscc_lock);
3934                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3935                        exp->exp_obd->obd_name,
3936                        obd->u.cli.cl_oscc.oscc_next_id);
3937
3938                 RETURN(0);
3939         }
3940
3941         if (KEY_IS(KEY_INIT_RECOV)) {
3942                 if (vallen != sizeof(int))
3943                         RETURN(-EINVAL);
3944                 spin_lock(&imp->imp_lock);
3945                 imp->imp_initial_recov = *(int *)val;
3946                 spin_unlock(&imp->imp_lock);
3947                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3948                        exp->exp_obd->obd_name,
3949                        imp->imp_initial_recov);
3950                 RETURN(0);
3951         }
3952
3953         if (KEY_IS(KEY_CHECKSUM)) {
3954                 if (vallen != sizeof(int))
3955                         RETURN(-EINVAL);
3956                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3957                 RETURN(0);
3958         }
3959
3960         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3961                 sptlrpc_conf_client_adapt(obd);
3962                 RETURN(0);
3963         }
3964
3965         if (KEY_IS(KEY_FLUSH_CTX)) {
3966                 sptlrpc_import_flush_my_ctx(imp);
3967                 RETURN(0);
3968         }
3969
3970         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3971                 RETURN(-EINVAL);
3972
3973         /* We pass all other commands directly to OST. Since nobody calls osc
3974            methods directly and everybody is supposed to go through LOV, we
3975            assume lov checked invalid values for us.
3976            The only recognised values so far are evict_by_nid and mds_conn.
3977            Even if something bad goes through, we'd get a -EINVAL from OST
3978            anyway. */
3979
3980         if (KEY_IS(KEY_GRANT_SHRINK))
3981                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3982         else
3983                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
3984
3985         if (req == NULL)
3986                 RETURN(-ENOMEM);
3987
3988         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3989                              RCL_CLIENT, keylen);
3990         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3991                              RCL_CLIENT, vallen);
3992         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3993         if (rc) {
3994                 ptlrpc_request_free(req);
3995                 RETURN(rc);
3996         }
3997
3998         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3999         memcpy(tmp, key, keylen);
4000         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4001         memcpy(tmp, val, vallen);
4002
4003         if (KEY_IS(KEY_MDS_CONN)) {
4004                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4005
4006                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
4007                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4008                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
4009                 req->rq_no_delay = req->rq_no_resend = 1;
4010                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4011         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4012                 struct osc_grant_args *aa;
4013                 struct obdo *oa;
4014
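                     /* Copy the caller's obdo into memory we own and hang it
                      * off the async args, so the interpret callback can use
                      * it (and free it) after this call returns. */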
4015                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4016                 aa = ptlrpc_req_async_args(req);
4017                 OBD_ALLOC_PTR(oa);
4018                 if (!oa) {
4019                         ptlrpc_req_finished(req);
4020                         RETURN(-ENOMEM);
4021                 }
4022                 *oa = ((struct ost_body *)val)->oa;
4023                 aa->aa_oa = oa;
4024                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4025         }
4026
4027         ptlrpc_request_set_replen(req);
4028         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4029                 LASSERT(set != NULL);
4030                 ptlrpc_set_add_req(set, req);
4031                 ptlrpc_check_set(NULL, set);
4032         } else
4033                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4034
4035         RETURN(0);
4036 }
4037
4038
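     /* Size-change replication log: the client side only needs to cancel
      * records, so only lop_cancel is provided. */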
4039 static struct llog_operations osc_size_repl_logops = {
4040         .lop_cancel = llog_obd_repl_cancel,
4041 };
4042
4043 static struct llog_operations osc_mds_ost_orig_logops;
4044
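     /* Set up the MDS->OST originator and size-change replication llog
      * contexts for this OSC. */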
4045 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4046                            struct obd_device *tgt, struct llog_catid *catid)
4047 {
4048         int rc;
4049         ENTRY;
4050
4051         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4052                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4053         if (rc) {
4054                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4055                 GOTO(out, rc);
4056         }
4057
4058         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4059                         NULL, &osc_size_repl_logops);
4060         if (rc) {
4061                 struct llog_ctxt *ctxt =
4062                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4063                 if (ctxt)
4064                         llog_cleanup(ctxt);
4065                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4066         }
4067         GOTO(out, rc);
4068 out:
4069         if (rc) {
4070                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4071                        obd->obd_name, tgt->obd_name, catid, rc);
4072                 CERROR("logid "LPX64":0x%x\n",
4073                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4074         }
4075         return rc;
4076 }
4077
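     /* Look up the llog catalog id for this target index, initialize the
      * llog contexts, and write the (possibly updated) catalog id back. */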
4078 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4079                          struct obd_device *disk_obd, int *index)
4080 {
4081         struct llog_catid catid;
4082         static char name[32] = CATLIST;
4083         int rc;
4084         ENTRY;
4085
4086         LASSERT(olg == &obd->obd_olg);
4087
4088         mutex_down(&olg->olg_cat_processing);
4089         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4090         if (rc) {
4091                 CERROR("failed to get llog catalog list: rc = %d\n", rc);
4092                 GOTO(out, rc);
4093         }
4094
4095         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4096                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4097                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4098
4099         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4100         if (rc) {
4101                 CERROR("llog contexts setup failed: rc = %d\n", rc);
4102                 GOTO(out, rc);
4103         }
4104
4105         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4106         if (rc) {
4107                 CERROR("failed to put llog catalog list: rc = %d\n", rc);
4108                 GOTO(out, rc);
4109         }
4110
4111  out:
4112         mutex_up(&olg->olg_cat_processing);
4113
4114         return rc;
4115 }
4116
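     /* Tear down both llog contexts; report the first error encountered. */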
4117 static int osc_llog_finish(struct obd_device *obd, int count)
4118 {
4119         struct llog_ctxt *ctxt;
4120         int rc = 0, rc2 = 0;
4121         ENTRY;
4122
4123         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4124         if (ctxt)
4125                 rc = llog_cleanup(ctxt);
4126
4127         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4128         if (ctxt)
4129                 rc2 = llog_cleanup(ctxt);
4130         if (!rc)
4131                 rc = rc2;
4132
4133         RETURN(rc);
4134 }
4135
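     /* On (re)connect, negotiate our grant reservation with the server. */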
4136 static int osc_reconnect(const struct lu_env *env,
4137                          struct obd_export *exp, struct obd_device *obd,
4138                          struct obd_uuid *cluuid,
4139                          struct obd_connect_data *data,
4140                          void *localdata)
4141 {
4142         struct client_obd *cli = &obd->u.cli;
4143
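             /* Ask the server to honour whatever grant we still have cached;
              * a fresh client asks for two full RPCs worth instead.  As an
              * illustration, with 256 pages per RPC and 4 KiB pages that is
              * 2 * 256 * 4096 = 2 MiB (example values only; both are
              * per-import parameters). */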
4144         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4145                 long lost_grant;
4146
4147                 client_obd_list_lock(&cli->cl_loi_list_lock);
4148                 data->ocd_grant = cli->cl_avail_grant ?:
4149                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4150                 lost_grant = cli->cl_lost_grant;
4151                 cli->cl_lost_grant = 0;
4152                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4153
4154                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4155                        "cl_lost_grant: %ld\n", data->ocd_grant,
4156                        cli->cl_avail_grant, lost_grant);
4157                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4158                        " ocd_grant: %d\n", data->ocd_connect_flags,
4159                        data->ocd_version, data->ocd_grant);
4160         }
4161
4162         RETURN(0);
4163 }
4164
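     /* On the last disconnect, flush pending size-change llog cancels to the
      * target, then disconnect the export; leave the grant shrink list only
      * once the import is gone (see the BUG18662 note below). */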
4165 static int osc_disconnect(struct obd_export *exp)
4166 {
4167         struct obd_device *obd = class_exp2obd(exp);
4168         struct llog_ctxt  *ctxt;
4169         int rc;
4170
4171         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4172         if (ctxt) {
4173                 if (obd->u.cli.cl_conn_count == 1) {
4174                         /* Flush any remaining cancel messages out to the
4175                          * target */
4176                         llog_sync(ctxt, exp);
4177                 }
4178                 llog_ctxt_put(ctxt);
4179         } else {
4180                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4181                        obd);
4182         }
4183
4184         rc = client_disconnect_export(exp);
4185         /**
4186          * Initially we put del_shrink_grant before disconnect_export, but
4187          * that causes the following problem if setup (connect) and cleanup
4188          * (disconnect) are tangled together:
4189          *      connect p1                     disconnect p2
4190          *   ptlrpc_connect_import
4191          *     ...............               class_manual_cleanup
4192          *                                     osc_disconnect
4193          *                                     del_shrink_grant
4194          *   ptlrpc_connect_interrupt
4195          *     init_grant_shrink
4196          *   add this client to shrink list
4197          *                                      cleanup_osc
4198          * Bang! The pinger triggers the shrink.
4199          * So the OSC should be removed from the shrink list only after we
4200          * are sure the import has been destroyed.  See BUG18662.
4201          */
4202         if (obd->u.cli.cl_import == NULL)
4203                 osc_del_shrink_grant(&obd->u.cli);
4204         return rc;
4205 }
4206
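     /* React to import state changes: drop cached grants on disconnect,
      * flush queued pages and locks on invalidation, and renegotiate grants
      * once connect data arrives. */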
4207 static int osc_import_event(struct obd_device *obd,
4208                             struct obd_import *imp,
4209                             enum obd_import_event event)
4210 {
4211         struct client_obd *cli;
4212         int rc = 0;
4213
4214         ENTRY;
4215         LASSERT(imp->imp_obd == obd);
4216
4217         switch (event) {
4218         case IMP_EVENT_DISCON: {
4219                 /* Only do this on the MDS OSCs */
4220                 if (imp->imp_server_timeout) {
4221                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4222
4223                         spin_lock(&oscc->oscc_lock);
4224                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4225                         spin_unlock(&oscc->oscc_lock);
4226                 }
4227                 cli = &obd->u.cli;
4228                 client_obd_list_lock(&cli->cl_loi_list_lock);
4229                 cli->cl_avail_grant = 0;
4230                 cli->cl_lost_grant = 0;
4231                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4232                 break;
4233         }
4234         case IMP_EVENT_INACTIVE: {
4235                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4236                 break;
4237         }
4238         case IMP_EVENT_INVALIDATE: {
4239                 struct ldlm_namespace *ns = obd->obd_namespace;
4240                 struct lu_env         *env;
4241                 int                    refcheck;
4242
4243                 env = cl_env_get(&refcheck);
4244                 if (!IS_ERR(env)) {
4245                         /* Reset grants */
4246                         cli = &obd->u.cli;
4247                         client_obd_list_lock(&cli->cl_loi_list_lock);
4248                         /* all queued pages will go to failing RPCs due to
4249                          * the invalid import */
4250                         osc_check_rpcs(env, cli);
4251                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4252
4253                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4254                         cl_env_put(env, &refcheck);
4255                 } else
4256                         rc = PTR_ERR(env);
4257                 break;
4258         }
4259         case IMP_EVENT_ACTIVE: {
4260                 /* Only do this on the MDS OSCs */
4261                 if (imp->imp_server_timeout) {
4262                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4263
4264                         spin_lock(&oscc->oscc_lock);
4265                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4266                         spin_unlock(&oscc->oscc_lock);
4267                 }
4268                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4269                 break;
4270         }
4271         case IMP_EVENT_OCD: {
4272                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4273
4274                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4275                         osc_init_grant(&obd->u.cli, ocd);
4276
4277                 /* See bug 7198 */
4278                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4279                         imp->imp_client->cli_request_portal =
                                     OST_REQUEST_PORTAL;
4280
4281                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4282                 break;
4283         }
4284         default:
4285                 CERROR("Unknown import event %d\n", event);
4286                 LBUG();
4287         }
4288         RETURN(rc);
4289 }
4290
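     /* Set up the OSC device: client obd state, lprocfs entries, the object
      * creator, a pre-allocated brw request pool and grant shrinking. */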
4291 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4292 {
4293         int rc;
4294         ENTRY;
4295
4297         rc = ptlrpcd_addref();
4298         if (rc)
4299                 RETURN(rc);
4300
4301         rc = client_obd_setup(obd, lcfg);
4302         if (rc) {
4303                 ptlrpcd_decref();
4304         } else {
4305                 struct lprocfs_static_vars lvars = { 0 };
4306                 struct client_obd *cli = &obd->u.cli;
4307
4308                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4309                 lprocfs_osc_init_vars(&lvars);
4310                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4311                         lproc_osc_attach_seqstat(obd);
4312                         sptlrpc_lprocfs_cliobd_attach(obd);
4313                         ptlrpc_lprocfs_register_obd(obd);
4314                 }
4315
4316                 oscc_init(obd);
4317                 /* We need to allocate a few more requests, because
4318                    brw_interpret tries to create new requests before freeing
4319                    previous ones. Ideally we want 2x max_rpcs_in_flight
4320                    reserved, but I'm afraid that might waste too much RAM,
4321                    so +2 is just a guess that should still work. */
4322                 cli->cl_import->imp_rq_pool =
4323                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4324                                             OST_MAXREQSIZE,
4325                                             ptlrpc_add_rqs_to_pool);
4326
4327                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4328                 sema_init(&cli->cl_grant_sem, 1);
4329         }
4330
4331         RETURN(rc);
4332 }
4333
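     /* Two-stage pre-cleanup: deactivate the import early, then destroy it
      * and the llog contexts when the exports are cleaned up. */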
4334 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4335 {
4336         int rc = 0;
4337         ENTRY;
4338
4339         switch (stage) {
4340         case OBD_CLEANUP_EARLY: {
4341                 struct obd_import *imp;
4342                 imp = obd->u.cli.cl_import;
4343                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4344                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4345                 ptlrpc_deactivate_import(imp);
4346                 spin_lock(&imp->imp_lock);
4347                 imp->imp_pingable = 0;
4348                 spin_unlock(&imp->imp_lock);
4349                 break;
4350         }
4351         case OBD_CLEANUP_EXPORTS: {
4352                 /* If we set up but never connected, the
4353                    client import will not have been cleaned. */
4354                 if (obd->u.cli.cl_import) {
4355                         struct obd_import *imp;
4356                         down_write(&obd->u.cli.cl_sem);
4357                         imp = obd->u.cli.cl_import;
4358                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4359                                obd->obd_name);
4360                         ptlrpc_invalidate_import(imp);
4361                         if (imp->imp_rq_pool) {
4362                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4363                                 imp->imp_rq_pool = NULL;
4364                         }
4365                         class_destroy_import(imp);
4366                         up_write(&obd->u.cli.cl_sem);
4367                         obd->u.cli.cl_import = NULL;
4368                 }
4369                 rc = obd_llog_finish(obd, 0);
4370                 if (rc != 0)
4371                         CERROR("failed to cleanup llogging subsystems\n");
4372                 break;
4373         }
4374         }
4375         RETURN(rc);
4376 }
4377
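     /* Final cleanup: remove the lprocfs entries, the osc quota cache and
      * the client obd state. */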
4378 int osc_cleanup(struct obd_device *obd)
4379 {
4380         int rc;
4381
4382         ENTRY;
4383         ptlrpc_lprocfs_unregister_obd(obd);
4384         lprocfs_obd_cleanup(obd);
4385
4386         /* free memory of osc quota cache */
4387         lquota_cleanup(quota_interface, obd);
4388
4389         rc = client_obd_cleanup(obd);
4390
4391         ptlrpcd_decref();
4392         RETURN(rc);
4393 }
4394
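     /* Process lustre_cfg commands; anything unrecognized is treated as an
      * lprocfs parameter. */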
4395 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4396 {
4397         struct lprocfs_static_vars lvars = { 0 };
4398         int rc = 0;
4399
4400         lprocfs_osc_init_vars(&lvars);
4401
4402         switch (lcfg->lcfg_command) {
4403         default:
4404                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4405                                               lcfg, obd);
4406                 if (rc > 0)
4407                         rc = 0;
4408                 break;
4409         }
4410
4411         return(rc);
4412 }
4413
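     /* obd_ops hook: forward the raw config buffer to
      * osc_process_config_base(). */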
4414 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4415 {
4416         return osc_process_config_base(obd, buf);
4417 }
4418
4419 struct obd_ops osc_obd_ops = {
4420         .o_owner                = THIS_MODULE,
4421         .o_setup                = osc_setup,
4422         .o_precleanup           = osc_precleanup,
4423         .o_cleanup              = osc_cleanup,
4424         .o_add_conn             = client_import_add_conn,
4425         .o_del_conn             = client_import_del_conn,
4426         .o_connect              = client_connect_import,
4427         .o_reconnect            = osc_reconnect,
4428         .o_disconnect           = osc_disconnect,
4429         .o_statfs               = osc_statfs,
4430         .o_statfs_async         = osc_statfs_async,
4431         .o_packmd               = osc_packmd,
4432         .o_unpackmd             = osc_unpackmd,
4433         .o_precreate            = osc_precreate,
4434         .o_create               = osc_create,
4435         .o_create_async         = osc_create_async,
4436         .o_destroy              = osc_destroy,
4437         .o_getattr              = osc_getattr,
4438         .o_getattr_async        = osc_getattr_async,
4439         .o_setattr              = osc_setattr,
4440         .o_setattr_async        = osc_setattr_async,
4441         .o_brw                  = osc_brw,
4442         .o_punch                = osc_punch,
4443         .o_sync                 = osc_sync,
4444         .o_enqueue              = osc_enqueue,
4445         .o_change_cbdata        = osc_change_cbdata,
4446         .o_cancel               = osc_cancel,
4447         .o_cancel_unused        = osc_cancel_unused,
4448         .o_iocontrol            = osc_iocontrol,
4449         .o_get_info             = osc_get_info,
4450         .o_set_info_async       = osc_set_info_async,
4451         .o_import_event         = osc_import_event,
4452         .o_llog_init            = osc_llog_init,
4453         .o_llog_finish          = osc_llog_finish,
4454         .o_process_config       = osc_process_config,
4455 };
4456
4457 extern struct lu_kmem_descr  osc_caches[];
4458 extern spinlock_t            osc_ast_guard;
4459 extern struct lock_class_key osc_ast_guard_class;
4460
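     /* Module init: register the OSC caches, hook up quota support and
      * register the OSC obd type. */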
4461 int __init osc_init(void)
4462 {
4463         struct lprocfs_static_vars lvars = { 0 };
4464         int rc;
4465         ENTRY;
4466
4467         /* Print the address of _any_ initialized kernel symbol from this
4468          * module, to allow debugging with a gdb that doesn't support data
4469          * symbols from modules. */
4470         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4471
4472         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4473
4474         lprocfs_osc_init_vars(&lvars);
4475
4476         request_module("lquota");
4477         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4478         lquota_init(quota_interface);
4479         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4480
4481         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4482                                  LUSTRE_OSC_NAME, &osc_device_type);
4483         if (rc) {
4484                 if (quota_interface)
4485                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4486                 lu_kmem_fini(osc_caches);
4487                 RETURN(rc);
4488         }
4489
4490         spin_lock_init(&osc_ast_guard);
4491         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4492
4493         osc_mds_ost_orig_logops = llog_lvfs_ops;
4494         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4495         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4496         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4497         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4498
4499         RETURN(rc);
4500 }
4501
4502 #ifdef __KERNEL__
4503 static void /*__exit*/ osc_exit(void)
4504 {
4505         lu_device_type_fini(&osc_device_type);
4506
4507         lquota_exit(quota_interface);
4508         if (quota_interface)
4509                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4510
4511         class_unregister_type(LUSTRE_OSC_NAME);
4512         lu_kmem_fini(osc_caches);
4513 }
4514
4515 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4516 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4517 MODULE_LICENSE("GPL");
4518
4519 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4520 #endif