/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
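
/*
 * The pack/unpack pair above follows the usual obd_packmd() calling
 * convention: a NULL lmmp/lsmp argument queries the buffer size, a
 * non-NULL slot with a NULL source frees it, and a NULL slot is allocated
 * on demand.  A minimal sketch of a hypothetical caller (editor's
 * illustration, not part of the original file):
 */
#if 0
static int example_lmm_roundtrip(struct obd_export *exp,
                                 struct lov_stripe_md *lsm)
{
        struct lov_mds_md    *lmm  = NULL;
        struct lov_stripe_md *copy = NULL;
        int lmm_size, rc;

        lmm_size = osc_packmd(exp, &lmm, lsm);        /* alloc + pack (LE) */
        if (lmm_size < 0)
                return lmm_size;

        rc = osc_unpackmd(exp, &copy, lmm, lmm_size); /* alloc + unpack */
        if (rc >= 0)
                osc_unpackmd(exp, &copy, NULL, 0);    /* free the lsm copy */

        osc_packmd(exp, &lmm, NULL);                  /* free the wire buffer */
        return rc < 0 ? rc : 0;
}
#endif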

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
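
/*
 * Note on the pattern above (editor's note; it recurs in every *_async()
 * path in this file): the per-request context is stored directly in the
 * preallocated req->rq_async_args area rather than being allocated
 * separately.  The CLASSERT() verifies at compile time that the args
 * structure fits in that area, and ptlrpc hands the same request back to
 * rq_interpret_reply, where ptlrpc_req_async_args() recovers the context
 * with no extra allocation, locking, or freeing required.
 */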

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally all locks matched by @mode in the resource
 * identified by @oa. Found locks are added to the @cancels list. Returns
 * the number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
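
/*
 * Why the second atomic check above is needed (editor's note): between
 * atomic_inc_return() pushing the counter over the limit and the
 * corresponding atomic_dec_return(), a destroy RPC may complete and
 * decrement the counter in osc_destroy_interpret().  Its wakeup can fire
 * before this thread has queued itself on cl_destroy_waitq and be lost.
 * If the decrement here observes that the counter dropped back below the
 * limit, the signal is re-issued so the waiter in osc_destroy() below is
 * guaranteed to retry.
 */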

/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
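
/*
 * Sketch of the unlink recovery flow described in the comment above
 * osc_destroy() (editor's summary of this file's own description):
 *
 *   MDS unlink:  file objects recorded in the MDS recovery llog
 *   MDS -> OST:  OST_DESTROY sent async via the OSC (cookie in o_lcookie)
 *   OST commit:  destroy transaction committed to disk
 *   OST -> MDS:  log cancellation cookies sent; llog records cancelled
 *
 * If the OST (or client) fails before the commit, the llog records
 * survive and the destroys are replayed when the OST next reconnects.
 */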

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT) *
                                     (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
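
/*
 * Worked example for the o_undirty computation above (editor's sketch,
 * assuming 4096-byte pages): with cl_max_pages_per_rpc = 256 (a 1 MB RPC)
 * and cl_max_rpcs_in_flight = 8,
 *
 *   max_in_flight = (256 << 12) * (8 + 1) = 9 MB
 *
 * so with a typical 32 MB cl_dirty_max the client advertises
 * o_undirty = max(32 MB, 9 MB) = 32 MB of additional data it could still
 * cache, unless one of the overflow checks above forced it to 0.
 */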

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
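
/*
 * Worked example for the partial-page grant accounting above (editor's
 * sketch, assuming CFS_PAGE_SIZE = 4096 and an OST blocksize of 1024):
 * a 100-byte write at page offset 1500 gives
 *
 *   offset = 1500, count = 100 + (1500 & 1023) = 576
 *   end    = (1500 + 100) & 1023 = 576  =>  count += 1024 - 576 = 1024
 *
 * i.e. the OST consumes one whole 1024-byte block for the short write,
 * so the client records 4096 - 1024 = 3072 bytes of lost grant rather
 * than 4096 - 100, matching the server-side filter_grant_check() math.
 */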

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}
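
/*
 * Example of the shrink policy above (editor's sketch): with
 * cl_max_rpcs_in_flight = 8 and cl_max_pages_per_rpc = 256, the first
 * shrink keeps (8 + 1) * 256 = 2304 pages' worth of grant, enough to
 * fill a complete set of in-flight RPCs; once the available grant is
 * already at or below that, the next shrink drops straight to a single
 * RPC's worth (256 pages) instead of trickling down piecemeal.
 */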

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
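
/*
 * Worked example for the short-read fixup above (editor's sketch,
 * assuming two full 4096-byte pages requested but only nob_read = 6000
 * bytes returned by the OST): the first page is covered entirely
 * (nob_read drops to 6000 - 4096 = 1904), EOF then lands inside the
 * second page, so bytes 1904..4095 of that page are memset() to zero,
 * and any further pages would be zero-filled completely.  The client
 * thus sees the unwritten tail of the stripe as zeroes, as expected for
 * a sparse file.
 */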

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
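
/*
 * Example of the merge rule above (editor's sketch): two pages with
 * identical flags at off = 0/count = 4096 and off = 4096/count = 4096 are
 * contiguous, so osc_brw_prep_request() below folds them into a single
 * niobuf_remote of len 8192 and niocount stays at 1.  Any flag difference
 * (with a console warning if bits outside the known-safe mask differ) or
 * a gap between the extents starts a new niobuf instead.
 */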

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sends we only compute a wrong checksum instead of corrupting
         * the data, so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
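
/*
 * The OBD_FAIL_CHECK() hooks above are driven by Lustre's standard
 * fault-injection mechanism: arming the corresponding OBD_FAIL_OSC_*
 * value in fail_loc on a test node (e.g. "lctl set_param fail_loc=<value>";
 * the numeric values live in the fail-loc headers) makes a read
 * deliberately corrupt the first page before checksumming, or a write
 * report an off-by-one checksum, so the checksum error paths can be
 * exercised end-to-end without real data damage.
 */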

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB the request now owns desc and will free it when the request
         * itself is freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
                             body->oa.o_flags);
        }

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */
1497
1498         osc_update_grant(cli, body);
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         RETURN(-EPROTO);
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         RETURN(-EAGAIN);
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         RETURN(-EAGAIN);
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1518                                      aa->aa_page_count, aa->aa_ppga);
1519                 GOTO(out, rc);
1520         }
1521
1522         /* The rest of this function executes only for OST_READs */
1523
1524         /* if unwrap_bulk failed, return -EAGAIN to retry */
1525         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1526         if (rc < 0)
1527                 GOTO(out, rc = -EAGAIN);
1528
1529         if (rc > aa->aa_requested_nob) {
1530                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1531                        aa->aa_requested_nob);
1532                 RETURN(-EPROTO);
1533         }
1534
1535         if (rc != req->rq_bulk->bd_nob_transferred) {
1536                 CERROR("Unexpected rc %d (%d transferred)\n",
1537                        rc, req->rq_bulk->bd_nob_transferred);
1538                 RETURN(-EPROTO);
1539         }
1540
1541         if (rc < aa->aa_requested_nob)
1542                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1543
1544         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1545                 static int cksum_counter;
1546                 __u32      server_cksum = body->oa.o_cksum;
1547                 char      *via;
1548                 char      *router;
1549                 cksum_type_t cksum_type;
1550
1551                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1552                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1553                 else
1554                         cksum_type = OBD_CKSUM_CRC32;
1555                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1556                                                  aa->aa_ppga, OST_READ,
1557                                                  cksum_type);
1558
1559                 if (peer->nid == req->rq_bulk->bd_sender) {
1560                         via = router = "";
1561                 } else {
1562                         via = " via ";
1563                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1564                 }
1565
1566                 if (server_cksum == ~0 && rc > 0) {
1567                         CERROR("Protocol error: server %s set the 'checksum' "
1568                                "bit, but didn't send a checksum.  Not fatal, "
1569                                "but please notify on http://bugzilla.lustre.org/\n",
1570                                libcfs_nid2str(peer->nid));
1571                 } else if (server_cksum != client_cksum) {
1572                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1573                                            "%s%s%s inum "LPU64"/"LPU64" object "
1574                                            LPU64"/"LPU64" extent "
1575                                            "["LPU64"-"LPU64"]\n",
1576                                            req->rq_import->imp_obd->obd_name,
1577                                            libcfs_nid2str(peer->nid),
1578                                            via, router,
1579                                            body->oa.o_valid & OBD_MD_FLFID ?
1580                                                 body->oa.o_fid : (__u64)0,
1581                                            body->oa.o_valid & OBD_MD_FLFID ?
1582                                                 body->oa.o_generation :(__u64)0,
1583                                            body->oa.o_id,
1584                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1585                                                 body->oa.o_gr : (__u64)0,
1586                                            aa->aa_ppga[0]->off,
1587                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1588                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1589                                                                         1);
1590                         CERROR("client %x, server %x, cksum_type %x\n",
1591                                client_cksum, server_cksum, cksum_type);
1592                         cksum_counter = 0;
1593                         aa->aa_oa->o_cksum = client_cksum;
1594                         rc = -EAGAIN;
1595                 } else {
1596                         cksum_counter++;
1597                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1598                         rc = 0;
1599                 }
1600         } else if (unlikely(client_cksum)) {
1601                 static int cksum_missed;
1602
1603                 cksum_missed++;
1604                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1605                         CERROR("Checksum %u requested from %s but not sent\n",
1606                                cksum_missed, libcfs_nid2str(peer->nid));
1607         } else {
1608                 rc = 0;
1609         }
1610 out:
1611         if (rc >= 0)
1612                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1613
1614         RETURN(rc);
1615 }
1616
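/* Issue one synchronous bulk RPC and wait for it; recoverable errors are
 * retried from scratch, sleeping 'resends' seconds between attempts until
 * osc_should_resend() gives up. */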
1617 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1618                             struct lov_stripe_md *lsm,
1619                             obd_count page_count, struct brw_page **pga,
1620                             struct obd_capa *ocapa)
1621 {
1622         struct ptlrpc_request *req;
1623         int                    rc;
1624         cfs_waitq_t            waitq;
1625         int                    resends = 0;
1626         struct l_wait_info     lwi;
1627
1628         ENTRY;
1629
1630         cfs_waitq_init(&waitq);
1631
1632 restart_bulk:
1633         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1634                                   page_count, pga, &req, ocapa, 0);
1635         if (rc != 0)
1636                 return (rc);
1637
1638         rc = ptlrpc_queue_wait(req);
1639
1640         if (rc == -ETIMEDOUT && req->rq_resend) {
1641                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1642                 ptlrpc_req_finished(req);
1643                 goto restart_bulk;
1644         }
1645
1646         rc = osc_brw_fini_request(req, rc);
1647
1648         ptlrpc_req_finished(req);
1649         if (osc_recoverable_error(rc)) {
1650                 resends++;
1651                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1652                         CERROR("too many resend retries, returning error\n");
1653                         RETURN(-EIO);
1654                 }
1655
1656                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1657                 l_wait_event(waitq, 0, &lwi);
1658
1659                 goto restart_bulk;
1660         }
1661
1662         RETURN(rc);
1663 }
1664
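/* Rebuild a failed bulk RPC and queue the replacement on the original request
 * set; the new request inherits the oaps, async args and capa of the old one. */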
1665 int osc_brw_redo_request(struct ptlrpc_request *request,
1666                          struct osc_brw_async_args *aa)
1667 {
1668         struct ptlrpc_request *new_req;
1669         struct ptlrpc_request_set *set = request->rq_set;
1670         struct osc_brw_async_args *new_aa;
1671         struct osc_async_page *oap;
1672         int rc = 0;
1673         ENTRY;
1674
1675         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1676                 CERROR("too many resend retries, returning error\n");
1677                 RETURN(-EIO);
1678         }
1679
1680         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1681
1682         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1683                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1684                                   aa->aa_cli, aa->aa_oa,
1685                                   NULL /* lsm unused by osc currently */,
1686                                   aa->aa_page_count, aa->aa_ppga,
1687                                   &new_req, aa->aa_ocapa, 0);
1688         if (rc)
1689                 RETURN(rc);
1690
1691         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1692
1693         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1694                 if (oap->oap_request != NULL) {
1695                         LASSERTF(request == oap->oap_request,
1696                                  "request %p != oap_request %p\n",
1697                                  request, oap->oap_request);
1698                         if (oap->oap_interrupted) {
1699                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1700                                 ptlrpc_req_finished(new_req);
1701                                 RETURN(-EINTR);
1702                         }
1703                 }
1704         }
1705         /* New request takes over pga and oaps from old request.
1706          * Note that copying a list_head doesn't work; it must be moved via list_splice(). */
1707         aa->aa_resends++;
1708         new_req->rq_interpret_reply = request->rq_interpret_reply;
1709         new_req->rq_async_args = request->rq_async_args;
1710         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1711
1712         new_aa = ptlrpc_req_async_args(new_req);
1713
1714         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1715         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1716         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1717
1718         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1719                 if (oap->oap_request) {
1720                         ptlrpc_req_finished(oap->oap_request);
1721                         oap->oap_request = ptlrpc_request_addref(new_req);
1722                 }
1723         }
1724
1725         new_aa->aa_ocapa = aa->aa_ocapa;
1726         aa->aa_ocapa = NULL;
1727
1728         /* using ptlrpc_set_add_req() is safe because interpret functions
1729          * run in check_set context.  The only path that can reach this
1730          * request from another thread is the -EINTR case above, and that
1731          * path is protected by cl_loi_list_lock */
1732         ptlrpc_set_add_req(set, new_req);
1733
1734         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1735
1736         DEBUG_REQ(D_INFO, new_req, "new request");
1737         RETURN(0);
1738 }
1739
1740 /*
1741  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1742  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1743  * fine for our small page arrays and doesn't require allocation.  it's an
1744  * insertion sort that swaps elements that are strides apart, shrinking the
1745  * stride down until it's 1 and the array is sorted.
1746  */
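/*
 * e.g. for num = 100 the ascending loop below generates strides 1, 4, 13, 40,
 * 121 and stops, so the sorting passes then run with strides 40, 13, 4 and 1.
 */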
1747 static void sort_brw_pages(struct brw_page **array, int num)
1748 {
1749         int stride, i, j;
1750         struct brw_page *tmp;
1751
1752         if (num == 1)
1753                 return;
1754         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1755                 ;
1756
1757         do {
1758                 stride /= 3;
1759                 for (i = stride ; i < num ; i++) {
1760                         tmp = array[i];
1761                         j = i;
1762                         while (j >= stride && array[j - stride]->off > tmp->off) {
1763                                 array[j] = array[j - stride];
1764                                 j -= stride;
1765                         }
1766                         array[j] = tmp;
1767                 }
1768         } while (stride > 1);
1769 }
1770
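/* Return how many of the leading pages in @pg form one unfragmented run that
 * can go out in a single RDMA: the run ends at the first page that does not
 * end on a page boundary, or before a page that does not start on one. */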
1771 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1772 {
1773         int count = 1;
1774         int offset;
1775         int i = 0;
1776
1777         LASSERT(pages > 0);
1778         offset = pg[i]->off & ~CFS_PAGE_MASK;
1779
1780         for (;;) {
1781                 pages--;
1782                 if (pages == 0)         /* that's all */
1783                         return count;
1784
1785                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1786                         return count;   /* doesn't end on page boundary */
1787
1788                 i++;
1789                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1790                 if (offset != 0)        /* doesn't start on page boundary */
1791                         return count;
1792
1793                 count++;
1794         }
1795 }
1796
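/* Build an array of pointers into the contiguous @pga array so the pages can
 * be sorted and split into chunks without moving the brw_page entries. */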
1797 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1798 {
1799         struct brw_page **ppga;
1800         int i;
1801
1802         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1803         if (ppga == NULL)
1804                 return NULL;
1805
1806         for (i = 0; i < count; i++)
1807                 ppga[i] = pga + i;
1808         return ppga;
1809 }
1810
1811 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1812 {
1813         LASSERT(ppga != NULL);
1814         OBD_FREE(ppga, sizeof(*ppga) * count);
1815 }
1816
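/* Synchronous brw entry point: sort the pages by offset, then carve them into
 * unfragmented chunks of at most cl_max_pages_per_rpc pages and send each
 * chunk with osc_brw_internal(). */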
1817 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1818                    obd_count page_count, struct brw_page *pga,
1819                    struct obd_trans_info *oti)
1820 {
1821         struct obdo *saved_oa = NULL;
1822         struct brw_page **ppga, **orig;
1823         struct obd_import *imp = class_exp2cliimp(exp);
1824         struct client_obd *cli;
1825         int rc, page_count_orig;
1826         ENTRY;
1827
1828         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1829         cli = &imp->imp_obd->u.cli;
1830
1831         if (cmd & OBD_BRW_CHECK) {
1832                 /* The caller just wants to know if there's a chance that this
1833                  * I/O can succeed */
1834
1835                 if (imp->imp_invalid)
1836                         RETURN(-EIO);
1837                 RETURN(0);
1838         }
1839
1840         /* test_brw with a failed create can trip this, maybe others. */
1841         LASSERT(cli->cl_max_pages_per_rpc);
1842
1843         rc = 0;
1844
1845         orig = ppga = osc_build_ppga(pga, page_count);
1846         if (ppga == NULL)
1847                 RETURN(-ENOMEM);
1848         page_count_orig = page_count;
1849
1850         sort_brw_pages(ppga, page_count);
1851         while (page_count) {
1852                 obd_count pages_per_brw;
1853
1854                 if (page_count > cli->cl_max_pages_per_rpc)
1855                         pages_per_brw = cli->cl_max_pages_per_rpc;
1856                 else
1857                         pages_per_brw = page_count;
1858
1859                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1860
1861                 if (saved_oa != NULL) {
1862                         /* restore previously saved oa */
1863                         *oinfo->oi_oa = *saved_oa;
1864                 } else if (page_count > pages_per_brw) {
1865                         /* save a copy of oa (brw will clobber it) */
1866                         OBDO_ALLOC(saved_oa);
1867                         if (saved_oa == NULL)
1868                                 GOTO(out, rc = -ENOMEM);
1869                         *saved_oa = *oinfo->oi_oa;
1870                 }
1871
1872                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1873                                       pages_per_brw, ppga, oinfo->oi_capa);
1874
1875                 if (rc != 0)
1876                         break;
1877
1878                 page_count -= pages_per_brw;
1879                 ppga += pages_per_brw;
1880         }
1881
1882 out:
1883         osc_release_ppga(orig, page_count_orig);
1884
1885         if (saved_oa != NULL)
1886                 OBDO_FREE(saved_oa);
1887
1888         RETURN(rc);
1889 }
1890
1891 /* The companion to osc_enter_cache(), called when @oap leaves the dirty
1892  * accounting: either writeback completed or a truncate arrived before
1893  * writing started.  Must be called with the loi lock held. */
1894 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1895                            int sent)
1896 {
1897         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1898 }
1899
1900
1901 /* Decide whether an object's pending pages (lop) justify building an RPC.
1902  * This is used by osc_check_rpcs()->osc_next_loi() and loi_list_maint()
1903  * to quickly find objects that are ready to send an RPC. */
1904 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1905                          int cmd)
1906 {
1907         int optimal;
1908         ENTRY;
1909
1910         if (lop->lop_num_pending == 0)
1911                 RETURN(0);
1912
1913         /* if we have an invalid import we want to drain the queued pages
1914          * by forcing them through rpcs that immediately fail and complete
1915          * the pages.  recovery relies on this to empty the queued pages
1916          * before canceling the locks and evicting the llite pages */
1917         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1918                 RETURN(1);
1919
1920         /* stream rpcs in queue order as long as there is an urgent page
1921          * queued.  this is our cheap solution for good batching in the case
1922          * where writepage marks some random page in the middle of the file
1923          * as urgent because of, say, memory pressure */
1924         if (!list_empty(&lop->lop_urgent)) {
1925                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1926                 RETURN(1);
1927         }
1928         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1929         optimal = cli->cl_max_pages_per_rpc;
1930         if (cmd & OBD_BRW_WRITE) {
1931                 /* trigger a write rpc stream as long as there are dirtiers
1932                  * waiting for space.  as they're waiting, they're not going to
1933                  * create more pages to coalesce with what's waiting. */
1934                 if (!list_empty(&cli->cl_cache_waiters)) {
1935                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1936                         RETURN(1);
1937                 }
1938                 /* +16 to avoid triggering rpcs that would want to include pages
1939                  * that are being queued but which can't be made ready until
1940                  * the queuer finishes with the page. this is a wart for
1941                  * llite::commit_write() */
1942                 optimal += 16;
1943         }
1944         if (lop->lop_num_pending >= optimal)
1945                 RETURN(1);
1946
1947         RETURN(0);
1948 }
1949
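/* Return 1 if the first urgent page queued on @lop is high-priority, in which
 * case an RPC should be fired immediately. */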
1950 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1951 {
1952         struct osc_async_page *oap;
1953         ENTRY;
1954
1955         if (list_empty(&lop->lop_urgent))
1956                 RETURN(0);
1957
1958         oap = list_entry(lop->lop_urgent.next,
1959                          struct osc_async_page, oap_urgent_item);
1960
1961         if (oap->oap_async_flags & ASYNC_HP) {
1962                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1963                 RETURN(1);
1964         }
1965
1966         RETURN(0);
1967 }
1968
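/* Keep @item's membership of @list in sync with the @should_be_on predicate. */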
1969 static void on_list(struct list_head *item, struct list_head *list,
1970                     int should_be_on)
1971 {
1972         if (list_empty(item) && should_be_on)
1973                 list_add_tail(item, list);
1974         else if (!list_empty(item) && !should_be_on)
1975                 list_del_init(item);
1976 }
1977
1978 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1979  * can find pages to build into rpcs quickly */
1980 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1981 {
1982         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1983             lop_makes_hprpc(&loi->loi_read_lop)) {
1984                 /* HP rpc */
1985                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1986                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1987         } else {
1988                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1989                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1990                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1991                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1992         }
1993
1994         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1995                 loi->loi_write_lop.lop_num_pending);
1996
1997         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1998                 loi->loi_read_lop.lop_num_pending);
1999 }
2000
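/* Adjust the pending page counters on both the lop and the client as pages
 * are queued (delta > 0) or removed (delta < 0). */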
2001 static void lop_update_pending(struct client_obd *cli,
2002                                struct loi_oap_pages *lop, int cmd, int delta)
2003 {
2004         lop->lop_num_pending += delta;
2005         if (cmd & OBD_BRW_WRITE)
2006                 cli->cl_pending_w_pages += delta;
2007         else
2008                 cli->cl_pending_r_pages += delta;
2009 }
2010
2011 /**
2012  * This is called when a sync waiter receives an interruption.  Its job is to
2013  * get the caller woken as soon as possible.  If its page hasn't been put in an
2014  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2015  * interrupted, which will forcefully complete the rpc once the rpc
2016  * has timed out.
2017  */
2018 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2019 {
2020         struct loi_oap_pages *lop;
2021         struct lov_oinfo *loi;
2022         int rc = -EBUSY;
2023         ENTRY;
2024
2025         LASSERT(!oap->oap_interrupted);
2026         oap->oap_interrupted = 1;
2027
2028         /* ok, it's been put in an rpc. only one oap gets a request reference */
2029         if (oap->oap_request != NULL) {
2030                 ptlrpc_mark_interrupted(oap->oap_request);
2031                 ptlrpcd_wake(oap->oap_request);
2032                 ptlrpc_req_finished(oap->oap_request);
2033                 oap->oap_request = NULL;
2034         }
2035
2036         /*
2037          * page completion may be called only if ->cpo_prep() method was
2038          * executed by osc_io_submit(), that also adds page the to pending list
2039          */
2040         if (!list_empty(&oap->oap_pending_item)) {
2041                 list_del_init(&oap->oap_pending_item);
2042                 list_del_init(&oap->oap_urgent_item);
2043
2044                 loi = oap->oap_loi;
2045                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2046                         &loi->loi_write_lop : &loi->loi_read_lop;
2047                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2048                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2049                 rc = oap->oap_caller_ops->ap_completion(env,
2050                                           oap->oap_caller_data,
2051                                           oap->oap_cmd, NULL, -EINTR);
2052         }
2053
2054         RETURN(rc);
2055 }
2056
2057 /* this is trying to propagate async writeback errors back up to the
2058  * application.  When an async write fails we record the error code for later if
2059  * the app does an fsync.  As long as errors persist we force future rpcs to be
2060  * sync so that the app can get a sync error and break the cycle of queueing
2061  * pages for which writeback will fail. */
2062 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2063                            int rc)
2064 {
2065         if (rc) {
2066                 if (!ar->ar_rc)
2067                         ar->ar_rc = rc;
2068
2069                 ar->ar_force_sync = 1;
2070                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2071                 return;
2073         }
2074
2075         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2076                 ar->ar_force_sync = 0;
2077 }
2078
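/* Queue @oap on its object's pending list (and on the urgent list when
 * ASYNC_HP or ASYNC_URGENT is set) and bump the pending counters. */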
2079 void osc_oap_to_pending(struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082
2083         if (oap->oap_cmd & OBD_BRW_WRITE)
2084                 lop = &oap->oap_loi->loi_write_lop;
2085         else
2086                 lop = &oap->oap_loi->loi_read_lop;
2087
2088         if (oap->oap_async_flags & ASYNC_HP)
2089                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2090         else if (oap->oap_async_flags & ASYNC_URGENT)
2091                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2092         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2093         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2094 }
2095
2096 /* this must be called holding the loi list lock, which protects exit_cache,
2097  * async_flag maintenance, and oap_request */
2098 static void osc_ap_completion(const struct lu_env *env,
2099                               struct client_obd *cli, struct obdo *oa,
2100                               struct osc_async_page *oap, int sent, int rc)
2101 {
2102         __u64 xid = 0;
2103
2104         ENTRY;
2105         if (oap->oap_request != NULL) {
2106                 xid = ptlrpc_req_xid(oap->oap_request);
2107                 ptlrpc_req_finished(oap->oap_request);
2108                 oap->oap_request = NULL;
2109         }
2110
2111         spin_lock(&oap->oap_lock);
2112         oap->oap_async_flags = 0;
2113         spin_unlock(&oap->oap_lock);
2114         oap->oap_interrupted = 0;
2115
2116         if (oap->oap_cmd & OBD_BRW_WRITE) {
2117                 osc_process_ar(&cli->cl_ar, xid, rc);
2118                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2119         }
2120
2121         if (rc == 0 && oa != NULL) {
2122                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2123                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2124                 if (oa->o_valid & OBD_MD_FLMTIME)
2125                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2126                 if (oa->o_valid & OBD_MD_FLATIME)
2127                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2128                 if (oa->o_valid & OBD_MD_FLCTIME)
2129                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2130         }
2131
2132         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2133                                                 oap->oap_cmd, oa, rc);
2134
2135         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2136          * I/O on the page could start, but OSC calls it under lock
2137          * and thus we can add oap back to pending safely */
2138         if (rc)
2139                 /* upper layer wants to leave the page on pending queue */
2140                 osc_oap_to_pending(oap);
2141         else
2142                 osc_exit_cache(cli, oap, sent);
2143         EXIT;
2144 }
2145
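/* Interpret callback for async bulk RPCs: finish the transfer, redo it on a
 * recoverable error, then complete the oaps and update the in-flight counters
 * under cl_loi_list_lock before kicking osc_check_rpcs(). */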
2146 static int brw_interpret(const struct lu_env *env,
2147                          struct ptlrpc_request *req, void *data, int rc)
2148 {
2149         struct osc_brw_async_args *aa = data;
2150         struct client_obd *cli;
2151         int async;
2152         ENTRY;
2153
2154         rc = osc_brw_fini_request(req, rc);
2155         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2156         if (osc_recoverable_error(rc)) {
2157                 rc = osc_brw_redo_request(req, aa);
2158                 if (rc == 0)
2159                         RETURN(0);
2160         }
2161
2162         if (aa->aa_ocapa) {
2163                 capa_put(aa->aa_ocapa);
2164                 aa->aa_ocapa = NULL;
2165         }
2166
2167         cli = aa->aa_cli;
2168
2169         client_obd_list_lock(&cli->cl_loi_list_lock);
2170
2171         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2172          * is called so we know whether to go to sync BRWs or wait for more
2173          * RPCs to complete */
2174         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2175                 cli->cl_w_in_flight--;
2176         else
2177                 cli->cl_r_in_flight--;
2178
2179         async = list_empty(&aa->aa_oaps);
2180         if (!async) { /* from osc_send_oap_rpc() */
2181                 struct osc_async_page *oap, *tmp;
2182                 /* the caller may re-use the oap after the completion call so
2183                  * we need to clean it up a little */
2184                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2185                         list_del_init(&oap->oap_rpc_item);
2186                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2187                 }
2188                 OBDO_FREE(aa->aa_oa);
2189         } else { /* from async_internal() */
2190                 int i;
2191                 for (i = 0; i < aa->aa_page_count; i++)
2192                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2193                
2194                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2195                         OBDO_FREE(aa->aa_oa);
2196         }
2197         osc_wake_cache_waiters(cli);
2198         osc_check_rpcs(env, cli);
2199         client_obd_list_unlock(&cli->cl_loi_list_lock);
2200         if (!async)
2201                 cl_req_completion(env, aa->aa_clerq, rc);
2202         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2203         RETURN(rc);
2204 }
2205
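/* Assemble the oaps on @rpc_list into a single bulk RPC.  On failure every
 * queued oap is completed with the error and an ERR_PTR is returned. */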
2206 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2207                                             struct client_obd *cli,
2208                                             struct list_head *rpc_list,
2209                                             int page_count, int cmd)
2210 {
2211         struct ptlrpc_request *req;
2212         struct brw_page **pga = NULL;
2213         struct osc_brw_async_args *aa;
2214         struct obdo *oa = NULL;
2215         const struct obd_async_page_ops *ops = NULL;
2216         void *caller_data = NULL;
2217         struct osc_async_page *oap;
2218         struct osc_async_page *tmp;
2219         struct ost_body *body;
2220         struct cl_req *clerq = NULL;
2221         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2222         struct ldlm_lock *lock = NULL;
2223         struct cl_req_attr crattr;
2224         int i, rc;
2225
2226         ENTRY;
2227         LASSERT(!list_empty(rpc_list));
2228
2229         memset(&crattr, 0, sizeof crattr);
2230         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2231         if (pga == NULL)
2232                 GOTO(out, req = ERR_PTR(-ENOMEM));
2233
2234         OBDO_ALLOC(oa);
2235         if (oa == NULL)
2236                 GOTO(out, req = ERR_PTR(-ENOMEM));
2237
2238         i = 0;
2239         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2240                 struct cl_page *page = osc_oap2cl_page(oap);
2241                 if (ops == NULL) {
2242                         ops = oap->oap_caller_ops;
2243                         caller_data = oap->oap_caller_data;
2244
2245                         clerq = cl_req_alloc(env, page, crt,
2246                                              1 /* only 1-object rpcs for
2247                                                 * now */);
2248                         if (IS_ERR(clerq))
2249                                 GOTO(out, req = (void *)clerq);
2250                         lock = oap->oap_ldlm_lock;
2251                 }
2252                 pga[i] = &oap->oap_brw_page;
2253                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2254                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2255                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2256                 i++;
2257                 cl_req_page_add(env, clerq, page);
2258         }
2259
2260         /* always get the data for the obdo for the rpc */
2261         LASSERT(ops != NULL);
2262         crattr.cra_oa = oa;
2263         crattr.cra_capa = NULL;
2264         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2265         if (lock) {
2266                 oa->o_handle = lock->l_remote_handle;
2267                 oa->o_valid |= OBD_MD_FLHANDLE;
2268         }
2269
2270         rc = cl_req_prep(env, clerq);
2271         if (rc != 0) {
2272                 CERROR("cl_req_prep failed: %d\n", rc);
2273                 GOTO(out, req = ERR_PTR(rc));
2274         }
2275
2276         sort_brw_pages(pga, page_count);
2277         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2278                                   pga, &req, crattr.cra_capa, 1);
2279         if (rc != 0) {
2280                 CERROR("prep_req failed: %d\n", rc);
2281                 GOTO(out, req = ERR_PTR(rc));
2282         }
2283
2284         /* Need to update the timestamps after the request is built in case
2285          * we race with setattr (locally or in queue at OST).  If OST gets
2286          * later setattr before earlier BRW (as determined by the request xid),
2287          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2288          * way to do this in a single call.  bug 10150 */
2289         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2290         cl_req_attr_set(env, clerq, &crattr,
2291                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2292
2293         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2294         aa = ptlrpc_req_async_args(req);
2295         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2296         list_splice(rpc_list, &aa->aa_oaps);
2297         CFS_INIT_LIST_HEAD(rpc_list);
2298         aa->aa_clerq = clerq;
2299 out:
2300         capa_put(crattr.cra_capa);
2301         if (IS_ERR(req)) {
2302                 if (oa)
2303                         OBDO_FREE(oa);
2304                 if (pga)
2305                         OBD_FREE(pga, sizeof(*pga) * page_count);
2306                 /* this should happen rarely and is pretty bad; it makes the
2307                  * pending list not follow the dirty order */
2308                 client_obd_list_lock(&cli->cl_loi_list_lock);
2309                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2310                         list_del_init(&oap->oap_rpc_item);
2311
2312                         /* queued sync pages can be torn down while the pages
2313                          * were between the pending list and the rpc */
2314                         if (oap->oap_interrupted) {
2315                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2316                                 osc_ap_completion(env, cli, NULL, oap, 0,
2317                                                   oap->oap_count);
2318                                 continue;
2319                         }
2320                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2321                 }
2322                 if (clerq && !IS_ERR(clerq))
2323                         cl_req_completion(env, clerq, PTR_ERR(req));
2324         }
2325         RETURN(req);
2326 }
2327
2328 /**
2329  * Prepare pages for ASYNC I/O and put them in the send queue.
2330  *
2331  * \param cli - client obd to send the RPC through
2332  * \param loi - per-object info holding the pending page lists
2333  * \param cmd - OBD_BRW_* flags (read or write)
2334  * \param lop - pending pages for this object and command
2335  *
2336  * \return 1 if an RPC was sent, 0 if no pages were ready to send,
2337  *         negative errno if building the request failed.
2338  */
2339 static int
2340 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2341                  struct lov_oinfo *loi,
2342                  int cmd, struct loi_oap_pages *lop)
2343 {
2344         struct ptlrpc_request *req;
2345         obd_count page_count = 0;
2346         struct osc_async_page *oap = NULL, *tmp;
2347         struct osc_brw_async_args *aa;
2348         const struct obd_async_page_ops *ops;
2349         CFS_LIST_HEAD(rpc_list);
2350         unsigned int ending_offset;
2351         unsigned  starting_offset = 0;
2352         int srvlock = 0;
2353         struct cl_object *clob = NULL;
2354         ENTRY;
2355
2356         /* If there are HP OAPs we need to handle at least 1 of them,
2357          * so move it to the beginning of the pending list. */
2358         if (!list_empty(&lop->lop_urgent)) {
2359                 oap = list_entry(lop->lop_urgent.next,
2360                                  struct osc_async_page, oap_urgent_item);
2361                 if (oap->oap_async_flags & ASYNC_HP)
2362                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2363         }
2364
2365         /* first we find the pages we're allowed to work with */
2366         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2367                                  oap_pending_item) {
2368                 ops = oap->oap_caller_ops;
2369
2370                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2371                          "magic 0x%x\n", oap, oap->oap_magic);
2372
2373                 if (clob == NULL) {
2374                         /* pin object in memory, so that completion call-backs
2375                          * can be safely called under client_obd_list lock. */
2376                         clob = osc_oap2cl_page(oap)->cp_obj;
2377                         cl_object_get(clob);
2378                 }
2379
2380                 if (page_count != 0 &&
2381                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2382                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2383                                " oap %p, page %p, srvlock %u\n",
2384                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2385                         break;
2386                 }
2387                 /* in llite being 'ready' equates to the page being locked
2388                  * until completion unlocks it.  commit_write submits a page
2389                  * as not ready because its unlock will happen unconditionally
2390                  * as the call returns.  if we race with commit_write giving
2391                  * us that page we dont' want to create a hole in the page
2392                  * us that page we don't want to create a hole in the page
2393                  * another dirtier or kupdated interval (the not ready page
2394                  * will still be on the dirty list).  we could call in
2395                  * at the end of ll_file_write to process the queue again. */
2396                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2397                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2398                                                     cmd);
2399                         if (rc < 0)
2400                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2401                                                 "instead of ready\n", oap,
2402                                                 oap->oap_page, rc);
2403                         switch (rc) {
2404                         case -EAGAIN:
2405                                 /* llite is telling us that the page is still
2406                                  * in commit_write and that we should try
2407                                  * and put it in an rpc again later.  we
2408                                  * break out of the loop so we don't create
2409                                  * a hole in the sequence of pages in the rpc
2410                                  * stream.*/
2411                                 oap = NULL;
2412                                 break;
2413                         case -EINTR:
2414                                 /* the io isn't needed; tell the checks
2415                                  * below to complete the rpc with -EINTR */
2416                                 spin_lock(&oap->oap_lock);
2417                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2418                                 spin_unlock(&oap->oap_lock);
2419                                 oap->oap_count = -EINTR;
2420                                 break;
2421                         case 0:
2422                                 spin_lock(&oap->oap_lock);
2423                                 oap->oap_async_flags |= ASYNC_READY;
2424                                 spin_unlock(&oap->oap_lock);
2425                                 break;
2426                         default:
2427                                 LASSERTF(0, "oap %p page %p returned %d "
2428                                             "from make_ready\n", oap,
2429                                             oap->oap_page, rc);
2430                                 break;
2431                         }
2432                 }
2433                 if (oap == NULL)
2434                         break;
2435                 /*
2436                  * Page submitted for IO has to be locked. Either by
2437                  * ->ap_make_ready() or by higher layers.
2438                  */
2439 #if defined(__KERNEL__) && defined(__linux__)
2440                 {
2441                         struct cl_page *page;
2442
2443                         page = osc_oap2cl_page(oap);
2444
2445                         if (page->cp_type == CPT_CACHEABLE &&
2446                             !(PageLocked(oap->oap_page) &&
2447                               (CheckWriteback(oap->oap_page, cmd)))) {
2448                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2449                                        oap->oap_page,
2450                                        (long)oap->oap_page->flags,
2451                                        oap->oap_async_flags);
2452                                 LBUG();
2453                         }
2454                 }
2455 #endif
2456                 /* If there is a gap at the start of this page, it can't merge
2457                  * with any previous page, so we'll hand the network a
2458                  * "fragmented" page array that it can't transfer in 1 RDMA */
2459                 if (page_count != 0 && oap->oap_page_off != 0)
2460                         break;
2461
2462                 /* take the page out of our book-keeping */
2463                 list_del_init(&oap->oap_pending_item);
2464                 lop_update_pending(cli, lop, cmd, -1);
2465                 list_del_init(&oap->oap_urgent_item);
2466
2467                 if (page_count == 0)
2468                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2469                                           (PTLRPC_MAX_BRW_SIZE - 1);
2470
2471                 /* ask the caller for the size of the io as the rpc leaves. */
2472                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2473                         oap->oap_count =
2474                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2475                                                       cmd);
2476                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2477                 }
2478                 if (oap->oap_count <= 0) {
2479                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2480                                oap->oap_count);
2481                         osc_ap_completion(env, cli, NULL,
2482                                           oap, 0, oap->oap_count);
2483                         continue;
2484                 }
2485
2486                 /* now put the page back in our accounting */
2487                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2488                 if (page_count == 0)
2489                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2490                 if (++page_count >= cli->cl_max_pages_per_rpc)
2491                         break;
2492
2493                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2494                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2495                  * have the same alignment as the initial writes that allocated
2496                  * extents on the server. */
2497                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2498                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2499                 if (ending_offset == 0)
2500                         break;
2501
2502                 /* If there is a gap at the end of this page, it can't merge
2503                  * with any subsequent pages, so we'll hand the network a
2504                  * "fragmented" page array that it can't transfer in 1 RDMA */
2505                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2506                         break;
2507         }
2508
2509         osc_wake_cache_waiters(cli);
2510
2511         loi_list_maint(cli, loi);
2512
2513         client_obd_list_unlock(&cli->cl_loi_list_lock);
2514
2515         if (clob != NULL)
2516                 cl_object_put(env, clob);
2517
2518         if (page_count == 0) {
2519                 client_obd_list_lock(&cli->cl_loi_list_lock);
2520                 RETURN(0);
2521         }
2522
2523         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2524         if (IS_ERR(req)) {
2525                 LASSERT(list_empty(&rpc_list));
2526                 /* loi_list_maint(cli, loi); */
2527                 RETURN(PTR_ERR(req));
2528         }
2529
2530         aa = ptlrpc_req_async_args(req);
2531
2532         if (cmd == OBD_BRW_READ) {
2533                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2534                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2535                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2536                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2537         } else {
2538                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2539                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2540                                  cli->cl_w_in_flight);
2541                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2542                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2543         }
2544         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2545
2546         client_obd_list_lock(&cli->cl_loi_list_lock);
2547
2548         if (cmd == OBD_BRW_READ)
2549                 cli->cl_r_in_flight++;
2550         else
2551                 cli->cl_w_in_flight++;
2552
2553         /* queued sync pages can be torn down while the pages
2554          * were between the pending list and the rpc */
2555         tmp = NULL;
2556         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2557                 /* only one oap gets a request reference */
2558                 if (tmp == NULL)
2559                         tmp = oap;
2560                 if (oap->oap_interrupted && !req->rq_intr) {
2561                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2562                                oap, req);
2563                         ptlrpc_mark_interrupted(req);
2564                 }
2565         }
2566         if (tmp != NULL)
2567                 tmp->oap_request = ptlrpc_request_addref(req);
2568
2569         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2570                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2571
2572         req->rq_interpret_reply = brw_interpret;
2573         ptlrpcd_add_req(req, PSCOPE_BRW);
2574         RETURN(1);
2575 }
2576
2577 #define LOI_DEBUG(LOI, STR, args...)                                     \
2578         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2579                !list_empty(&(LOI)->loi_ready_item) ||                    \
2580                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2581                (LOI)->loi_write_lop.lop_num_pending,                     \
2582                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2583                (LOI)->loi_read_lop.lop_num_pending,                      \
2584                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2585                args)
2586
2587 /* This is called by osc_check_rpcs() to find which objects have pages that
2588  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2589 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2590 {
2591         ENTRY;
2592
2593         /* First return objects that have blocked locks so that they
2594          * will be flushed quickly and other clients can get the lock,
2595          * then objects which have pages ready to be stuffed into RPCs */
2596         if (!list_empty(&cli->cl_loi_hp_ready_list))
2597                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2598                                   struct lov_oinfo, loi_hp_ready_item));
2599         if (!list_empty(&cli->cl_loi_ready_list))
2600                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2601                                   struct lov_oinfo, loi_ready_item));
2602
2603         /* then if we have cache waiters, return all objects with queued
2604          * writes.  This is especially important when many small files
2605          * have filled up the cache and not been fired into rpcs because
2606          * they don't pass the nr_pending/object threshold */
2607         if (!list_empty(&cli->cl_cache_waiters) &&
2608             !list_empty(&cli->cl_loi_write_list))
2609                 RETURN(list_entry(cli->cl_loi_write_list.next,
2610                                   struct lov_oinfo, loi_write_item));
2611
2612         /* then return all queued objects when we have an invalid import
2613          * so that they get flushed */
2614         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2615                 if (!list_empty(&cli->cl_loi_write_list))
2616                         RETURN(list_entry(cli->cl_loi_write_list.next,
2617                                           struct lov_oinfo, loi_write_item));
2618                 if (!list_empty(&cli->cl_loi_read_list))
2619                         RETURN(list_entry(cli->cl_loi_read_list.next,
2620                                           struct lov_oinfo, loi_read_item));
2621         }
2622         RETURN(NULL);
2623 }
2624
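/* Return 1 when the client is already at its RPC-in-flight limit; a queued
 * high-priority page raises the limit by one so HP RPCs can jump the queue. */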
2625 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2626 {
2627         struct osc_async_page *oap;
2628         int hprpc = 0;
2629
2630         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2631                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2632                                  struct osc_async_page, oap_urgent_item);
2633                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2634         }
2635
2636         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2637                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2638                                  struct osc_async_page, oap_urgent_item);
2639                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2640         }
2641
2642         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2643 }
2644
2645 /* called with the loi list lock held */
2646 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2647 {
2648         struct lov_oinfo *loi;
2649         int rc = 0, race_counter = 0;
2650         ENTRY;
2651
2652         while ((loi = osc_next_loi(cli)) != NULL) {
2653                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2654
2655                 if (osc_max_rpc_in_flight(cli, loi))
2656                         break;
2657
2658                 /* attempt some read/write balancing by alternating between
2659                  * reads and writes in an object.  The makes_rpc checks here
2660                  * would be redundant if we were getting read/write work items
2661                  * instead of objects.  we don't want send_oap_rpc to drain a
2662                  * partial read pending queue when we're given this object to
2663                  * do write io on while there are cache waiters */
2664                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2665                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2666                                               &loi->loi_write_lop);
2667                         if (rc < 0)
2668                                 break;
2669                         if (rc > 0)
2670                                 race_counter = 0;
2671                         else
2672                                 race_counter++;
2673                 }
2674                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2675                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2676                                               &loi->loi_read_lop);
2677                         if (rc < 0)
2678                                 break;
2679                         if (rc > 0)
2680                                 race_counter = 0;
2681                         else
2682                                 race_counter++;
2683                 }
2684
2685                 /* attempt some inter-object balancing by issuing rpcs
2686                  * for each object in turn */
2687                 if (!list_empty(&loi->loi_hp_ready_item))
2688                         list_del_init(&loi->loi_hp_ready_item);
2689                 if (!list_empty(&loi->loi_ready_item))
2690                         list_del_init(&loi->loi_ready_item);
2691                 if (!list_empty(&loi->loi_write_item))
2692                         list_del_init(&loi->loi_write_item);
2693                 if (!list_empty(&loi->loi_read_item))
2694                         list_del_init(&loi->loi_read_item);
2695
2696                 loi_list_maint(cli, loi);
2697
2698                 /* send_oap_rpc returns 0 when make_ready tells it to
2699                  * back off.  llite's make_ready does this when it tries
2700                  * to lock a page queued for write that is already locked.
2701                  * We want to try sending rpcs from many objects, but we
2702                  * don't want to spin failing with 0.  */
2703                 if (race_counter == 10)
2704                         break;
2705         }
2706         EXIT;
2707 }
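/*
 * The race_counter pattern above, reduced to a standalone sketch.  This is
 * illustrative only and not compiled: producer_t, next_ready() and send_one()
 * are hypothetical stand-ins for the loi lists and osc_send_oap_rpc(), not
 * real Lustre symbols.
 */
#if 0
static void drain_round_robin(producer_t *producers)
{
        int idle = 0;

        while (idle < 10) {                     /* same bound as above */
                producer_t *p = next_ready(producers);

                if (p == NULL)                  /* nothing left to send */
                        break;
                if (send_one(p) > 0)            /* made progress, reset */
                        idle = 0;
                else                            /* backed off with 0 */
                        idle++;
        }
}
#endif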
2708
2709 /* we're trying to queue a page in the osc so we're subject to the
2710  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2711  * If the osc's queued pages are already at that limit, then we want to sleep
2712  * until there is space in the osc's queue for us.  We also may be waiting for
2713  * write credits from the OST if there are RPCs in flight that may return some
2714  * before we fall back to sync writes.
2715  *
2716  * We need this to know our allocation was granted in the presence of signals */
2717 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2718 {
2719         int rc;
2720         ENTRY;
2721         client_obd_list_lock(&cli->cl_loi_list_lock);
2722         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2723         client_obd_list_unlock(&cli->cl_loi_list_lock);
2724         RETURN(rc);
2725 }
2726
2727 /**
2728  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2729  * is available.
2730  */
2731 int osc_enter_cache_try(const struct lu_env *env,
2732                         struct client_obd *cli, struct lov_oinfo *loi,
2733                         struct osc_async_page *oap, int transient)
2734 {
2735         int has_grant;
2736
2737         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2738         if (has_grant) {
2739                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2740                 if (transient) {
2741                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2742                         atomic_inc(&obd_dirty_transit_pages);
2743                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2744                 }
2745         }
2746         return has_grant;
2747 }
2748
2749 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2750  * grant or cache space. */
2751 static int osc_enter_cache(const struct lu_env *env,
2752                            struct client_obd *cli, struct lov_oinfo *loi,
2753                            struct osc_async_page *oap)
2754 {
2755         struct osc_cache_waiter ocw;
2756         struct l_wait_info lwi = { 0 };
2757
2758         ENTRY;
2759
2760         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2761                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2762                cli->cl_dirty_max, obd_max_dirty_pages,
2763                cli->cl_lost_grant, cli->cl_avail_grant);
2764
2765         /* force the caller to try sync I/O.  This can jump the list
2766          * of queued writes and create a discontiguous rpc stream */
2767         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2768             loi->loi_ar.ar_force_sync)
2769                 RETURN(-EDQUOT);
2770
2771         /* Hopefully normal case - cache space and write credits available */
2772         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2773             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2774             osc_enter_cache_try(env, cli, loi, oap, 0))
2775                 RETURN(0);
2776
2777         /* Make sure that there are write rpcs in flight to wait for.  This
2778          * is a little silly as this object may not have any pending writes,
2779          * but other objects certainly might. */
2780         if (cli->cl_w_in_flight) {
2781                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2782                 cfs_waitq_init(&ocw.ocw_waitq);
2783                 ocw.ocw_oap = oap;
2784                 ocw.ocw_rc = 0;
2785
2786                 loi_list_maint(cli, loi);
2787                 osc_check_rpcs(env, cli);
2788                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2789
2790                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2791                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2792
2793                 client_obd_list_lock(&cli->cl_loi_list_lock);
2794                 if (!list_empty(&ocw.ocw_entry)) {
2795                         list_del(&ocw.ocw_entry);
2796                         RETURN(-EINTR);
2797                 }
2798                 RETURN(ocw.ocw_rc);
2799         }
2800
2801         RETURN(-EDQUOT);
2802 }
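/*
 * A sketch of the cache-waiter handshake used above (descriptive only; the
 * wake-up side lives in osc_wake_cache_waiters()):
 *
 *   writer (osc_enter_cache)            grant/space becomes available
 *   ------------------------            -----------------------------
 *   add ocw to cl_cache_waiters         remove ocw from the list
 *   l_wait_event(ocw_granted(...))      set ocw->ocw_rc, wake ocw_waitq
 *
 * An ocw_entry still on the list after the wait therefore means the sleep
 * was interrupted before any grant arrived, hence the -EINTR above.
 */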
2803
2804
2805 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2806                         struct lov_oinfo *loi, cfs_page_t *page,
2807                         obd_off offset, const struct obd_async_page_ops *ops,
2808                         void *data, void **res, int nocache,
2809                         struct lustre_handle *lockh)
2810 {
2811         struct osc_async_page *oap;
2812
2813         ENTRY;
2814
2815         if (!page)
2816                 return size_round(sizeof(*oap));
2817
2818         oap = *res;
2819         oap->oap_magic = OAP_MAGIC;
2820         oap->oap_cli = &exp->exp_obd->u.cli;
2821         oap->oap_loi = loi;
2822
2823         oap->oap_caller_ops = ops;
2824         oap->oap_caller_data = data;
2825
2826         oap->oap_page = page;
2827         oap->oap_obj_off = offset;
2828         if (!client_is_remote(exp) &&
2829             cfs_capable(CFS_CAP_SYS_RESOURCE))
2830                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2831
2832         LASSERT(!(offset & ~CFS_PAGE_MASK));
2833
2834         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2835         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2836         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2837         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2838
2839         spin_lock_init(&oap->oap_lock);
2840         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2841         RETURN(0);
2842 }
2843
2844 struct osc_async_page *oap_from_cookie(void *cookie)
2845 {
2846         struct osc_async_page *oap = cookie;
2847         if (oap->oap_magic != OAP_MAGIC)
2848                 return ERR_PTR(-EINVAL);
2849         return oap;
2850 }
2851
2852 int osc_queue_async_io(const struct lu_env *env,
2853                        struct obd_export *exp, struct lov_stripe_md *lsm,
2854                        struct lov_oinfo *loi, void *cookie,
2855                        int cmd, obd_off off, int count,
2856                        obd_flag brw_flags, enum async_flags async_flags)
2857 {
2858         struct client_obd *cli = &exp->exp_obd->u.cli;
2859         struct osc_async_page *oap;
2860         int rc = 0;
2861         ENTRY;
2862
2863         oap = oap_from_cookie(cookie);
2864         if (IS_ERR(oap))
2865                 RETURN(PTR_ERR(oap));
2866
2867         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2868                 RETURN(-EIO);
2869
2870         if (!list_empty(&oap->oap_pending_item) ||
2871             !list_empty(&oap->oap_urgent_item) ||
2872             !list_empty(&oap->oap_rpc_item))
2873                 RETURN(-EBUSY);
2874
2875         /* check if the file's owner/group is over quota */
2876         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2877                 struct cl_object *obj;
2878                 struct cl_attr    attr; /* XXX put attr into thread info */
2879                 unsigned int qid[MAXQUOTAS];
2880
2881                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2882
2883                 cl_object_attr_lock(obj);
2884                 rc = cl_object_attr_get(env, obj, &attr);
2885                 cl_object_attr_unlock(obj);
2886
2887                 qid[USRQUOTA] = attr.cat_uid;
2888                 qid[GRPQUOTA] = attr.cat_gid;
2889                 if (rc == 0 &&
2890                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2891                         rc = -EDQUOT;
2892                 if (rc)
2893                         RETURN(rc);
2894         }
2895
2896         if (loi == NULL)
2897                 loi = lsm->lsm_oinfo[0];
2898
2899         client_obd_list_lock(&cli->cl_loi_list_lock);
2900
2901         LASSERT(off + count <= CFS_PAGE_SIZE);
2902         oap->oap_cmd = cmd;
2903         oap->oap_page_off = off;
2904         oap->oap_count = count;
2905         oap->oap_brw_flags = brw_flags;
2906         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2907         if (libcfs_memory_pressure_get())
2908                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2909         spin_lock(&oap->oap_lock);
2910         oap->oap_async_flags = async_flags;
2911         spin_unlock(&oap->oap_lock);
2912
2913         if (cmd & OBD_BRW_WRITE) {
2914                 rc = osc_enter_cache(env, cli, loi, oap);
2915                 if (rc) {
2916                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2917                         RETURN(rc);
2918                 }
2919         }
2920
2921         osc_oap_to_pending(oap);
2922         loi_list_maint(cli, loi);
2923
2924         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2925                   cmd);
2926
2927         osc_check_rpcs(env, cli);
2928         client_obd_list_unlock(&cli->cl_loi_list_lock);
2929
2930         RETURN(0);
2931 }
2932
2933 /* aka (~was & now & flag), but this is more clear :) */
2934 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
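/*
 * Worked example of SETTING() (an illustrative truth table, assuming the
 * flags are distinct bits): only bits that are newly turned on qualify.
 *
 *   was = ASYNC_READY, now = ASYNC_READY | ASYNC_URGENT:
 *     SETTING(was, now, ASYNC_READY)  == 0    (was already set)
 *     SETTING(was, now, ASYNC_URGENT) == 1    (being set right now)
 *     SETTING(was, now, ASYNC_HP)     == 0    (set in neither)
 */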
2935
2936 int osc_set_async_flags_base(struct client_obd *cli,
2937                              struct lov_oinfo *loi, struct osc_async_page *oap,
2938                              obd_flag async_flags)
2939 {
2940         struct loi_oap_pages *lop;
2941         int flags = 0;
2942         ENTRY;
2943
2944         LASSERT(!list_empty(&oap->oap_pending_item));
2945
2946         if (oap->oap_cmd & OBD_BRW_WRITE) {
2947                 lop = &loi->loi_write_lop;
2948         } else {
2949                 lop = &loi->loi_read_lop;
2950         }
2951
2952         if ((oap->oap_async_flags & async_flags) == async_flags)
2953                 RETURN(0);
2954
2955         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2956                 flags |= ASYNC_READY;
2957
2958         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2959             list_empty(&oap->oap_rpc_item)) {
2960                 if (oap->oap_async_flags & ASYNC_HP)
2961                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2962                 else
2963                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2964                 flags |= ASYNC_URGENT;
2965                 loi_list_maint(cli, loi);
2966         }
2967         spin_lock(&oap->oap_lock);
2968         oap->oap_async_flags |= flags;
2969         spin_unlock(&oap->oap_lock);
2970
2971         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2972                         oap->oap_async_flags);
2973         RETURN(0);
2974 }
2975
2976 int osc_teardown_async_page(struct obd_export *exp,
2977                             struct lov_stripe_md *lsm,
2978                             struct lov_oinfo *loi, void *cookie)
2979 {
2980         struct client_obd *cli = &exp->exp_obd->u.cli;
2981         struct loi_oap_pages *lop;
2982         struct osc_async_page *oap;
2983         int rc = 0;
2984         ENTRY;
2985
2986         oap = oap_from_cookie(cookie);
2987         if (IS_ERR(oap))
2988                 RETURN(PTR_ERR(oap));
2989
2990         if (loi == NULL)
2991                 loi = lsm->lsm_oinfo[0];
2992
2993         if (oap->oap_cmd & OBD_BRW_WRITE) {
2994                 lop = &loi->loi_write_lop;
2995         } else {
2996                 lop = &loi->loi_read_lop;
2997         }
2998
2999         client_obd_list_lock(&cli->cl_loi_list_lock);
3000
3001         if (!list_empty(&oap->oap_rpc_item))
3002                 GOTO(out, rc = -EBUSY);
3003
3004         osc_exit_cache(cli, oap, 0);
3005         osc_wake_cache_waiters(cli);
3006
3007         if (!list_empty(&oap->oap_urgent_item)) {
3008                 list_del_init(&oap->oap_urgent_item);
3009                 spin_lock(&oap->oap_lock);
3010                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3011                 spin_unlock(&oap->oap_lock);
3012         }
3013         if (!list_empty(&oap->oap_pending_item)) {
3014                 list_del_init(&oap->oap_pending_item);
3015                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3016         }
3017         loi_list_maint(cli, loi);
3018         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3019 out:
3020         client_obd_list_unlock(&cli->cl_loi_list_lock);
3021         RETURN(rc);
3022 }
3023
3024 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3025                                          struct ldlm_enqueue_info *einfo,
3026                                          int flags)
3027 {
3028         void *data = einfo->ei_cbdata;
3029
3030         LASSERT(lock != NULL);
3031         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3032         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3033         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3034         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3035
3036         lock_res_and_lock(lock);
3037         spin_lock(&osc_ast_guard);
3038         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3039         lock->l_ast_data = data;
3040         spin_unlock(&osc_ast_guard);
3041         unlock_res_and_lock(lock);
3042 }
3043
3044 static void osc_set_data_with_check(struct lustre_handle *lockh,
3045                                     struct ldlm_enqueue_info *einfo,
3046                                     int flags)
3047 {
3048         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3049
3050         if (lock != NULL) {
3051                 osc_set_lock_data_with_check(lock, einfo, flags);
3052                 LDLM_LOCK_PUT(lock);
3053         } else
3054                 CERROR("lockh %p, data %p - client evicted?\n",
3055                        lockh, einfo->ei_cbdata);
3056 }
3057
3058 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3059                              ldlm_iterator_t replace, void *data)
3060 {
3061         struct ldlm_res_id res_id;
3062         struct obd_device *obd = class_exp2obd(exp);
3063
3064         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3065         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3066         return 0;
3067 }
3068
3069 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3070                             obd_enqueue_update_f upcall, void *cookie,
3071                             int *flags, int rc)
3072 {
3073         int intent = *flags & LDLM_FL_HAS_INTENT;
3074         ENTRY;
3075
3076         if (intent) {
3077                 /* The request was created before ldlm_cli_enqueue call. */
3078                 if (rc == ELDLM_LOCK_ABORTED) {
3079                         struct ldlm_reply *rep;
3080                         rep = req_capsule_server_get(&req->rq_pill,
3081                                                      &RMF_DLM_REP);
3082
3083                         LASSERT(rep != NULL);
3084                         if (rep->lock_policy_res1)
3085                                 rc = rep->lock_policy_res1;
3086                 }
3087         }
3088
3089         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3090                 *flags |= LDLM_FL_LVB_READY;
3091                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3092                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3093         }
3094
3095         /* Call the update callback. */
3096         rc = (*upcall)(cookie, rc);
3097         RETURN(rc);
3098 }
3099
3100 static int osc_enqueue_interpret(const struct lu_env *env,
3101                                  struct ptlrpc_request *req,
3102                                  struct osc_enqueue_args *aa, int rc)
3103 {
3104         struct ldlm_lock *lock;
3105         struct lustre_handle handle;
3106         __u32 mode;
3107
3108         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3109          * might be freed anytime after lock upcall has been called. */
3110         lustre_handle_copy(&handle, aa->oa_lockh);
3111         mode = aa->oa_ei->ei_mode;
3112
3113         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3114          * be valid. */
3115         lock = ldlm_handle2lock(&handle);
3116
3117         /* Take an additional reference so that a blocking AST that
3118          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3119          * to arrive after an upcall has been executed by
3120          * osc_enqueue_fini(). */
3121         ldlm_lock_addref(&handle, mode);
3122
3123         /* Complete obtaining the lock procedure. */
3124         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3125                                    mode, aa->oa_flags, aa->oa_lvb,
3126                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3127                                    &handle, rc);
3128         /* Complete osc stuff. */
3129         rc = osc_enqueue_fini(req, aa->oa_lvb,
3130                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3131
3132         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3133
3134         /* Release the lock for async request. */
3135         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3136                 /*
3137                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3138                  * not already released by
3139                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3140                  */
3141                 ldlm_lock_decref(&handle, mode);
3142
3143         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3144                  aa->oa_lockh, req, aa);
3145         ldlm_lock_decref(&handle, mode);
3146         LDLM_LOCK_PUT(lock);
3147         return rc;
3148 }
3149
3150 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3151                         struct lov_oinfo *loi, int flags,
3152                         struct ost_lvb *lvb, __u32 mode, int rc)
3153 {
3154         if (rc == ELDLM_OK) {
3155                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3156                 __u64 tmp;
3157
3158                 LASSERT(lock != NULL);
3159                 loi->loi_lvb = *lvb;
3160                 tmp = loi->loi_lvb.lvb_size;
3161                 /* Extend KMS up to the end of this lock and no further.
3162                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3163                 if (tmp > lock->l_policy_data.l_extent.end)
3164                         tmp = lock->l_policy_data.l_extent.end + 1;
3165                 if (tmp >= loi->loi_kms) {
3166                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3167                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3168                         loi_kms_set(loi, tmp);
3169                 } else {
3170                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3171                                    LPU64"; leaving kms="LPU64", end="LPU64,
3172                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3173                                    lock->l_policy_data.l_extent.end);
3174                 }
3175                 ldlm_lock_allow_match(lock);
3176                 LDLM_LOCK_PUT(lock);
3177         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3178                 loi->loi_lvb = *lvb;
3179                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3180                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3181                 rc = ELDLM_OK;
3182         }
3183 }
3184 EXPORT_SYMBOL(osc_update_enqueue);
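/*
 * Worked example of the KMS clamp above (illustrative numbers): a granted
 * lock on extent [0, 4095] with lvb_size = 10000 lets kms grow only to
 * 4096 (l_extent.end + 1); knowledge of the file past the lock's end cannot
 * be trusted, so kms must not be extended beyond it.
 */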
3185
3186 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3187
3188 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3189  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3190  * other synchronous requests, but holding some locks while trying to obtain
3191  * others may take a considerable amount of time in case of OST failure; and
3192  * when other sync requests cannot get a lock released by a client, the client
3193  * is excluded from the cluster -- such scenarios make life difficult, so
3194  * release locks just after they are obtained. */
3195 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3196                      int *flags, ldlm_policy_data_t *policy,
3197                      struct ost_lvb *lvb, int kms_valid,
3198                      obd_enqueue_update_f upcall, void *cookie,
3199                      struct ldlm_enqueue_info *einfo,
3200                      struct lustre_handle *lockh,
3201                      struct ptlrpc_request_set *rqset, int async)
3202 {
3203         struct obd_device *obd = exp->exp_obd;
3204         struct ptlrpc_request *req = NULL;
3205         int intent = *flags & LDLM_FL_HAS_INTENT;
3206         ldlm_mode_t mode;
3207         int rc;
3208         ENTRY;
3209
3210         /* Filesystem lock extents are extended to page boundaries so that
3211          * dealing with the page cache is a little smoother.  */
3212         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3213         policy->l_extent.end |= ~CFS_PAGE_MASK;
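        /*
         * Worked example (assuming 4 KiB pages, i.e. ~CFS_PAGE_MASK == 0xfff):
         * a byte range [5000, 6000] becomes the page-aligned extent
         * [4096, 8191] -- start rounds down, end rounds up to page borders.
         */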
3214
3215         /*
3216          * kms is not valid when either object is completely fresh (so that no
3217          * locks are cached), or object was evicted. In the latter case cached
3218          * lock cannot be used, because it would prime inode state with
3219          * potentially stale LVB.
3220          */
3221         if (!kms_valid)
3222                 goto no_match;
3223
3224         /* Next, search for already existing extent locks that will cover us */
3225         /* If we're trying to read, we also search for an existing PW lock.  The
3226          * VFS and page cache already protect us locally, so lots of readers/
3227          * writers can share a single PW lock.
3228          *
3229          * There are problems with conversion deadlocks, so instead of
3230          * converting a read lock to a write lock, we'll just enqueue a new
3231          * one.
3232          *
3233          * At some point we should cancel the read lock instead of making them
3234          * send us a blocking callback, but there are problems with canceling
3235          * locks out from other users right now, too. */
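        /* Example (illustrative): a PR request that finds a cached PW lock
         * covering its extent is granted locally (provided l_ast_data
         * matches) -- ldlm_lock_match() below returns LCK_PW, the upcall
         * fires with ELDLM_OK, and no enqueue RPC is sent. */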
3236         mode = einfo->ei_mode;
3237         if (einfo->ei_mode == LCK_PR)
3238                 mode |= LCK_PW;
3239         mode = ldlm_lock_match(obd->obd_namespace,
3240                                *flags | LDLM_FL_LVB_READY, res_id,
3241                                einfo->ei_type, policy, mode, lockh, 0);
3242         if (mode) {
3243                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3244
3245                 if (matched->l_ast_data == NULL ||
3246                     matched->l_ast_data == einfo->ei_cbdata) {
3247                         /* addref the lock only if not async requests and PW
3248                          * lock is matched whereas we asked for PR. */
3249                         if (!rqset && einfo->ei_mode != mode)
3250                                 ldlm_lock_addref(lockh, LCK_PR);
3251                         osc_set_lock_data_with_check(matched, einfo, *flags);
3252                         if (intent) {
3253                                 /* I would like to be able to ASSERT here that
3254                                  * rss <= kms, but I can't, for reasons which
3255                                  * are explained in lov_enqueue() */
3256                         }
3257
3258                         /* We already have a lock, and it's referenced */
3259                         (*upcall)(cookie, ELDLM_OK);
3260
3261                         /* For async requests, decref the lock. */
3262                         if (einfo->ei_mode != mode)
3263                                 ldlm_lock_decref(lockh, LCK_PW);
3264                         else if (rqset)
3265                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3266                         LDLM_LOCK_PUT(matched);
3267                         RETURN(ELDLM_OK);
3268                 } else
3269                         ldlm_lock_decref(lockh, mode);
3270                 LDLM_LOCK_PUT(matched);
3271         }
3272
3273  no_match:
3274         if (intent) {
3275                 CFS_LIST_HEAD(cancels);
3276                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3277                                            &RQF_LDLM_ENQUEUE_LVB);
3278                 if (req == NULL)
3279                         RETURN(-ENOMEM);
3280
3281                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3282                 if (rc)
3283                         RETURN(rc);
3284
3285                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3286                                      sizeof *lvb);
3287                 ptlrpc_request_set_replen(req);
3288         }
3289
3290         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3291         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3292
3293         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3294                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3295         if (rqset) {
3296                 if (!rc) {
3297                         struct osc_enqueue_args *aa;
3298                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3299                         aa = ptlrpc_req_async_args(req);
3300                         aa->oa_ei = einfo;
3301                         aa->oa_exp = exp;
3302                         aa->oa_flags  = flags;
3303                         aa->oa_upcall = upcall;
3304                         aa->oa_cookie = cookie;
3305                         aa->oa_lvb    = lvb;
3306                         aa->oa_lockh  = lockh;
3307
3308                         req->rq_interpret_reply =
3309                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3310                         if (rqset == PTLRPCD_SET)
3311                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3312                         else
3313                                 ptlrpc_set_add_req(rqset, req);
3314                 } else if (intent) {
3315                         ptlrpc_req_finished(req);
3316                 }
3317                 RETURN(rc);
3318         }
3319
3320         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3321         if (intent)
3322                 ptlrpc_req_finished(req);
3323
3324         RETURN(rc);
3325 }
3326
3327 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3328                        struct ldlm_enqueue_info *einfo,
3329                        struct ptlrpc_request_set *rqset)
3330 {
3331         struct ldlm_res_id res_id;
3332         int rc;
3333         ENTRY;
3334
3335         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3336                            oinfo->oi_md->lsm_object_gr, &res_id);
3337
3338         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3339                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3340                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3341                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3342                               rqset, rqset != NULL);
3343         RETURN(rc);
3344 }
3345
3346 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3347                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3348                    int *flags, void *data, struct lustre_handle *lockh,
3349                    int unref)
3350 {
3351         struct obd_device *obd = exp->exp_obd;
3352         int lflags = *flags;
3353         ldlm_mode_t rc;
3354         ENTRY;
3355
3356         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3357                 RETURN(-EIO);
3358
3359         /* Filesystem lock extents are extended to page boundaries so that
3360          * dealing with the page cache is a little smoother */
3361         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3362         policy->l_extent.end |= ~CFS_PAGE_MASK;
3363
3364         /* Next, search for already existing extent locks that will cover us */
3365         /* If we're trying to read, we also search for an existing PW lock.  The
3366          * VFS and page cache already protect us locally, so lots of readers/
3367          * writers can share a single PW lock. */
3368         rc = mode;
3369         if (mode == LCK_PR)
3370                 rc |= LCK_PW;
3371         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3372                              res_id, type, policy, rc, lockh, unref);
3373         if (rc) {
3374                 if (data != NULL)
3375                         osc_set_data_with_check(lockh, data, lflags);
3376                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3377                         ldlm_lock_addref(lockh, LCK_PR);
3378                         ldlm_lock_decref(lockh, LCK_PW);
3379                 }
3380                 RETURN(rc);
3381         }
3382         RETURN(rc);
3383 }
3384
3385 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3386 {
3387         ENTRY;
3388
3389         if (unlikely(mode == LCK_GROUP))
3390                 ldlm_lock_decref_and_cancel(lockh, mode);
3391         else
3392                 ldlm_lock_decref(lockh, mode);
3393
3394         RETURN(0);
3395 }
3396
3397 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3398                       __u32 mode, struct lustre_handle *lockh)
3399 {
3400         ENTRY;
3401         RETURN(osc_cancel_base(lockh, mode));
3402 }
3403
3404 static int osc_cancel_unused(struct obd_export *exp,
3405                              struct lov_stripe_md *lsm, int flags,
3406                              void *opaque)
3407 {
3408         struct obd_device *obd = class_exp2obd(exp);
3409         struct ldlm_res_id res_id, *resp = NULL;
3410
3411         if (lsm != NULL) {
3412                 resp = osc_build_res_name(lsm->lsm_object_id,
3413                                           lsm->lsm_object_gr, &res_id);
3414         }
3415
3416         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3417 }
3418
3419 static int osc_statfs_interpret(const struct lu_env *env,
3420                                 struct ptlrpc_request *req,
3421                                 struct osc_async_args *aa, int rc)
3422 {
3423         struct obd_statfs *msfs;
3424         ENTRY;
3425
3426         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3427             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3428                 GOTO(out, rc = 0);
3429
3430         if (rc != 0)
3431                 GOTO(out, rc);
3432
3433         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3434         if (msfs == NULL) {
3435                 GOTO(out, rc = -EPROTO);
3436         }
3437
3438         *aa->aa_oi->oi_osfs = *msfs;
3439 out:
3440         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3441         RETURN(rc);
3442 }
3443
3444 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3445                             __u64 max_age, struct ptlrpc_request_set *rqset)
3446 {
3447         struct ptlrpc_request *req;
3448         struct osc_async_args *aa;
3449         int                    rc;
3450         ENTRY;
3451
3452         /* We could possibly pass max_age in the request (as an absolute
3453          * timestamp or a "seconds.usec ago") so the target can avoid doing
3454          * extra calls into the filesystem if that isn't necessary (e.g.
3455          * during mount that would help a bit).  Having relative timestamps
3456          * is not so great if request processing is slow, while absolute
3457          * timestamps are not ideal because they need time synchronization. */
3458         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3459         if (req == NULL)
3460                 RETURN(-ENOMEM);
3461
3462         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3463         if (rc) {
3464                 ptlrpc_request_free(req);
3465                 RETURN(rc);
3466         }
3467         ptlrpc_request_set_replen(req);
3468         req->rq_request_portal = OST_CREATE_PORTAL;
3469         ptlrpc_at_set_req_timeout(req);
3470
3471         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3472                 /* procfs requests should not wait or resend, to avoid deadlock */
3473                 req->rq_no_resend = 1;
3474                 req->rq_no_delay = 1;
3475         }
3476
3477         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3478         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3479         aa = ptlrpc_req_async_args(req);
3480         aa->aa_oi = oinfo;
3481
3482         ptlrpc_set_add_req(rqset, req);
3483         RETURN(0);
3484 }
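/*
 * Hypothetical caller sketch (illustrative only, hence not compiled): issue
 * the statfs asynchronously on a private request set and wait for it.  The
 * helper names and the trivial update callback are assumptions of this
 * sketch, not existing Lustre symbols; oi_cb_up is invoked from
 * osc_statfs_interpret() above, so it must be non-NULL.
 */
#if 0
static int example_statfs_update(void *cookie, int rc)
{
        return rc;                      /* nothing extra to do on completion */
}

static int example_statfs_sync(struct obd_device *obd, struct obd_statfs *osfs)
{
        struct ptlrpc_request_set *set = ptlrpc_prep_set();
        struct obd_info oinfo = { .oi_osfs  = osfs,
                                  .oi_cb_up = example_statfs_update };
        int rc;

        if (set == NULL)
                return -ENOMEM;
        rc = osc_statfs_async(obd, &oinfo, cfs_time_current_64(), set);
        if (rc == 0)
                rc = ptlrpc_set_wait(set);
        ptlrpc_set_destroy(set);
        return rc;
}
#endif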
3485
3486 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3487                       __u64 max_age, __u32 flags)
3488 {
3489         struct obd_statfs     *msfs;
3490         struct ptlrpc_request *req;
3491         struct obd_import     *imp = NULL;
3492         int rc;
3493         ENTRY;
3494
3495         /* Since the request might also come from lprocfs, we need to
3496          * sync this with client_disconnect_export() (bug 15684) */
3497         down_read(&obd->u.cli.cl_sem);
3498         if (obd->u.cli.cl_import)
3499                 imp = class_import_get(obd->u.cli.cl_import);
3500         up_read(&obd->u.cli.cl_sem);
3501         if (!imp)
3502                 RETURN(-ENODEV);
3503
3504         /* We could possibly pass max_age in the request (as an absolute
3505          * timestamp or a "seconds.usec ago") so the target can avoid doing
3506          * extra calls into the filesystem if that isn't necessary (e.g.
3507          * during mount that would help a bit).  Having relative timestamps
3508          * is not so great if request processing is slow, while absolute
3509          * timestamps are not ideal because they need time synchronization. */
3510         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3511
3512         class_import_put(imp);
3513
3514         if (req == NULL)
3515                 RETURN(-ENOMEM);
3516
3517         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3518         if (rc) {
3519                 ptlrpc_request_free(req);
3520                 RETURN(rc);
3521         }
3522         ptlrpc_request_set_replen(req);
3523         req->rq_request_portal = OST_CREATE_PORTAL;
3524         ptlrpc_at_set_req_timeout(req);
3525
3526         if (flags & OBD_STATFS_NODELAY) {
3527                 /* procfs requests should not wait or resend, to avoid deadlock */
3528                 req->rq_no_resend = 1;
3529                 req->rq_no_delay = 1;
3530         }
3531
3532         rc = ptlrpc_queue_wait(req);
3533         if (rc)
3534                 GOTO(out, rc);
3535
3536         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3537         if (msfs == NULL) {
3538                 GOTO(out, rc = -EPROTO);
3539         }
3540
3541         *osfs = *msfs;
3542
3543         EXIT;
3544  out:
3545         ptlrpc_req_finished(req);
3546         return rc;
3547 }
3548
3549 /* Retrieve object striping information.
3550  *
3551  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3552  * the maximum number of OST indices which will fit in the user buffer.
3553  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
3554  */
3555 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3556 {
3557         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3558         struct lov_user_md_v3 lum, *lumk;
3559         struct lov_user_ost_data_v1 *lmm_objects;
3560         int rc = 0, lum_size;
3561         ENTRY;
3562
3563         if (!lsm)
3564                 RETURN(-ENODATA);
3565
3566         /* we only need the header part from user space to get lmm_magic and
3567          * lmm_stripe_count (the header part is common to v1 and v3) */
3568         lum_size = sizeof(struct lov_user_md_v1);
3569         if (copy_from_user(&lum, lump, lum_size))
3570                 RETURN(-EFAULT);
3571
3572         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3573             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3574                 RETURN(-EINVAL);
3575
3576         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3577         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3578         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3579         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3580
3581         /* we can use lov_mds_md_size() to compute lum_size
3582          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3583         if (lum.lmm_stripe_count > 0) {
3584                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3585                 OBD_ALLOC(lumk, lum_size);
3586                 if (!lumk)
3587                         RETURN(-ENOMEM);
3588
3589                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3590                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3591                 else
3592                         lmm_objects = &(lumk->lmm_objects[0]);
3593                 lmm_objects->l_object_id = lsm->lsm_object_id;
3594         } else {
3595                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3596                 lumk = &lum;
3597         }
3598
3599         lumk->lmm_object_id = lsm->lsm_object_id;
3600         lumk->lmm_object_gr = lsm->lsm_object_gr;
3601         lumk->lmm_stripe_count = 1;
3602
3603         if (copy_to_user(lump, lumk, lum_size))
3604                 rc = -EFAULT;
3605
3606         if (lumk != &lum)
3607                 OBD_FREE(lumk, lum_size);
3608
3609         RETURN(rc);
3610 }
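/*
 * Size example for the allocation above (illustrative, assuming the usual
 * layout where lov_mds_md_size(n, magic) is the header plus n per-object
 * entries): for LOV_USER_MAGIC_V1 and lmm_stripe_count == 1 this is
 * sizeof(struct lov_user_md_v1) + sizeof(struct lov_user_ost_data_v1).
 */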
3611
3612
3613 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3614                          void *karg, void *uarg)
3615 {
3616         struct obd_device *obd = exp->exp_obd;
3617         struct obd_ioctl_data *data = karg;
3618         int err = 0;
3619         ENTRY;
3620
3621         if (!try_module_get(THIS_MODULE)) {
3622                 CERROR("Can't get module. Is it alive?\n");
3623                 return -EINVAL;
3624         }
3625         switch (cmd) {
3626         case OBD_IOC_LOV_GET_CONFIG: {
3627                 char *buf;
3628                 struct lov_desc *desc;
3629                 struct obd_uuid uuid;
3630
3631                 buf = NULL;
3632                 len = 0;
3633                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3634                         GOTO(out, err = -EINVAL);
3635
3636                 data = (struct obd_ioctl_data *)buf;
3637
3638                 if (sizeof(*desc) > data->ioc_inllen1) {
3639                         obd_ioctl_freedata(buf, len);
3640                         GOTO(out, err = -EINVAL);
3641                 }
3642
3643                 if (data->ioc_inllen2 < sizeof(uuid)) {
3644                         obd_ioctl_freedata(buf, len);
3645                         GOTO(out, err = -EINVAL);
3646                 }
3647
3648                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3649                 desc->ld_tgt_count = 1;
3650                 desc->ld_active_tgt_count = 1;
3651                 desc->ld_default_stripe_count = 1;
3652                 desc->ld_default_stripe_size = 0;
3653                 desc->ld_default_stripe_offset = 0;
3654                 desc->ld_pattern = 0;
3655                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3656
3657                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3658
3659                 err = copy_to_user((void *)uarg, buf, len);
3660                 if (err)
3661                         err = -EFAULT;
3662                 obd_ioctl_freedata(buf, len);
3663                 GOTO(out, err);
3664         }
3665         case LL_IOC_LOV_SETSTRIPE:
3666                 err = obd_alloc_memmd(exp, karg);
3667                 if (err > 0)
3668                         err = 0;
3669                 GOTO(out, err);
3670         case LL_IOC_LOV_GETSTRIPE:
3671                 err = osc_getstripe(karg, uarg);
3672                 GOTO(out, err);
3673         case OBD_IOC_CLIENT_RECOVER:
3674                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3675                                             data->ioc_inlbuf1);
3676                 if (err > 0)
3677                         err = 0;
3678                 GOTO(out, err);
3679         case IOC_OSC_SET_ACTIVE:
3680                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3681                                                data->ioc_offset);
3682                 GOTO(out, err);
3683         case OBD_IOC_POLL_QUOTACHECK:
3684                 err = lquota_poll_check(quota_interface, exp,
3685                                         (struct if_quotacheck *)karg);
3686                 GOTO(out, err);
3687         case OBD_IOC_PING_TARGET:
3688                 err = ptlrpc_obd_ping(obd);
3689                 GOTO(out, err);
3690         default:
3691                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3692                        cmd, cfs_curproc_comm());
3693                 GOTO(out, err = -ENOTTY);
3694         }
3695 out:
3696         module_put(THIS_MODULE);
3697         return err;
3698 }
3699
3700 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3701                         void *key, __u32 *vallen, void *val,
3702                         struct lov_stripe_md *lsm)
3703 {
3704         ENTRY;
3705         if (!vallen || !val)
3706                 RETURN(-EFAULT);
3707
3708         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3709                 __u32 *stripe = val;
3710                 *vallen = sizeof(*stripe);
3711                 *stripe = 0;
3712                 RETURN(0);
3713         } else if (KEY_IS(KEY_LAST_ID)) {
3714                 struct ptlrpc_request *req;
3715                 obd_id                *reply;
3716                 char                  *tmp;
3717                 int                    rc;
3718
3719                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3720                                            &RQF_OST_GET_INFO_LAST_ID);
3721                 if (req == NULL)
3722                         RETURN(-ENOMEM);
3723
3724                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3725                                      RCL_CLIENT, keylen);
3726                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3727                 if (rc) {
3728                         ptlrpc_request_free(req);
3729                         RETURN(rc);
3730                 }
3731
3732                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3733                 memcpy(tmp, key, keylen);
3734
3735                 req->rq_no_delay = req->rq_no_resend = 1;
3736                 ptlrpc_request_set_replen(req);
3737                 rc = ptlrpc_queue_wait(req);
3738                 if (rc)
3739                         GOTO(out, rc);
3740
3741                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3742                 if (reply == NULL)
3743                         GOTO(out, rc = -EPROTO);
3744
3745                 *((obd_id *)val) = *reply;
3746         out:
3747                 ptlrpc_req_finished(req);
3748                 RETURN(rc);
3749         } else if (KEY_IS(KEY_FIEMAP)) {
3750                 struct ptlrpc_request *req;
3751                 struct ll_user_fiemap *reply;
3752                 char *tmp;
3753                 int rc;
3754
3755                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3756                                            &RQF_OST_GET_INFO_FIEMAP);
3757                 if (req == NULL)
3758                         RETURN(-ENOMEM);
3759
3760                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3761                                      RCL_CLIENT, keylen);
3762                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3763                                      RCL_CLIENT, *vallen);
3764                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3765                                      RCL_SERVER, *vallen);
3766
3767                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3768                 if (rc) {
3769                         ptlrpc_request_free(req);
3770                         RETURN(rc);
3771                 }
3772
3773                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3774                 memcpy(tmp, key, keylen);
3775                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3776                 memcpy(tmp, val, *vallen);
3777
3778                 ptlrpc_request_set_replen(req);
3779                 rc = ptlrpc_queue_wait(req);
3780                 if (rc)
3781                         GOTO(out1, rc);
3782
3783                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3784                 if (reply == NULL)
3785                         GOTO(out1, rc = -EPROTO);
3786
3787                 memcpy(val, reply, *vallen);
3788         out1:
3789                 ptlrpc_req_finished(req);
3790
3791                 RETURN(rc);
3792         }
3793
3794         RETURN(-EINVAL);
3795 }
3796
3797 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3798 {
3799         struct llog_ctxt *ctxt;
3800         int rc = 0;
3801         ENTRY;
3802
3803         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3804         if (ctxt) {
3805                 rc = llog_initiator_connect(ctxt);
3806                 llog_ctxt_put(ctxt);
3807         } else {
3808                 /* XXX return an error? skip setting below flags? */
3809         }
3810
3811         spin_lock(&imp->imp_lock);
3812         imp->imp_server_timeout = 1;
3813         imp->imp_pingable = 1;
3814         spin_unlock(&imp->imp_lock);
3815         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3816
3817         RETURN(rc);
3818 }
3819
3820 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3821                                           struct ptlrpc_request *req,
3822                                           void *aa, int rc)
3823 {
3824         ENTRY;
3825         if (rc != 0)
3826                 RETURN(rc);
3827
3828         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3829 }
3830
3831 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3832                               void *key, obd_count vallen, void *val,
3833                               struct ptlrpc_request_set *set)
3834 {
3835         struct ptlrpc_request *req;
3836         struct obd_device     *obd = exp->exp_obd;
3837         struct obd_import     *imp = class_exp2cliimp(exp);
3838         char                  *tmp;
3839         int                    rc;
3840         ENTRY;
3841
3842         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3843
3844         if (KEY_IS(KEY_NEXT_ID)) {
3845                 if (vallen != sizeof(obd_id))
3846                         RETURN(-ERANGE);
3847                 if (val == NULL)
3848                         RETURN(-EINVAL);
3849                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3850                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3851                        exp->exp_obd->obd_name,
3852                        obd->u.cli.cl_oscc.oscc_next_id);
3853
3854                 RETURN(0);
3855         }
3856
3857         if (KEY_IS(KEY_UNLINKED)) {
3858                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3859                 spin_lock(&oscc->oscc_lock);
3860                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3861                 spin_unlock(&oscc->oscc_lock);
3862                 RETURN(0);
3863         }
3864
3865         if (KEY_IS(KEY_INIT_RECOV)) {
3866                 if (vallen != sizeof(int))
3867                         RETURN(-EINVAL);
3868                 spin_lock(&imp->imp_lock);
3869                 imp->imp_initial_recov = *(int *)val;
3870                 spin_unlock(&imp->imp_lock);
3871                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3872                        exp->exp_obd->obd_name,
3873                        imp->imp_initial_recov);
3874                 RETURN(0);
3875         }
3876
3877         if (KEY_IS(KEY_CHECKSUM)) {
3878                 if (vallen != sizeof(int))
3879                         RETURN(-EINVAL);
3880                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3881                 RETURN(0);
3882         }
3883
3884         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3885                 sptlrpc_conf_client_adapt(obd);
3886                 RETURN(0);
3887         }
3888
3889         if (KEY_IS(KEY_FLUSH_CTX)) {
3890                 sptlrpc_import_flush_my_ctx(imp);
3891                 RETURN(0);
3892         }
3893
3894         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3895                 RETURN(-EINVAL);
3896
3897         /* We pass all other commands directly to OST. Since nobody calls osc
3898            methods directly and everybody is supposed to go through LOV, we
3899            assume lov checked invalid values for us.
3900            The only recognised values so far are evict_by_nid and mds_conn.
3901            Even if something bad goes through, we'd get a -EINVAL from OST
3902            anyway. */
3903
3904         if (KEY_IS(KEY_GRANT_SHRINK))
3905                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3906         else
3907                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3908
3909         if (req == NULL)
3910                 RETURN(-ENOMEM);
3911
3912         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3913                              RCL_CLIENT, keylen);
3914         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3915                              RCL_CLIENT, vallen);
3916         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3917         if (rc) {
3918                 ptlrpc_request_free(req);
3919                 RETURN(rc);
3920         }
3921
3922         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3923         memcpy(tmp, key, keylen);
3924         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3925         memcpy(tmp, val, vallen);
3926
3927         if (KEY_IS(KEY_MDS_CONN)) {
3928                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3929
3930                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3931                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3932                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3933                 req->rq_no_delay = req->rq_no_resend = 1;
3934                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3935         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3936                 struct osc_grant_args *aa;
3937                 struct obdo *oa;
3938
3939                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3940                 aa = ptlrpc_req_async_args(req);
3941                 OBD_ALLOC_PTR(oa);
3942                 if (!oa) {
3943                         ptlrpc_req_finished(req);
3944                         RETURN(-ENOMEM);
3945                 }
3946                 *oa = ((struct ost_body *)val)->oa;
3947                 aa->aa_oa = oa;
3948                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3949         }
3950
3951         ptlrpc_request_set_replen(req);
3952         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3953                 LASSERT(set != NULL);
3954                 ptlrpc_set_add_req(set, req);
3955                 ptlrpc_check_set(NULL, set);
3956         } else
3957                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3958
3959         RETURN(0);
3960 }
3961
3962
3963 static struct llog_operations osc_size_repl_logops = {
3964         lop_cancel: llog_obd_repl_cancel
3965 };
3966
3967 static struct llog_operations osc_mds_ost_orig_logops;
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}

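/* Tear down both llog contexts set up by osc_llog_init(), returning the
 * first cleanup error encountered. */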
static int osc_llog_finish(struct obd_device *obd, int count)
{
        struct llog_ctxt *ctxt;
        int rc = 0, rc2 = 0;
        ENTRY;

        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt)
                rc = llog_cleanup(ctxt);

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt)
                rc2 = llog_cleanup(ctxt);
        if (!rc)
                rc = rc2;

        RETURN(rc);
}

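/* On reconnect, request either the grant this client still holds or, if
 * none remains, enough for two full-size RPCs; any lost grant is logged
 * and the counter reset under the LOI list lock. */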
static int osc_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data,
                         void *localdata)
{
        struct client_obd *cli = &obd->u.cli;

        if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
                long lost_grant;

                client_obd_list_lock(&cli->cl_loi_list_lock);
                data->ocd_grant = cli->cl_avail_grant ?:
                                2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
                lost_grant = cli->cl_lost_grant;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
                       "cl_lost_grant: %ld\n", data->ocd_grant,
                       cli->cl_avail_grant, lost_grant);
                CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
                       " ocd_grant: %d\n", data->ocd_connect_flags,
                       data->ocd_version, data->ocd_grant);
        }

        RETURN(0);
}

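/* Disconnect from the OST, syncing any outstanding size-replicator llog
 * cancels to the target on the last connection.  See the note below on
 * why the grant-shrink list entry is removed only after the import has
 * been destroyed. */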
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially del_shrink_grant was called before
         * client_disconnect_export, but that causes the following race
         * when setup (connect) and cleanup (disconnect) run concurrently:
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! The pinger triggers the shrink on a client that has
         * already been cleaned up.  So the OSC should only be removed
         * from the shrink list once we are sure the import has been
         * destroyed.  See bug 18662.
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}

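/* Respond to import state changes: drop grants on disconnect, flush
 * pages and DLM locks on invalidation, and refresh grant and portal
 * state from the connect data once the import is active again. */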
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSCs */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* All pages go to failing RPCs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSCs */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}

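/* Set up the OSC device: client OBD state, lprocfs entries, the object
 * creator, a preallocated request pool for writeback, and grant-shrink
 * bookkeeping. */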
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        int rc;
        ENTRY;

        rc = ptlrpcd_addref();
        if (rc)
                RETURN(rc);

        rc = client_obd_setup(obd, lcfg);
        if (rc) {
                ptlrpcd_decref();
        } else {
                struct lprocfs_static_vars lvars = { 0 };
                struct client_obd *cli = &obd->u.cli;

                cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
                lprocfs_osc_init_vars(&lvars);
                if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
                        lproc_osc_attach_seqstat(obd);
                        sptlrpc_lprocfs_cliobd_attach(obd);
                        ptlrpc_lprocfs_register_obd(obd);
                }

                oscc_init(obd);
                /* We need to allocate a few requests more, because
                   brw_interpret tries to create new requests before freeing
                   previous ones.  Ideally we want 2x max_rpcs_in_flight
                   reserved, but that is probably too much wasted RAM, so
                   +2 is just a guess that should still work. */
                cli->cl_import->imp_rq_pool =
                        ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
                                            OST_MAXREQSIZE,
                                            ptlrpc_add_rqs_to_pool);

                CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
                sema_init(&cli->cl_grant_sem, 1);
        }

        RETURN(rc);
}

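/* Staged pre-cleanup: OBD_CLEANUP_EARLY deactivates the import and stops
 * pinging it; OBD_CLEANUP_EXPORTS destroys any remaining client import
 * and shuts down the llog contexts. */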
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
        }
        }
        RETURN(rc);
}

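/* Final teardown: unregister lprocfs entries, release the quota cache,
 * clean up the client OBD state, and drop the ptlrpcd reference taken
 * in osc_setup(). */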
int osc_cleanup(struct obd_device *obd)
{
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}

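/* Process a config log command; anything unrecognized is treated as a
 * tunable and passed to the generic proc-parameter handler. */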
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc = 0;

        lprocfs_osc_init_vars(&lvars);

        switch (lcfg->lcfg_command) {
        default:
                rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
                                              lcfg, obd);
                if (rc > 0)
                        rc = 0;
                break;
        }

        return rc;
}

static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}

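/* Method table registered with the OBD class layer for the osc type. */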
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_create_async         = osc_create_async,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};

extern struct lu_kmem_descr  osc_caches[];
extern spinlock_t            osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

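/* Module initialization: set up slab caches, hook up the quota
 * interface, and register the osc device type. */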
int __init osc_init(void)
{
        struct lprocfs_static_vars lvars = { 0 };
        int rc;
        ENTRY;

        /* Print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
        CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);

        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);

        lprocfs_osc_init_vars(&lvars);

        request_module("lquota");
        quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
        lquota_init(quota_interface);
        init_obd_quota_ops(quota_interface, &osc_obd_ops);

        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
                                 LUSTRE_OSC_NAME, &osc_device_type);
        if (rc) {
                if (quota_interface)
                        PORTAL_SYMBOL_PUT(osc_quota_interface);
                lu_kmem_fini(osc_caches);
                RETURN(rc);
        }

        spin_lock_init(&osc_ast_guard);
        lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

        RETURN(rc);
}

#ifdef __KERNEL__
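/* Module unload: unhook the quota interface and unregister the osc
 * device type. */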
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif