/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_MDS_GROUP(lsm->lsm_object_gr);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}

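/* Copy the capability @capa into the request capsule's RMF_CAPA1 field and
 * mark its presence via OBD_MD_FLOSSCAPA in the request body; a NULL @capa
 * means the request carries no capability. */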
static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

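/* Shrink the RMF_CAPA1 field to zero when no capability will be sent;
 * otherwise the size already chosen by the request format is kept. */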
static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* the size is already calculated as sizeof(struct obd_capa) */
                ;
}

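/* Reply interpreter for async getattr: unpack the ost_body from the reply,
 * copy the attributes into the caller's obdo, and invoke the oi_cb_up
 * completion callback with the final status. */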
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

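/* Queue a non-blocking OST_GETATTR on @set; the reply is handled by
 * osc_getattr_interpret() above. */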
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

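/* Synchronous OST_GETATTR: send the request, wait for the reply, and copy
 * the returned attributes into oinfo->oi_oa. */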
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

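/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the OST
 * and copy back whatever the server returns. */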
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

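/* Asynchronous OST_SETATTR: if @rqset is NULL the request is handed to
 * ptlrpcd and the reply is ignored; otherwise it is added to @rqset with
 * osc_setattr_interpret() as the reply handler. */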
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* Do MDS-to-OST setattr asynchronously. */
        if (!rqset) {
                /* Do not wait for a response; hand the request to ptlrpcd. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

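/* Create the object described by @oa on the OST, allocating a stripe MD in
 * *@ea if the caller did not supply one, and record the resulting object
 * id/group (and any llog cookie) for the caller. */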
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}

static int osc_punch_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               struct osc_punch_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(aa->pa_oa, &body->oa);
out:
        rc = aa->pa_upcall(aa->pa_cookie, rc);
        RETURN(rc);
}

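/* Truncate (punch) the object described by @oa.  The extent to punch is
 * carried in oa->o_size/o_blocks (see osc_punch() below for the canonical
 * way callers fill these in); @upcall/@cookie are invoked from
 * osc_punch_interpret() once the reply arrives.  The request runs via
 * ptlrpcd when @rqset is PTLRPCD_SET. */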
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
                              oinfo->oi_cb_up, oinfo, rqset);
}

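/* Ask the OST to flush the [start, end] byte range of the object to disk;
 * the range is smuggled in the o_size/o_blocks fields of the request body. */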
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

/* Find and cancel locally the locks matched by @mode in the resource
 * identified by @oa (its o_id/o_gr).  The locks found are added to the
 * @cancels list.  Returns the number of locks added to that list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

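/* Try to reserve a slot for a destroy RPC under cl_max_rpcs_in_flight.
 * Returns 1 with the in-flight counter incremented if a slot was free;
 * otherwise the increment is undone, and if another slot was released
 * between the two atomic operations the waitqueue is signalled so a
 * waiter can retry. */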
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for a response. */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

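/* Advertise the client's cache state to the server: fill the dirty, undirty,
 * grant and dropped-grant fields of @oa under the loi list lock so they are
 * consistent with each other. */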
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

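/* Re-arm the grant shrink timer: the next shrink check happens
 * cl_grant_shrink_interval seconds from now. */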
static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* The companion to osc_consume_write_grant, called when a brw has completed.
 * Must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT(client_obd_list_is_locked(&cli->cl_loi_list_lock));
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

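/* Wake cache waiters in order: each waiter either receives write grant via
 * osc_consume_write_grant() or, when no grant can be expected from pending
 * RPCs, is released with ocw_rc = -EDQUOT so it falls back to sync I/O. */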
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if the cache is still dirty but there is no grant, wait for
                 * pending RPCs that may yet return us some grant before doing
                 * sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}

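/* Hand a quarter of the currently available grant back to the server by
 * recording it in @oa and flagging it with OBD_FL_SHRINK_GRANT; the grant
 * travels with the RPC that carries this obdo. */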
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

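/* Return all available grant above @target to the server through a
 * KEY_GRANT_SHRINK set_info RPC; if the request cannot be sent, the grant
 * is restored locally. */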
int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
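/* A shrink is due once the current time reaches cl_next_shrink_grant (with
 * a few ticks of slack), the import is fully connected, and the available
 * grant still exceeds GRANT_SHRINK_LIMIT. */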
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;
        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                        client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

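/* Seed the grant accounting from the server's connect data and, when both
 * sides support it, register this client for periodic grant shrinking. */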
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}

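/* Validate the per-niobuf return codes in a BRW_WRITE reply: any negative
 * rc is propagated, and any non-zero rc or a transferred-byte mismatch is
 * treated as a protocol error. */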
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}

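/* Two brw_pages may share a single niobuf only when they are contiguous in
 * the file and their flags match (ignoring flags that are known to be safe
 * to mix). */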
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

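/* Checksum @nob bytes spread across @pg_count brw pages using @cksum_type.
 * Fault-injection hooks can corrupt the first read page or skew the write
 * checksum to exercise the checksum-mismatch paths. */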
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so the data is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

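/* Build a bulk read/write request: merge file-contiguous pages into
 * niobufs, attach a bulk descriptor for the pages, pack grant and optional
 * checksum data into the request body, and stash the I/O description in
 * the request's async args for the reply handler. */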
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

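/* Client and server write checksums disagree: recompute the checksum
 * locally to classify whether the data changed before, during, or after the
 * send, log the verdict, and return 1 (0 when the checksums actually agree). */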
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
                unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

                lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
                             body->oa.o_flags);
        }

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */
1497
1498         osc_update_grant(cli, body);
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         RETURN(-EPROTO);
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         RETURN(-EAGAIN);
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         RETURN(-EAGAIN);
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1518                                      aa->aa_page_count, aa->aa_ppga);
1519                 GOTO(out, rc);
1520         }
1521
1522         /* The rest of this function executes only for OST_READs */
1523
1524         /* if unwrap_bulk failed, return -EAGAIN to retry */
1525         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1526         if (rc < 0)
1527                 GOTO(out, rc = -EAGAIN);
1528
1529         if (rc > aa->aa_requested_nob) {
1530                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1531                        aa->aa_requested_nob);
1532                 RETURN(-EPROTO);
1533         }
1534
1535         if (rc != req->rq_bulk->bd_nob_transferred) {
1536                 CERROR("Unexpected rc %d (%d transferred)\n",
1537                        rc, req->rq_bulk->bd_nob_transferred);
1538                 RETURN(-EPROTO);
1539         }
1540
1541         if (rc < aa->aa_requested_nob)
1542                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1543
1544         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1545                 static int cksum_counter;
1546                 __u32      server_cksum = body->oa.o_cksum;
1547                 char      *via;
1548                 char      *router;
1549                 cksum_type_t cksum_type;
1550
1551                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1552                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1553                 else
1554                         cksum_type = OBD_CKSUM_CRC32;
1555                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1556                                                  aa->aa_ppga, OST_READ,
1557                                                  cksum_type);
1558
1559                 if (peer->nid == req->rq_bulk->bd_sender) {
1560                         via = router = "";
1561                 } else {
1562                         via = " via ";
1563                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1564                 }
1565
1566                 if (server_cksum == ~0 && rc > 0) {
1567                         CERROR("Protocol error: server %s set the 'checksum' "
1568                                "bit, but didn't send a checksum.  Not fatal, "
1569                                "but please notify on http://bugzilla.lustre.org/\n",
1570                                libcfs_nid2str(peer->nid));
1571                 } else if (server_cksum != client_cksum) {
1572                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1573                                            "%s%s%s inum "LPU64"/"LPU64" object "
1574                                            LPU64"/"LPU64" extent "
1575                                            "["LPU64"-"LPU64"]\n",
1576                                            req->rq_import->imp_obd->obd_name,
1577                                            libcfs_nid2str(peer->nid),
1578                                            via, router,
1579                                            body->oa.o_valid & OBD_MD_FLFID ?
1580                                                 body->oa.o_fid : (__u64)0,
1581                                            body->oa.o_valid & OBD_MD_FLFID ?
1582                                                 body->oa.o_generation :(__u64)0,
1583                                            body->oa.o_id,
1584                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1585                                                 body->oa.o_gr : (__u64)0,
1586                                            aa->aa_ppga[0]->off,
1587                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1588                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1589                                                                         1);
1590                         CERROR("client %x, server %x, cksum_type %x\n",
1591                                client_cksum, server_cksum, cksum_type);
1592                         cksum_counter = 0;
1593                         aa->aa_oa->o_cksum = client_cksum;
1594                         rc = -EAGAIN;
1595                 } else {
1596                         cksum_counter++;
1597                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1598                         rc = 0;
1599                 }
1600         } else if (unlikely(client_cksum)) {
1601                 static int cksum_missed;
1602
1603                 cksum_missed++;
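                /* (x & -x) == x only when x is a power of two, so this logs
                 * on the 1st, 2nd, 4th, 8th, ... miss to limit console noise */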
1604                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1605                         CERROR("Checksum %u requested from %s but not sent\n",
1606                                cksum_missed, libcfs_nid2str(peer->nid));
1607         } else {
1608                 rc = 0;
1609         }
1610 out:
1611         if (rc >= 0)
1612                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1613
1614         RETURN(rc);
1615 }
1616
1617 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1618                             struct lov_stripe_md *lsm,
1619                             obd_count page_count, struct brw_page **pga,
1620                             struct obd_capa *ocapa)
1621 {
1622         struct ptlrpc_request *req;
1623         int                    rc;
1624         cfs_waitq_t            waitq;
1625         int                    resends = 0;
1626         struct l_wait_info     lwi;
1627
1628         ENTRY;
1629
1630         cfs_waitq_init(&waitq);
1631
1632 restart_bulk:
1633         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1634                                   page_count, pga, &req, ocapa, 0);
1635         if (rc != 0)
1636                 return (rc);
1637
1638         rc = ptlrpc_queue_wait(req);
1639
1640         if (rc == -ETIMEDOUT && req->rq_resend) {
1641                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1642                 ptlrpc_req_finished(req);
1643                 goto restart_bulk;
1644         }
1645
1646         rc = osc_brw_fini_request(req, rc);
1647
1648         ptlrpc_req_finished(req);
1649         if (osc_recoverable_error(rc)) {
1650                 resends++;
1651                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1652                         CERROR("too many resend retries, returning error\n");
1653                         RETURN(-EIO);
1654                 }
1655
1656                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1657                 l_wait_event(waitq, 0, &lwi);
1658
1659                 goto restart_bulk;
1660         }
1661
1662         RETURN (rc);
1663 }
1664
1665 int osc_brw_redo_request(struct ptlrpc_request *request,
1666                          struct osc_brw_async_args *aa)
1667 {
1668         struct ptlrpc_request *new_req;
1669         struct ptlrpc_request_set *set = request->rq_set;
1670         struct osc_brw_async_args *new_aa;
1671         struct osc_async_page *oap;
1672         int rc = 0;
1673         ENTRY;
1674
1675         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1676                 CERROR("too many resend retries, returning error\n");
1677                 RETURN(-EIO);
1678         }
1679
1680         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1681
1682         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1683                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1684                                   aa->aa_cli, aa->aa_oa,
1685                                   NULL /* lsm unused by osc currently */,
1686                                   aa->aa_page_count, aa->aa_ppga,
1687                                   &new_req, aa->aa_ocapa, 0);
1688         if (rc)
1689                 RETURN(rc);
1690
1691         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1692
1693         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1694                 if (oap->oap_request != NULL) {
1695                         LASSERTF(request == oap->oap_request,
1696                                  "request %p != oap_request %p\n",
1697                                  request, oap->oap_request);
1698                         if (oap->oap_interrupted) {
1699                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1700                                 ptlrpc_req_finished(new_req);
1701                                 RETURN(-EINTR);
1702                         }
1703                 }
1704         }
1705         /* New request takes over pga and oaps from old request.
1706          * Note that copying a list_head doesn't work, need to move it... */
1707         aa->aa_resends++;
1708         new_req->rq_interpret_reply = request->rq_interpret_reply;
1709         new_req->rq_async_args = request->rq_async_args;
1710         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1711
1712         new_aa = ptlrpc_req_async_args(new_req);
1713
1714         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1715         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1716         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1717
1718         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1719                 if (oap->oap_request) {
1720                         ptlrpc_req_finished(oap->oap_request);
1721                         oap->oap_request = ptlrpc_request_addref(new_req);
1722                 }
1723         }
1724
1725         new_aa->aa_ocapa = aa->aa_ocapa;
1726         aa->aa_ocapa = NULL;
1727
1728         /* using ptlrpc_set_add_req() here is safe because interpret functions
1729          * run in check_set context.  The only path by which another thread can
1730          * reach this request and return -EINTR is protected by
1731          * cl_loi_list_lock */
1732         ptlrpc_set_add_req(set, new_req);
1733
1734         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1735
1736         DEBUG_REQ(D_INFO, new_req, "new request");
1737         RETURN(0);
1738 }
1739
1740 /*
1741  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1742  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1743  * fine for our small page arrays and doesn't require allocation.  it's an
1744  * insertion sort that swaps elements that are strides apart, shrinking the
1745  * stride down until it's '1' and the array is sorted.
1746  */
1747 static void sort_brw_pages(struct brw_page **array, int num)
1748 {
1749         int stride, i, j;
1750         struct brw_page *tmp;
1751
1752         if (num == 1)
1753                 return;
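        /* grow the stride through the 1, 4, 13, 40, ... sequence until it
         * reaches the array size */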
1754         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1755                 ;
1756
1757         do {
1758                 stride /= 3;
1759                 for (i = stride ; i < num ; i++) {
1760                         tmp = array[i];
1761                         j = i;
1762                         while (j >= stride && array[j - stride]->off > tmp->off) {
1763                                 array[j] = array[j - stride];
1764                                 j -= stride;
1765                         }
1766                         array[j] = tmp;
1767                 }
1768         } while (stride > 1);
1769 }
1770
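/* Count how many leading pages of 'pg' form one contiguous region that can be
 * sent as a single unfragmented transfer: only the first page may start after
 * a page boundary and only the last may end before one. */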
1771 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1772 {
1773         int count = 1;
1774         int offset;
1775         int i = 0;
1776
1777         LASSERT(pages > 0);
1778         offset = pg[i]->off & ~CFS_PAGE_MASK;
1779
1780         for (;;) {
1781                 pages--;
1782                 if (pages == 0)         /* that's all */
1783                         return count;
1784
1785                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1786                         return count;   /* doesn't end on page boundary */
1787
1788                 i++;
1789                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1790                 if (offset != 0)        /* doesn't start on page boundary */
1791                         return count;
1792
1793                 count++;
1794         }
1795 }
1796
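/* Build an array of pointers into the contiguous brw_page array 'pga' so the
 * pages can be sorted and carved into per-RPC chunks without copying them. */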
1797 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1798 {
1799         struct brw_page **ppga;
1800         int i;
1801
1802         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1803         if (ppga == NULL)
1804                 return NULL;
1805
1806         for (i = 0; i < count; i++)
1807                 ppga[i] = pga + i;
1808         return ppga;
1809 }
1810
1811 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1812 {
1813         LASSERT(ppga != NULL);
1814         OBD_FREE(ppga, sizeof(*ppga) * count);
1815 }
1816
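/* Synchronous bulk I/O entry point: sort the pages by offset, then send them
 * in chunks of at most cl_max_pages_per_rpc unfragmented pages per RPC. */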
1817 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1818                    obd_count page_count, struct brw_page *pga,
1819                    struct obd_trans_info *oti)
1820 {
1821         struct obdo *saved_oa = NULL;
1822         struct brw_page **ppga, **orig;
1823         struct obd_import *imp = class_exp2cliimp(exp);
1824         struct client_obd *cli;
1825         int rc, page_count_orig;
1826         ENTRY;
1827
1828         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1829         cli = &imp->imp_obd->u.cli;
1830
1831         if (cmd & OBD_BRW_CHECK) {
1832                 /* The caller just wants to know if there's a chance that this
1833                  * I/O can succeed */
1834
1835                 if (imp->imp_invalid)
1836                         RETURN(-EIO);
1837                 RETURN(0);
1838         }
1839
1840         /* test_brw with a failed create can trip this, maybe others. */
1841         LASSERT(cli->cl_max_pages_per_rpc);
1842
1843         rc = 0;
1844
1845         orig = ppga = osc_build_ppga(pga, page_count);
1846         if (ppga == NULL)
1847                 RETURN(-ENOMEM);
1848         page_count_orig = page_count;
1849
1850         sort_brw_pages(ppga, page_count);
1851         while (page_count) {
1852                 obd_count pages_per_brw;
1853
1854                 if (page_count > cli->cl_max_pages_per_rpc)
1855                         pages_per_brw = cli->cl_max_pages_per_rpc;
1856                 else
1857                         pages_per_brw = page_count;
1858
1859                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1860
1861                 if (saved_oa != NULL) {
1862                         /* restore previously saved oa */
1863                         *oinfo->oi_oa = *saved_oa;
1864                 } else if (page_count > pages_per_brw) {
1865                         /* save a copy of oa (brw will clobber it) */
1866                         OBDO_ALLOC(saved_oa);
1867                         if (saved_oa == NULL)
1868                                 GOTO(out, rc = -ENOMEM);
1869                         *saved_oa = *oinfo->oi_oa;
1870                 }
1871
1872                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1873                                       pages_per_brw, ppga, oinfo->oi_capa);
1874
1875                 if (rc != 0)
1876                         break;
1877
1878                 page_count -= pages_per_brw;
1879                 ppga += pages_per_brw;
1880         }
1881
1882 out:
1883         osc_release_ppga(orig, page_count_orig);
1884
1885         if (saved_oa != NULL)
1886                 OBDO_FREE(saved_oa);
1887
1888         RETURN(rc);
1889 }
1890
1891 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1892  * the dirty accounting: either writeback completed, or a truncate happened
1893  * before writing started.  Must be called with the loi lock held. */
1894 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1895                            int sent)
1896 {
1897         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1898 }
1899
1900
1901 /* This maintains the lists of pending pages to read/write for a given object
1902  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1903  * to quickly find objects that are ready to send an RPC. */
1904 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1905                          int cmd)
1906 {
1907         int optimal;
1908         ENTRY;
1909
1910         if (lop->lop_num_pending == 0)
1911                 RETURN(0);
1912
1913         /* if we have an invalid import we want to drain the queued pages
1914          * by forcing them through rpcs that immediately fail and complete
1915          * the pages.  recovery relies on this to empty the queued pages
1916          * before canceling the locks and tearing down the llite pages */
1917         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1918                 RETURN(1);
1919
1920         /* stream rpcs in queue order as long as there is an urgent page
1921          * queued.  this is our cheap solution for good batching in the case
1922          * where writepage marks some random page in the middle of the file
1923          * as urgent because of, say, memory pressure */
1924         if (!list_empty(&lop->lop_urgent)) {
1925                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1926                 RETURN(1);
1927         }
1928         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1929         optimal = cli->cl_max_pages_per_rpc;
1930         if (cmd & OBD_BRW_WRITE) {
1931                 /* trigger a write rpc stream as long as there are dirtiers
1932                  * waiting for space.  as they're waiting, they're not going to
1933                  * create more pages to coalesce with what's waiting. */
1934                 if (!list_empty(&cli->cl_cache_waiters)) {
1935                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1936                         RETURN(1);
1937                 }
1938                 /* +16 to avoid triggering rpcs that would want to include pages
1939                  * that are being queued but which can't be made ready until
1940                  * the queuer finishes with the page. this is a wart for
1941                  * llite::commit_write() */
1942                 optimal += 16;
1943         }
1944         if (lop->lop_num_pending >= optimal)
1945                 RETURN(1);
1946
1947         RETURN(0);
1948 }
1949
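/* Return 1 if the first urgent page queued on this lop is marked ASYNC_HP,
 * in which case a high-priority RPC should be fired immediately. */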
1950 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1951 {
1952         struct osc_async_page *oap;
1953         ENTRY;
1954
1955         if (list_empty(&lop->lop_urgent))
1956                 RETURN(0);
1957
1958         oap = list_entry(lop->lop_urgent.next,
1959                          struct osc_async_page, oap_urgent_item);
1960
1961         if (oap->oap_async_flags & ASYNC_HP) {
1962                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1963                 RETURN(1);
1964         }
1965
1966         RETURN(0);
1967 }
1968
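/* Add 'item' to 'list' or remove it so that its membership matches the
 * 'should_be_on' flag. */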
1969 static void on_list(struct list_head *item, struct list_head *list,
1970                     int should_be_on)
1971 {
1972         if (list_empty(item) && should_be_on)
1973                 list_add_tail(item, list);
1974         else if (!list_empty(item) && !should_be_on)
1975                 list_del_init(item);
1976 }
1977
1978 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1979  * can find pages to build into rpcs quickly */
1980 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1981 {
1982         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1983             lop_makes_hprpc(&loi->loi_read_lop)) {
1984                 /* HP rpc */
1985                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1986                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1987         } else {
1988                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1989                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1990                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1991                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1992         }
1993
1994         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1995                 loi->loi_write_lop.lop_num_pending);
1996
1997         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1998                 loi->loi_read_lop.lop_num_pending);
1999 }
2000
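/* Adjust the pending-page counters on both the lop and the client_obd as
 * pages are queued and dequeued. */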
2001 static void lop_update_pending(struct client_obd *cli,
2002                                struct loi_oap_pages *lop, int cmd, int delta)
2003 {
2004         lop->lop_num_pending += delta;
2005         if (cmd & OBD_BRW_WRITE)
2006                 cli->cl_pending_w_pages += delta;
2007         else
2008                 cli->cl_pending_r_pages += delta;
2009 }
2010
2011 /**
2012  * This is called when a sync waiter receives an interruption.  Its job is to
2013  * get the caller woken as soon as possible.  If its page hasn't been put in an
2014  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2015  * desiring interruption which will forcefully complete the rpc once the rpc
2016  * has timed out.
2017  */
2018 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2019 {
2020         struct loi_oap_pages *lop;
2021         struct lov_oinfo *loi;
2022         int rc = -EBUSY;
2023         ENTRY;
2024
2025         LASSERT(!oap->oap_interrupted);
2026         oap->oap_interrupted = 1;
2027
2028         /* ok, it's been put in an rpc. only one oap gets a request reference */
2029         if (oap->oap_request != NULL) {
2030                 ptlrpc_mark_interrupted(oap->oap_request);
2031                 ptlrpcd_wake(oap->oap_request);
2032                 ptlrpc_req_finished(oap->oap_request);
2033                 oap->oap_request = NULL;
2034         }
2035
2036         /*
2037          * page completion may be called only if ->cpo_prep() method was
2038          * executed by osc_io_submit(), which also adds the page to the pending list
2039          */
2040         if (!list_empty(&oap->oap_pending_item)) {
2041                 list_del_init(&oap->oap_pending_item);
2042                 list_del_init(&oap->oap_urgent_item);
2043
2044                 loi = oap->oap_loi;
2045                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2046                         &loi->loi_write_lop : &loi->loi_read_lop;
2047                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2048                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2049                 rc = oap->oap_caller_ops->ap_completion(env,
2050                                           oap->oap_caller_data,
2051                                           oap->oap_cmd, NULL, -EINTR);
2052         }
2053
2054         RETURN(rc);
2055 }
2056
2057 /* this is trying to propagate async writeback errors back up to the
2058  * application.  As an async write fails we record the error code for later if
2059  * the app does an fsync.  As long as errors persist we force future rpcs to be
2060  * sync so that the app can get a sync error and break the cycle of queueing
2061  * pages for which writeback will fail. */
2062 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2063                            int rc)
2064 {
2065         if (rc) {
2066                 if (!ar->ar_rc)
2067                         ar->ar_rc = rc;
2068
2069                 ar->ar_force_sync = 1;
2070                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2071                 return;
2073         }
2074
2075         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2076                 ar->ar_force_sync = 0;
2077 }
2078
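/* Queue an oap on its object's pending list; an ASYNC_HP page goes to the
 * head of the urgent list, a merely ASYNC_URGENT page to its tail. */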
2079 void osc_oap_to_pending(struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082
2083         if (oap->oap_cmd & OBD_BRW_WRITE)
2084                 lop = &oap->oap_loi->loi_write_lop;
2085         else
2086                 lop = &oap->oap_loi->loi_read_lop;
2087
2088         if (oap->oap_async_flags & ASYNC_HP)
2089                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2090         else if (oap->oap_async_flags & ASYNC_URGENT)
2091                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2092         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2093         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2094 }
2095
2096 /* this must be called holding the loi list lock to give coverage to exit_cache,
2097  * async_flag maintenance, and oap_request */
2098 static void osc_ap_completion(const struct lu_env *env,
2099                               struct client_obd *cli, struct obdo *oa,
2100                               struct osc_async_page *oap, int sent, int rc)
2101 {
2102         __u64 xid = 0;
2103
2104         ENTRY;
2105         if (oap->oap_request != NULL) {
2106                 xid = ptlrpc_req_xid(oap->oap_request);
2107                 ptlrpc_req_finished(oap->oap_request);
2108                 oap->oap_request = NULL;
2109         }
2110
2111         spin_lock(&oap->oap_lock);
2112         oap->oap_async_flags = 0;
2113         spin_unlock(&oap->oap_lock);
2114         oap->oap_interrupted = 0;
2115
2116         if (oap->oap_cmd & OBD_BRW_WRITE) {
2117                 osc_process_ar(&cli->cl_ar, xid, rc);
2118                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2119         }
2120
2121         if (rc == 0 && oa != NULL) {
2122                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2123                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2124                 if (oa->o_valid & OBD_MD_FLMTIME)
2125                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2126                 if (oa->o_valid & OBD_MD_FLATIME)
2127                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2128                 if (oa->o_valid & OBD_MD_FLCTIME)
2129                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2130         }
2131
2132         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2133                                                 oap->oap_cmd, oa, rc);
2134
2135         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2136          * I/O on the page could start, but OSC calls it under lock
2137          * and thus we can add oap back to pending safely */
2138         if (rc)
2139                 /* upper layer wants to leave the page on pending queue */
2140                 osc_oap_to_pending(oap);
2141         else
2142                 osc_exit_cache(cli, oap, sent);
2143         EXIT;
2144 }
2145
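/* Interpret callback for bulk RPCs: finish the reply, redo the request on a
 * recoverable error, update the in-flight counts, and complete or release
 * every page that rode in this RPC. */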
2146 static int brw_interpret(const struct lu_env *env,
2147                          struct ptlrpc_request *req, void *data, int rc)
2148 {
2149         struct osc_brw_async_args *aa = data;
2150         struct client_obd *cli;
2151         int async;
2152         ENTRY;
2153
2154         rc = osc_brw_fini_request(req, rc);
2155         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2156         if (osc_recoverable_error(rc)) {
2157                 rc = osc_brw_redo_request(req, aa);
2158                 if (rc == 0)
2159                         RETURN(0);
2160         }
2161
2162         if (aa->aa_ocapa) {
2163                 capa_put(aa->aa_ocapa);
2164                 aa->aa_ocapa = NULL;
2165         }
2166
2167         cli = aa->aa_cli;
2168
2169         client_obd_list_lock(&cli->cl_loi_list_lock);
2170
2171         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2172          * is called so we know whether to go to sync BRWs or wait for more
2173          * RPCs to complete */
2174         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2175                 cli->cl_w_in_flight--;
2176         else
2177                 cli->cl_r_in_flight--;
2178
2179         async = list_empty(&aa->aa_oaps);
2180         if (!async) { /* from osc_send_oap_rpc() */
2181                 struct osc_async_page *oap, *tmp;
2182                 /* the caller may re-use the oap after the completion call so
2183                  * we need to clean it up a little */
2184                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2185                         list_del_init(&oap->oap_rpc_item);
2186                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2187                 }
2188                 OBDO_FREE(aa->aa_oa);
2189         } else { /* from async_internal() */
2190                 int i;
2191                 for (i = 0; i < aa->aa_page_count; i++)
2192                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2193
2194                 if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY)
2195                         OBDO_FREE(aa->aa_oa);
2196         }
2197         osc_wake_cache_waiters(cli);
2198         osc_check_rpcs(env, cli);
2199         client_obd_list_unlock(&cli->cl_loi_list_lock);
2200         if (!async)
2201                 cl_req_completion(env, aa->aa_clerq, rc);
2202         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2203         RETURN(rc);
2204 }
2205
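/* Turn the oaps queued on 'rpc_list' into a single bulk RPC.  On failure the
 * queued pages are completed with the error so they are not stranded. */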
2206 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2207                                             struct client_obd *cli,
2208                                             struct list_head *rpc_list,
2209                                             int page_count, int cmd)
2210 {
2211         struct ptlrpc_request *req;
2212         struct brw_page **pga = NULL;
2213         struct osc_brw_async_args *aa;
2214         struct obdo *oa = NULL;
2215         const struct obd_async_page_ops *ops = NULL;
2216         void *caller_data = NULL;
2217         struct osc_async_page *oap;
2218         struct osc_async_page *tmp;
2219         struct ost_body *body;
2220         struct cl_req *clerq = NULL;
2221         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2222         struct ldlm_lock *lock = NULL;
2223         struct cl_req_attr crattr;
2224         int i, rc;
2225
2226         ENTRY;
2227         LASSERT(!list_empty(rpc_list));
2228
2229         memset(&crattr, 0, sizeof crattr);
2230         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2231         if (pga == NULL)
2232                 GOTO(out, req = ERR_PTR(-ENOMEM));
2233
2234         OBDO_ALLOC(oa);
2235         if (oa == NULL)
2236                 GOTO(out, req = ERR_PTR(-ENOMEM));
2237
2238         i = 0;
2239         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2240                 struct cl_page *page = osc_oap2cl_page(oap);
2241                 if (ops == NULL) {
2242                         ops = oap->oap_caller_ops;
2243                         caller_data = oap->oap_caller_data;
2244
2245                         clerq = cl_req_alloc(env, page, crt,
2246                                              1 /* only 1-object rpcs for
2247                                                 * now */);
2248                         if (IS_ERR(clerq))
2249                                 GOTO(out, req = (void *)clerq);
2250                         lock = oap->oap_ldlm_lock;
2251                 }
2252                 pga[i] = &oap->oap_brw_page;
2253                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2254                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2255                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2256                 i++;
2257                 cl_req_page_add(env, clerq, page);
2258         }
2259
2260         /* always get the data for the obdo for the rpc */
2261         LASSERT(ops != NULL);
2262         crattr.cra_oa = oa;
2263         crattr.cra_capa = NULL;
2264         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2265         if (lock) {
2266                 oa->o_handle = lock->l_remote_handle;
2267                 oa->o_valid |= OBD_MD_FLHANDLE;
2268         }
2269
2270         rc = cl_req_prep(env, clerq);
2271         if (rc != 0) {
2272                 CERROR("cl_req_prep failed: %d\n", rc);
2273                 GOTO(out, req = ERR_PTR(rc));
2274         }
2275
2276         sort_brw_pages(pga, page_count);
2277         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2278                                   pga, &req, crattr.cra_capa, 1);
2279         if (rc != 0) {
2280                 CERROR("prep_req failed: %d\n", rc);
2281                 GOTO(out, req = ERR_PTR(rc));
2282         }
2283
2284         /* Need to update the timestamps after the request is built in case
2285          * we race with setattr (locally or in queue at OST).  If OST gets
2286          * later setattr before earlier BRW (as determined by the request xid),
2287          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2288          * way to do this in a single call.  bug 10150 */
2289         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2290         cl_req_attr_set(env, clerq, &crattr,
2291                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2292
2293         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2294         aa = ptlrpc_req_async_args(req);
2295         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2296         list_splice(rpc_list, &aa->aa_oaps);
2297         CFS_INIT_LIST_HEAD(rpc_list);
2298         aa->aa_clerq = clerq;
2299 out:
2300         capa_put(crattr.cra_capa);
2301         if (IS_ERR(req)) {
2302                 if (oa)
2303                         OBDO_FREE(oa);
2304                 if (pga)
2305                         OBD_FREE(pga, sizeof(*pga) * page_count);
2306                 /* this should happen rarely and is pretty bad, it makes the
2307                  * pending list not follow the dirty order */
2308                 client_obd_list_lock(&cli->cl_loi_list_lock);
2309                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2310                         list_del_init(&oap->oap_rpc_item);
2311
2312                         /* queued sync pages can be torn down while the pages
2313                          * were between the pending list and the rpc */
2314                         if (oap->oap_interrupted) {
2315                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2316                                 osc_ap_completion(env, cli, NULL, oap, 0,
2317                                                   oap->oap_count);
2318                                 continue;
2319                         }
2320                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2321                 }
2322                 if (clerq && !IS_ERR(clerq))
2323                         cl_req_completion(env, clerq, PTR_ERR(req));
2324         }
2325         RETURN(req);
2326 }
2327
2328 /**
2329  * Prepare pages for ASYNC I/O and put them on the send queue.
2330  *
2331  * \param cli - client obd the pages belong to
2332  * \param loi - object the pages belong to
2333  * \param cmd - OBD_BRW_* macros
2334  * \param lop - pending pages
2335  *
2336  * \return zero if the pages were successfully added to the send queue.
2337  * \return nonzero if an error occurred.
2338  */
2339 static int
2340 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2341                  struct lov_oinfo *loi,
2342                  int cmd, struct loi_oap_pages *lop)
2343 {
2344         struct ptlrpc_request *req;
2345         obd_count page_count = 0;
2346         struct osc_async_page *oap = NULL, *tmp;
2347         struct osc_brw_async_args *aa;
2348         const struct obd_async_page_ops *ops;
2349         CFS_LIST_HEAD(rpc_list);
2350         unsigned int ending_offset;
2351         unsigned  starting_offset = 0;
2352         int srvlock = 0;
2353         struct cl_object *clob = NULL;
2354         ENTRY;
2355
2356         /* If there are HP OAPs we need to handle at least 1 of them,
2357          * move it to the beginning of the pending list. */
2358         if (!list_empty(&lop->lop_urgent)) {
2359                 oap = list_entry(lop->lop_urgent.next,
2360                                  struct osc_async_page, oap_urgent_item);
2361                 if (oap->oap_async_flags & ASYNC_HP)
2362                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2363         }
2364
2365         /* first we find the pages we're allowed to work with */
2366         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2367                                  oap_pending_item) {
2368                 ops = oap->oap_caller_ops;
2369
2370                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2371                          "magic 0x%x\n", oap, oap->oap_magic);
2372
2373                 if (clob == NULL) {
2374                         /* pin object in memory, so that completion call-backs
2375                          * can be safely called under client_obd_list lock. */
2376                         clob = osc_oap2cl_page(oap)->cp_obj;
2377                         cl_object_get(clob);
2378                 }
2379
2380                 if (page_count != 0 &&
2381                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2382                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2383                                " oap %p, page %p, srvlock %u\n",
2384                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2385                         break;
2386                 }
2387                 /* in llite being 'ready' equates to the page being locked
2388                  * until completion unlocks it.  commit_write submits a page
2389                  * as not ready because its unlock will happen unconditionally
2390                  * as the call returns.  if we race with commit_write giving
2391                  * us that page we don't want to create a hole in the page
2392                  * stream, so we stop and leave the rpc to be fired by
2393                  * another dirtier or kupdated interval (the not ready page
2394                  * will still be on the dirty list).  we could call in
2395                  * at the end of ll_file_write to process the queue again. */
2396                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2397                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2398                                                     cmd);
2399                         if (rc < 0)
2400                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2401                                                 "instead of ready\n", oap,
2402                                                 oap->oap_page, rc);
2403                         switch (rc) {
2404                         case -EAGAIN:
2405                                 /* llite is telling us that the page is still
2406                                  * in commit_write and that we should try
2407                                  * and put it in an rpc again later.  we
2408                                  * break out of the loop so we don't create
2409                                  * a hole in the sequence of pages in the rpc
2410                                  * stream.*/
2411                                 oap = NULL;
2412                                 break;
2413                         case -EINTR:
2414                                 /* the io isn't needed.  tell the checks
2415                                  * below to complete the rpc with EINTR */
2416                                 spin_lock(&oap->oap_lock);
2417                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2418                                 spin_unlock(&oap->oap_lock);
2419                                 oap->oap_count = -EINTR;
2420                                 break;
2421                         case 0:
2422                                 spin_lock(&oap->oap_lock);
2423                                 oap->oap_async_flags |= ASYNC_READY;
2424                                 spin_unlock(&oap->oap_lock);
2425                                 break;
2426                         default:
2427                                 LASSERTF(0, "oap %p page %p returned %d "
2428                                             "from make_ready\n", oap,
2429                                             oap->oap_page, rc);
2430                                 break;
2431                         }
2432                 }
2433                 if (oap == NULL)
2434                         break;
2435                 /*
2436                  * Page submitted for IO has to be locked. Either by
2437                  * ->ap_make_ready() or by higher layers.
2438                  */
2439 #if defined(__KERNEL__) && defined(__linux__)
2440                 {
2441                         struct cl_page *page;
2442
2443                         page = osc_oap2cl_page(oap);
2444
2445                         if (page->cp_type == CPT_CACHEABLE &&
2446                             !(PageLocked(oap->oap_page) &&
2447                               (CheckWriteback(oap->oap_page, cmd)))) {
2448                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2449                                        oap->oap_page,
2450                                        (long)oap->oap_page->flags,
2451                                        oap->oap_async_flags);
2452                                 LBUG();
2453                         }
2454                 }
2455 #endif
2456                 /* If there is a gap at the start of this page, it can't merge
2457                  * with any previous page, so we'll hand the network a
2458                  * "fragmented" page array that it can't transfer in 1 RDMA */
2459                 if (page_count != 0 && oap->oap_page_off != 0)
2460                         break;
2461
2462                 /* take the page out of our book-keeping */
2463                 list_del_init(&oap->oap_pending_item);
2464                 lop_update_pending(cli, lop, cmd, -1);
2465                 list_del_init(&oap->oap_urgent_item);
2466
2467                 if (page_count == 0)
2468                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2469                                           (PTLRPC_MAX_BRW_SIZE - 1);
2470
2471                 /* ask the caller for the size of the io as the rpc leaves. */
2472                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2473                         oap->oap_count =
2474                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2475                                                       cmd);
2476                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2477                 }
2478                 if (oap->oap_count <= 0) {
2479                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2480                                oap->oap_count);
2481                         osc_ap_completion(env, cli, NULL,
2482                                           oap, 0, oap->oap_count);
2483                         continue;
2484                 }
2485
2486                 /* now put the page back in our accounting */
2487                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2488                 if (page_count == 0)
2489                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2490                 if (++page_count >= cli->cl_max_pages_per_rpc)
2491                         break;
2492
2493                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2494                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2495                  * have the same alignment as the initial writes that allocated
2496                  * extents on the server. */
2497                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2498                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2499                 if (ending_offset == 0)
2500                         break;
2501
2502                 /* If there is a gap at the end of this page, it can't merge
2503                  * with any subsequent pages, so we'll hand the network a
2504                  * "fragmented" page array that it can't transfer in 1 RDMA */
2505                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2506                         break;
2507         }
2508
2509         osc_wake_cache_waiters(cli);
2510
2511         loi_list_maint(cli, loi);
2512
2513         client_obd_list_unlock(&cli->cl_loi_list_lock);
2514
2515         if (clob != NULL)
2516                 cl_object_put(env, clob);
2517
2518         if (page_count == 0) {
2519                 client_obd_list_lock(&cli->cl_loi_list_lock);
2520                 RETURN(0);
2521         }
2522
2523         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2524         if (IS_ERR(req)) {
2525                 LASSERT(list_empty(&rpc_list));
2526                 /* loi_list_maint(cli, loi); */
2527                 RETURN(PTR_ERR(req));
2528         }
2529
2530         aa = ptlrpc_req_async_args(req);
2531
2532         if (cmd == OBD_BRW_READ) {
2533                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2534                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2535                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2536                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2537         } else {
2538                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2539                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2540                                  cli->cl_w_in_flight);
2541                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2542                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2543         }
2544         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2545
2546         client_obd_list_lock(&cli->cl_loi_list_lock);
2547
2548         if (cmd == OBD_BRW_READ)
2549                 cli->cl_r_in_flight++;
2550         else
2551                 cli->cl_w_in_flight++;
2552
2553         /* queued sync pages can be torn down while the pages
2554          * were between the pending list and the rpc */
2555         tmp = NULL;
2556         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2557                 /* only one oap gets a request reference */
2558                 if (tmp == NULL)
2559                         tmp = oap;
2560                 if (oap->oap_interrupted && !req->rq_intr) {
2561                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2562                                oap, req);
2563                         ptlrpc_mark_interrupted(req);
2564                 }
2565         }
2566         if (tmp != NULL)
2567                 tmp->oap_request = ptlrpc_request_addref(req);
2568
2569         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2570                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2571
2572         req->rq_interpret_reply = brw_interpret;
2573         ptlrpcd_add_req(req, PSCOPE_BRW);
2574         RETURN(1);
2575 }
2576
2577 #define LOI_DEBUG(LOI, STR, args...)                                     \
2578         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2579                !list_empty(&(LOI)->loi_ready_item) ||                    \
2580                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2581                (LOI)->loi_write_lop.lop_num_pending,                     \
2582                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2583                (LOI)->loi_read_lop.lop_num_pending,                      \
2584                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2585                args)                                                     \
2586
2587 /* This is called by osc_check_rpcs() to find which objects have pages that
2588  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2589 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2590 {
2591         ENTRY;
2592
2593         /* First return objects that have blocked locks so that they
2594          * will be flushed quickly and other clients can get the lock,
2595          * then objects which have pages ready to be stuffed into RPCs */
2596         if (!list_empty(&cli->cl_loi_hp_ready_list))
2597                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2598                                   struct lov_oinfo, loi_hp_ready_item));
2599         if (!list_empty(&cli->cl_loi_ready_list))
2600                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2601                                   struct lov_oinfo, loi_ready_item));
2602
2603         /* then if we have cache waiters, return all objects with queued
2604          * writes.  This is especially important when many small files
2605          * have filled up the cache and not been fired into rpcs because
2606          * they don't pass the nr_pending/object threshold */
2607         if (!list_empty(&cli->cl_cache_waiters) &&
2608             !list_empty(&cli->cl_loi_write_list))
2609                 RETURN(list_entry(cli->cl_loi_write_list.next,
2610                                   struct lov_oinfo, loi_write_item));
2611
2612         /* then return all queued objects when we have an invalid import
2613          * so that they get flushed */
2614         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2615                 if (!list_empty(&cli->cl_loi_write_list))
2616                         RETURN(list_entry(cli->cl_loi_write_list.next,
2617                                           struct lov_oinfo, loi_write_item));
2618                 if (!list_empty(&cli->cl_loi_read_list))
2619                         RETURN(list_entry(cli->cl_loi_read_list.next,
2620                                           struct lov_oinfo, loi_read_item));
2621         }
2622         RETURN(NULL);
2623 }
2624
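/* Check whether the client is already at its RPC-in-flight limit; a queued
 * high-priority page is allowed to push one extra RPC past the limit. */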
2625 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2626 {
2627         struct osc_async_page *oap;
2628         int hprpc = 0;
2629
2630         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2631                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2632                                  struct osc_async_page, oap_urgent_item);
2633                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2634         }
2635
2636         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2637                 oap = list_entry(loi->loi_read_lop.lop_urgent.next,
2638                                  struct osc_async_page, oap_urgent_item);
2639                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2640         }
2641
2642         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2643 }
2644
2645 /* called with the loi list lock held */
2646 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2647 {
2648         struct lov_oinfo *loi;
2649         int rc = 0, race_counter = 0;
2650         ENTRY;
2651
2652         while ((loi = osc_next_loi(cli)) != NULL) {
2653                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2654
2655                 if (osc_max_rpc_in_flight(cli, loi))
2656                         break;
2657
2658                 /* attempt some read/write balancing by alternating between
2659                  * reads and writes in an object.  The makes_rpc checks here
2660                  * would be redundant if we were getting read/write work items
2661                  * instead of objects.  we don't want send_oap_rpc to drain a
2662                  * partial read pending queue when we're given this object to
2663                  * do io on writes while there are cache waiters */
2664                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2665                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2666                                               &loi->loi_write_lop);
2667                         if (rc < 0)
2668                                 break;
2669                         if (rc > 0)
2670                                 race_counter = 0;
2671                         else
2672                                 race_counter++;
2673                 }
2674                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2675                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2676                                               &loi->loi_read_lop);
2677                         if (rc < 0)
2678                                 break;
2679                         if (rc > 0)
2680                                 race_counter = 0;
2681                         else
2682                                 race_counter++;
2683                 }
2684
2685                 /* attempt some inter-object balancing by issuing rpcs
2686                  * for each object in turn */
2687                 if (!list_empty(&loi->loi_hp_ready_item))
2688                         list_del_init(&loi->loi_hp_ready_item);
2689                 if (!list_empty(&loi->loi_ready_item))
2690                         list_del_init(&loi->loi_ready_item);
2691                 if (!list_empty(&loi->loi_write_item))
2692                         list_del_init(&loi->loi_write_item);
2693                 if (!list_empty(&loi->loi_read_item))
2694                         list_del_init(&loi->loi_read_item);
2695
2696                 loi_list_maint(cli, loi);
2697
2698                 /* osc_send_oap_rpc() returns 0 when make_ready tells it to
2699                  * back off.  llite's make_ready does this when it tries
2700                  * to lock a page queued for write that is already locked.
2701                  * we want to try sending rpcs from many objects, but we
2702                  * don't want to spin failing with 0 on any one of them. */
2703                 if (race_counter == 10)
2704                         break;
2705         }
2706         EXIT;
2707 }
2708
2709 /* we're trying to queue a page in the osc so we're subject to the
2710  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2711  * If the osc's queued pages are already at that limit, then we want to sleep
2712  * until there is space in the osc's queue for us.  We also may be waiting for
2713  * write credits from the OST if there are RPCs in flight that may return some
2714  * before we fall back to sync writes.
2715  *
2716  * We need this to know that our allocation was granted in the presence of signals */
2717 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2718 {
2719         int rc;
2720         ENTRY;
2721         client_obd_list_lock(&cli->cl_loi_list_lock);
2722         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2723         client_obd_list_unlock(&cli->cl_loi_list_lock);
2724         RETURN(rc);
2725 }
2726
2727 /**
2728  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2729  * is available.
2730  */
2731 int osc_enter_cache_try(const struct lu_env *env,
2732                         struct client_obd *cli, struct lov_oinfo *loi,
2733                         struct osc_async_page *oap, int transient)
2734 {
2735         int has_grant;
2736
2737         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2738         if (has_grant) {
2739                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2740                 if (transient) {
2741                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2742                         atomic_inc(&obd_dirty_transit_pages);
2743                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2744                 }
2745         }
2746         return has_grant;
2747 }
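/* Grant accounting sketch (illustrative numbers, not taken from this file):
 * each page cached through osc_enter_cache_try() consumes CFS_PAGE_SIZE bytes
 * of cl_avail_grant via osc_consume_write_grant(), so with 4 KB pages a client
 * holding 1 MB of available grant can cache at most 256 dirty pages before
 * falling into the blocking osc_enter_cache() path below. */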
2748
2749 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2750  * grant or cache space. */
2751 static int osc_enter_cache(const struct lu_env *env,
2752                            struct client_obd *cli, struct lov_oinfo *loi,
2753                            struct osc_async_page *oap)
2754 {
2755         struct osc_cache_waiter ocw;
2756         struct l_wait_info lwi = { 0 };
2757
2758         ENTRY;
2759
2760         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2761                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2762                cli->cl_dirty_max, obd_max_dirty_pages,
2763                cli->cl_lost_grant, cli->cl_avail_grant);
2764
2765         /* force the caller to try sync io.  this can jump the list
2766          * of queued writes and create a discontiguous rpc stream */
2767         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2768             loi->loi_ar.ar_force_sync)
2769                 RETURN(-EDQUOT);
2770
2771         /* Hopefully normal case - cache space and write credits available */
2772         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2773             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2774             osc_enter_cache_try(env, cli, loi, oap, 0))
2775                 RETURN(0);
2776
2777         /* Make sure that there are write rpcs in flight to wait for.  This
2778          * is a little silly as this object may not have any pending but
2779          * other objects sure might. */
2780         if (cli->cl_w_in_flight) {
2781                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2782                 cfs_waitq_init(&ocw.ocw_waitq);
2783                 ocw.ocw_oap = oap;
2784                 ocw.ocw_rc = 0;
2785
2786                 loi_list_maint(cli, loi);
2787                 osc_check_rpcs(env, cli);
2788                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2789
2790                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2791                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2792
2793                 client_obd_list_lock(&cli->cl_loi_list_lock);
2794                 if (!list_empty(&ocw.ocw_entry)) {
2795                         list_del(&ocw.ocw_entry);
2796                         RETURN(-EINTR);
2797                 }
2798                 RETURN(ocw.ocw_rc);
2799         }
2800
2801         RETURN(-EDQUOT);
2802 }
2803
2804
2805 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2806                         struct lov_oinfo *loi, cfs_page_t *page,
2807                         obd_off offset, const struct obd_async_page_ops *ops,
2808                         void *data, void **res, int nocache,
2809                         struct lustre_handle *lockh)
2810 {
2811         struct osc_async_page *oap;
2812
2813         ENTRY;
2814
2815         if (!page)
2816                 return size_round(sizeof(*oap));
2817
2818         oap = *res;
2819         oap->oap_magic = OAP_MAGIC;
2820         oap->oap_cli = &exp->exp_obd->u.cli;
2821         oap->oap_loi = loi;
2822
2823         oap->oap_caller_ops = ops;
2824         oap->oap_caller_data = data;
2825
2826         oap->oap_page = page;
2827         oap->oap_obj_off = offset;
2828         if (!client_is_remote(exp) &&
2829             cfs_capable(CFS_CAP_SYS_RESOURCE))
2830                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2831
2832         LASSERT(!(offset & ~CFS_PAGE_MASK));
2833
2834         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2835         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2836         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2837         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2838
2839         spin_lock_init(&oap->oap_lock);
2840         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2841         RETURN(0);
2842 }
2843
2844 struct osc_async_page *oap_from_cookie(void *cookie)
2845 {
2846         struct osc_async_page *oap = cookie;
2847         if (oap->oap_magic != OAP_MAGIC)
2848                 return ERR_PTR(-EINVAL);
2849         return oap;
2850 }
2851
2852 int osc_queue_async_io(const struct lu_env *env,
2853                        struct obd_export *exp, struct lov_stripe_md *lsm,
2854                        struct lov_oinfo *loi, void *cookie,
2855                        int cmd, obd_off off, int count,
2856                        obd_flag brw_flags, enum async_flags async_flags)
2857 {
2858         struct client_obd *cli = &exp->exp_obd->u.cli;
2859         struct osc_async_page *oap;
2860         int rc = 0;
2861         ENTRY;
2862
2863         oap = oap_from_cookie(cookie);
2864         if (IS_ERR(oap))
2865                 RETURN(PTR_ERR(oap));
2866
2867         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2868                 RETURN(-EIO);
2869
2870         if (!list_empty(&oap->oap_pending_item) ||
2871             !list_empty(&oap->oap_urgent_item) ||
2872             !list_empty(&oap->oap_rpc_item))
2873                 RETURN(-EBUSY);
2874
2875         /* check if the file's owner/group is over quota */
2876         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2877                 struct cl_object *obj;
2878                 struct cl_attr    attr; /* XXX put attr into thread info */
2879                 unsigned int qid[MAXQUOTAS];
2880
2881                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2882
2883                 cl_object_attr_lock(obj);
2884                 rc = cl_object_attr_get(env, obj, &attr);
2885                 cl_object_attr_unlock(obj);
2886
2887                 qid[USRQUOTA] = attr.cat_uid;
2888                 qid[GRPQUOTA] = attr.cat_gid;
2889                 if (rc == 0 &&
2890                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
2891                         rc = -EDQUOT;
2892                 if (rc)
2893                         RETURN(rc);
2894         }
2895
2896         if (loi == NULL)
2897                 loi = lsm->lsm_oinfo[0];
2898
2899         client_obd_list_lock(&cli->cl_loi_list_lock);
2900
2901         LASSERT(off + count <= CFS_PAGE_SIZE);
2902         oap->oap_cmd = cmd;
2903         oap->oap_page_off = off;
2904         oap->oap_count = count;
2905         oap->oap_brw_flags = brw_flags;
2906         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
2907         if (libcfs_memory_pressure_get())
2908                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2909         spin_lock(&oap->oap_lock);
2910         oap->oap_async_flags = async_flags;
2911         spin_unlock(&oap->oap_lock);
2912
2913         if (cmd & OBD_BRW_WRITE) {
2914                 rc = osc_enter_cache(env, cli, loi, oap);
2915                 if (rc) {
2916                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2917                         RETURN(rc);
2918                 }
2919         }
2920
2921         osc_oap_to_pending(oap);
2922         loi_list_maint(cli, loi);
2923
2924         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2925                   cmd);
2926
2927         osc_check_rpcs(env, cli);
2928         client_obd_list_unlock(&cli->cl_loi_list_lock);
2929
2930         RETURN(0);
2931 }
2932
2933 /* aka (~was & now & flag), but this is more clear :) */
2934 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
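/* Worked example (hypothetical flag values): with was = ASYNC_READY and
 * now = (ASYNC_READY | ASYNC_URGENT), SETTING(was, now, ASYNC_URGENT) is 1
 * because that flag is newly set, while SETTING(was, now, ASYNC_READY) is 0
 * because that flag was already set. */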
2935
2936 int osc_set_async_flags_base(struct client_obd *cli,
2937                              struct lov_oinfo *loi, struct osc_async_page *oap,
2938                              obd_flag async_flags)
2939 {
2940         struct loi_oap_pages *lop;
2941         int flags = 0;
2942         ENTRY;
2943
2944         LASSERT(!list_empty(&oap->oap_pending_item));
2945
2946         if (oap->oap_cmd & OBD_BRW_WRITE) {
2947                 lop = &loi->loi_write_lop;
2948         } else {
2949                 lop = &loi->loi_read_lop;
2950         }
2951
2952         if ((oap->oap_async_flags & async_flags) == async_flags)
2953                 RETURN(0);
2954
2955         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2956                 flags |= ASYNC_READY;
2957
2958         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2959             list_empty(&oap->oap_rpc_item)) {
2960                 if (oap->oap_async_flags & ASYNC_HP)
2961                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2962                 else
2963                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2964                 flags |= ASYNC_URGENT;
2965                 loi_list_maint(cli, loi);
2966         }
2967         spin_lock(&oap->oap_lock);
2968         oap->oap_async_flags |= flags;
2969         spin_unlock(&oap->oap_lock);
2970
2971         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2972                         oap->oap_async_flags);
2973         RETURN(0);
2974 }
2975
2976 int osc_teardown_async_page(struct obd_export *exp,
2977                             struct lov_stripe_md *lsm,
2978                             struct lov_oinfo *loi, void *cookie)
2979 {
2980         struct client_obd *cli = &exp->exp_obd->u.cli;
2981         struct loi_oap_pages *lop;
2982         struct osc_async_page *oap;
2983         int rc = 0;
2984         ENTRY;
2985
2986         oap = oap_from_cookie(cookie);
2987         if (IS_ERR(oap))
2988                 RETURN(PTR_ERR(oap));
2989
2990         if (loi == NULL)
2991                 loi = lsm->lsm_oinfo[0];
2992
2993         if (oap->oap_cmd & OBD_BRW_WRITE) {
2994                 lop = &loi->loi_write_lop;
2995         } else {
2996                 lop = &loi->loi_read_lop;
2997         }
2998
2999         client_obd_list_lock(&cli->cl_loi_list_lock);
3000
3001         if (!list_empty(&oap->oap_rpc_item))
3002                 GOTO(out, rc = -EBUSY);
3003
3004         osc_exit_cache(cli, oap, 0);
3005         osc_wake_cache_waiters(cli);
3006
3007         if (!list_empty(&oap->oap_urgent_item)) {
3008                 list_del_init(&oap->oap_urgent_item);
3009                 spin_lock(&oap->oap_lock);
3010                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3011                 spin_unlock(&oap->oap_lock);
3012         }
3013         if (!list_empty(&oap->oap_pending_item)) {
3014                 list_del_init(&oap->oap_pending_item);
3015                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3016         }
3017         loi_list_maint(cli, loi);
3018         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3019 out:
3020         client_obd_list_unlock(&cli->cl_loi_list_lock);
3021         RETURN(rc);
3022 }
3023
3024 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3025                                          struct ldlm_enqueue_info *einfo,
3026                                          int flags)
3027 {
3028         void *data = einfo->ei_cbdata;
3029
3030         LASSERT(lock != NULL);
3031         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3032         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3033         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3034         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3035
3036         lock_res_and_lock(lock);
3037         spin_lock(&osc_ast_guard);
3038         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3039         lock->l_ast_data = data;
3040         spin_unlock(&osc_ast_guard);
3041         unlock_res_and_lock(lock);
3042 }
3043
3044 static void osc_set_data_with_check(struct lustre_handle *lockh,
3045                                     struct ldlm_enqueue_info *einfo,
3046                                     int flags)
3047 {
3048         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3049
3050         if (lock != NULL) {
3051                 osc_set_lock_data_with_check(lock, einfo, flags);
3052                 LDLM_LOCK_PUT(lock);
3053         } else
3054                 CERROR("lockh %p, data %p - client evicted?\n",
3055                        lockh, einfo->ei_cbdata);
3056 }
3057
3058 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3059                              ldlm_iterator_t replace, void *data)
3060 {
3061         struct ldlm_res_id res_id;
3062         struct obd_device *obd = class_exp2obd(exp);
3063
3064         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3065         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3066         return 0;
3067 }
3068
3069 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3070                             obd_enqueue_update_f upcall, void *cookie,
3071                             int *flags, int rc)
3072 {
3073         int intent = *flags & LDLM_FL_HAS_INTENT;
3074         ENTRY;
3075
3076         if (intent) {
3077                 /* The request was created before ldlm_cli_enqueue call. */
3078                 if (rc == ELDLM_LOCK_ABORTED) {
3079                         struct ldlm_reply *rep;
3080                         rep = req_capsule_server_get(&req->rq_pill,
3081                                                      &RMF_DLM_REP);
3082
3083                         LASSERT(rep != NULL);
3084                         if (rep->lock_policy_res1)
3085                                 rc = rep->lock_policy_res1;
3086                 }
3087         }
3088
3089         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3090                 *flags |= LDLM_FL_LVB_READY;
3091                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3092                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3093         }
3094
3095         /* Call the update callback. */
3096         rc = (*upcall)(cookie, rc);
3097         RETURN(rc);
3098 }
3099
3100 static int osc_enqueue_interpret(const struct lu_env *env,
3101                                  struct ptlrpc_request *req,
3102                                  struct osc_enqueue_args *aa, int rc)
3103 {
3104         struct ldlm_lock *lock;
3105         struct lustre_handle handle;
3106         __u32 mode;
3107
3108         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3109          * might be freed anytime after lock upcall has been called. */
3110         lustre_handle_copy(&handle, aa->oa_lockh);
3111         mode = aa->oa_ei->ei_mode;
3112
3113         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3114          * be valid. */
3115         lock = ldlm_handle2lock(&handle);
3116
3117         /* Take an additional reference so that a blocking AST that
3118          * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
3119          * to arrive after an upcall has been executed by
3120          * osc_enqueue_fini(). */
3121         ldlm_lock_addref(&handle, mode);
3122
3123         /* Complete obtaining the lock procedure. */
3124         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3125                                    mode, aa->oa_flags, aa->oa_lvb,
3126                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
3127                                    &handle, rc);
3128         /* Complete osc stuff. */
3129         rc = osc_enqueue_fini(req, aa->oa_lvb,
3130                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3131
3132         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3133
3134         /* Release the lock for async request. */
3135         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3136                 /*
3137                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3138                  * not already released by
3139                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3140                  */
3141                 ldlm_lock_decref(&handle, mode);
3142
3143         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3144                  aa->oa_lockh, req, aa);
3145         ldlm_lock_decref(&handle, mode);
3146         LDLM_LOCK_PUT(lock);
3147         return rc;
3148 }
3149
3150 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3151                         struct lov_oinfo *loi, int flags,
3152                         struct ost_lvb *lvb, __u32 mode, int rc)
3153 {
3154         if (rc == ELDLM_OK) {
3155                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3156                 __u64 tmp;
3157
3158                 LASSERT(lock != NULL);
3159                 loi->loi_lvb = *lvb;
3160                 tmp = loi->loi_lvb.lvb_size;
3161                 /* Extend KMS up to the end of this lock and no further
3162                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
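                /* Worked example (illustrative numbers): a lock on the extent
                 * [0, 1048575] with lvb_size = 2097152 clamps tmp to 1048576,
                 * so kms is raised to exactly the locked megabyte, not 2 MB. */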
3163                 if (tmp > lock->l_policy_data.l_extent.end)
3164                         tmp = lock->l_policy_data.l_extent.end + 1;
3165                 if (tmp >= loi->loi_kms) {
3166                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3167                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3168                         loi_kms_set(loi, tmp);
3169                 } else {
3170                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3171                                    LPU64"; leaving kms="LPU64", end="LPU64,
3172                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3173                                    lock->l_policy_data.l_extent.end);
3174                 }
3175                 ldlm_lock_allow_match(lock);
3176                 LDLM_LOCK_PUT(lock);
3177         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3178                 loi->loi_lvb = *lvb;
3179                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3180                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3181                 rc = ELDLM_OK;
3182         }
3183 }
3184 EXPORT_SYMBOL(osc_update_enqueue);
3185
3186 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3187
3188 /* When enqueuing asynchronously, locks are not ordered, so we can obtain a
3189  * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
3190  * with other synchronous requests; however, holding some locks while trying to
3191  * obtain others may take a considerable amount of time in case of OST failure,
3192  * and when a client does not release locks that other sync requests are waiting
3193  * for, it is excluded from the cluster -- such scenarios make life difficult,
3194  * so release locks just after they are obtained. */
3195 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3196                      int *flags, ldlm_policy_data_t *policy,
3197                      struct ost_lvb *lvb, int kms_valid,
3198                      obd_enqueue_update_f upcall, void *cookie,
3199                      struct ldlm_enqueue_info *einfo,
3200                      struct lustre_handle *lockh,
3201                      struct ptlrpc_request_set *rqset, int async)
3202 {
3203         struct obd_device *obd = exp->exp_obd;
3204         struct ptlrpc_request *req = NULL;
3205         int intent = *flags & LDLM_FL_HAS_INTENT;
3206         ldlm_mode_t mode;
3207         int rc;
3208         ENTRY;
3209
3210         /* Filesystem lock extents are extended to page boundaries so that
3211          * dealing with the page cache is a little smoother.  */
3212         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3213         policy->l_extent.end |= ~CFS_PAGE_MASK;
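        /* For example, with 4 KB pages (~CFS_PAGE_MASK == 0xfff, values
         * assumed for illustration) a request for bytes [5000, 9000] is
         * widened to the page-aligned extent [4096, 12287]. */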
3214
3215         /*
3216          * kms is not valid when either object is completely fresh (so that no
3217          * locks are cached), or object was evicted. In the latter case cached
3218          * lock cannot be used, because it would prime inode state with
3219          * potentially stale LVB.
3220          */
3221         if (!kms_valid)
3222                 goto no_match;
3223
3224         /* Next, search for already existing extent locks that will cover us */
3225         /* If we're trying to read, we also search for an existing PW lock.  The
3226          * VFS and page cache already protect us locally, so lots of readers/
3227          * writers can share a single PW lock.
3228          *
3229          * There are problems with conversion deadlocks, so instead of
3230          * converting a read lock to a write lock, we'll just enqueue a new
3231          * one.
3232          *
3233          * At some point we should cancel the read lock instead of making them
3234          * send us a blocking callback, but there are problems with canceling
3235          * locks out from other users right now, too. */
3236         mode = einfo->ei_mode;
3237         if (einfo->ei_mode == LCK_PR)
3238                 mode |= LCK_PW;
3239         mode = ldlm_lock_match(obd->obd_namespace,
3240                                *flags | LDLM_FL_LVB_READY, res_id,
3241                                einfo->ei_type, policy, mode, lockh, 0);
3242         if (mode) {
3243                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3244
3245                 if (matched->l_ast_data == NULL ||
3246                     matched->l_ast_data == einfo->ei_cbdata) {
3247                         /* addref the lock only for sync requests where a
3248                          * PW lock was matched whereas we asked for PR. */
3249                         if (!rqset && einfo->ei_mode != mode)
3250                                 ldlm_lock_addref(lockh, LCK_PR);
3251                         osc_set_lock_data_with_check(matched, einfo, *flags);
3252                         if (intent) {
3253                                 /* I would like to be able to ASSERT here that
3254                                  * rss <= kms, but I can't, for reasons which
3255                                  * are explained in lov_enqueue() */
3256                         }
3257
3258                         /* We already have a lock, and it's referenced */
3259                         (*upcall)(cookie, ELDLM_OK);
3260
3261                         /* For async requests, decref the lock. */
3262                         if (einfo->ei_mode != mode)
3263                                 ldlm_lock_decref(lockh, LCK_PW);
3264                         else if (rqset)
3265                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3266                         LDLM_LOCK_PUT(matched);
3267                         RETURN(ELDLM_OK);
3268                 } else
3269                         ldlm_lock_decref(lockh, mode);
3270                 LDLM_LOCK_PUT(matched);
3271         }
3272
3273  no_match:
3274         if (intent) {
3275                 CFS_LIST_HEAD(cancels);
3276                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3277                                            &RQF_LDLM_ENQUEUE_LVB);
3278                 if (req == NULL)
3279                         RETURN(-ENOMEM);
3280
3281                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3282                 if (rc)
3283                         RETURN(rc);
3284
3285                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3286                                      sizeof *lvb);
3287                 ptlrpc_request_set_replen(req);
3288         }
3289
3290         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3291         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3292
3293         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3294                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3295         if (rqset) {
3296                 if (!rc) {
3297                         struct osc_enqueue_args *aa;
3298                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3299                         aa = ptlrpc_req_async_args(req);
3300                         aa->oa_ei = einfo;
3301                         aa->oa_exp = exp;
3302                         aa->oa_flags  = flags;
3303                         aa->oa_upcall = upcall;
3304                         aa->oa_cookie = cookie;
3305                         aa->oa_lvb    = lvb;
3306                         aa->oa_lockh  = lockh;
3307
3308                         req->rq_interpret_reply =
3309                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3310                         if (rqset == PTLRPCD_SET)
3311                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3312                         else
3313                                 ptlrpc_set_add_req(rqset, req);
3314                 } else if (intent) {
3315                         ptlrpc_req_finished(req);
3316                 }
3317                 RETURN(rc);
3318         }
3319
3320         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3321         if (intent)
3322                 ptlrpc_req_finished(req);
3323
3324         RETURN(rc);
3325 }
3326
3327 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3328                        struct ldlm_enqueue_info *einfo,
3329                        struct ptlrpc_request_set *rqset)
3330 {
3331         struct ldlm_res_id res_id;
3332         int rc;
3333         ENTRY;
3334
3335         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3336                            oinfo->oi_md->lsm_object_gr, &res_id);
3337
3338         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3339                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3340                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3341                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3342                               rqset, rqset != NULL);
3343         RETURN(rc);
3344 }
3345
3346 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3347                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3348                    int *flags, void *data, struct lustre_handle *lockh,
3349                    int unref)
3350 {
3351         struct obd_device *obd = exp->exp_obd;
3352         int lflags = *flags;
3353         ldlm_mode_t rc;
3354         ENTRY;
3355
3356         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3357                 RETURN(-EIO);
3358
3359         /* Filesystem lock extents are extended to page boundaries so that
3360          * dealing with the page cache is a little smoother */
3361         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3362         policy->l_extent.end |= ~CFS_PAGE_MASK;
3363
3364         /* Next, search for already existing extent locks that will cover us */
3365         /* If we're trying to read, we also search for an existing PW lock.  The
3366          * VFS and page cache already protect us locally, so lots of readers/
3367          * writers can share a single PW lock. */
3368         rc = mode;
3369         if (mode == LCK_PR)
3370                 rc |= LCK_PW;
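        /* e.g. a PR request also searches for a PW lock; if one matches, a
         * PR reference is taken below and the PW reference obtained by
         * ldlm_lock_match() is dropped, so readers can share the PW lock. */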
3371         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3372                              res_id, type, policy, rc, lockh, unref);
3373         if (rc) {
3374                 if (data != NULL)
3375                         osc_set_data_with_check(lockh, data, lflags);
3376                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3377                         ldlm_lock_addref(lockh, LCK_PR);
3378                         ldlm_lock_decref(lockh, LCK_PW);
3379                 }
3380                 RETURN(rc);
3381         }
3382         RETURN(rc);
3383 }
3384
3385 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3386 {
3387         ENTRY;
3388
3389         if (unlikely(mode == LCK_GROUP))
3390                 ldlm_lock_decref_and_cancel(lockh, mode);
3391         else
3392                 ldlm_lock_decref(lockh, mode);
3393
3394         RETURN(0);
3395 }
3396
3397 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3398                       __u32 mode, struct lustre_handle *lockh)
3399 {
3400         ENTRY;
3401         RETURN(osc_cancel_base(lockh, mode));
3402 }
3403
3404 static int osc_cancel_unused(struct obd_export *exp,
3405                              struct lov_stripe_md *lsm, int flags,
3406                              void *opaque)
3407 {
3408         struct obd_device *obd = class_exp2obd(exp);
3409         struct ldlm_res_id res_id, *resp = NULL;
3410
3411         if (lsm != NULL) {
3412                 resp = osc_build_res_name(lsm->lsm_object_id,
3413                                           lsm->lsm_object_gr, &res_id);
3414         }
3415
3416         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3417 }
3418
3419 static int osc_statfs_interpret(const struct lu_env *env,
3420                                 struct ptlrpc_request *req,
3421                                 struct osc_async_args *aa, int rc)
3422 {
3423         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3424         struct obd_statfs *msfs;
3425         ENTRY;
3426
3427         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3428             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3429                 GOTO(out, rc = 0);
3430
3431         if (rc != 0)
3432                 GOTO(out, rc);
3433
3434         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3435         if (msfs == NULL) {
3436                 GOTO(out, rc = -EPROTO);
3437         }
3438
3439         /* Reinitialize the RDONLY and DEGRADED flags at the client
3440          * on each statfs, so they don't stay set permanently. */
3441         spin_lock(&cli->cl_oscc.oscc_lock);
3442         cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_RDONLY | OSCC_FLAG_DEGRADED);
3443         if (msfs->os_state & OS_STATE_DEGRADED)
3444                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3445
3446         if (msfs->os_state & OS_STATE_READONLY)
3447                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3448         spin_unlock(&cli->cl_oscc.oscc_lock);
3449
3450         *aa->aa_oi->oi_osfs = *msfs;
3451 out:
3452         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3453         RETURN(rc);
3454 }
3455
3456 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3457                             __u64 max_age, struct ptlrpc_request_set *rqset)
3458 {
3459         struct ptlrpc_request *req;
3460         struct osc_async_args *aa;
3461         int                    rc;
3462         ENTRY;
3463
3464         /* We could possibly pass max_age in the request (as an absolute
3465          * timestamp or a "seconds.usec ago") so the target can avoid doing
3466          * extra calls into the filesystem if that isn't necessary (e.g.
3467          * during mount that would help a bit).  Having relative timestamps
3468          * is not so great if request processing is slow, while absolute
3469          * timestamps are not ideal because they need time synchronization. */
3470         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3471         if (req == NULL)
3472                 RETURN(-ENOMEM);
3473
3474         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3475         if (rc) {
3476                 ptlrpc_request_free(req);
3477                 RETURN(rc);
3478         }
3479         ptlrpc_request_set_replen(req);
3480         req->rq_request_portal = OST_CREATE_PORTAL;
3481         ptlrpc_at_set_req_timeout(req);
3482
3483         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3484                 /* procfs requests should not wait on statfs, to avoid deadlock */
3485                 req->rq_no_resend = 1;
3486                 req->rq_no_delay = 1;
3487         }
3488
3489         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3490         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3491         aa = ptlrpc_req_async_args(req);
3492         aa->aa_oi = oinfo;
3493
3494         ptlrpc_set_add_req(rqset, req);
3495         RETURN(0);
3496 }
3497
3498 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3499                       __u64 max_age, __u32 flags)
3500 {
3501         struct obd_statfs     *msfs;
3502         struct ptlrpc_request *req;
3503         struct obd_import     *imp = NULL;
3504         int rc;
3505         ENTRY;
3506
3507         /* Since the request might also come from lprocfs, we need to
3508          * sync this with client_disconnect_export() (bug 15684) */
3509         down_read(&obd->u.cli.cl_sem);
3510         if (obd->u.cli.cl_import)
3511                 imp = class_import_get(obd->u.cli.cl_import);
3512         up_read(&obd->u.cli.cl_sem);
3513         if (!imp)
3514                 RETURN(-ENODEV);
3515
3516         /* We could possibly pass max_age in the request (as an absolute
3517          * timestamp or a "seconds.usec ago") so the target can avoid doing
3518          * extra calls into the filesystem if that isn't necessary (e.g.
3519          * during mount that would help a bit).  Having relative timestamps
3520          * is not so great if request processing is slow, while absolute
3521          * timestamps are not ideal because they need time synchronization. */
3522         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3523
3524         class_import_put(imp);
3525
3526         if (req == NULL)
3527                 RETURN(-ENOMEM);
3528
3529         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3530         if (rc) {
3531                 ptlrpc_request_free(req);
3532                 RETURN(rc);
3533         }
3534         ptlrpc_request_set_replen(req);
3535         req->rq_request_portal = OST_CREATE_PORTAL;
3536         ptlrpc_at_set_req_timeout(req);
3537
3538         if (flags & OBD_STATFS_NODELAY) {
3539                 /* procfs requests should not wait on statfs, to avoid deadlock */
3540                 req->rq_no_resend = 1;
3541                 req->rq_no_delay = 1;
3542         }
3543
3544         rc = ptlrpc_queue_wait(req);
3545         if (rc)
3546                 GOTO(out, rc);
3547
3548         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3549         if (msfs == NULL) {
3550                 GOTO(out, rc = -EPROTO);
3551         }
3552
3553         *osfs = *msfs;
3554
3555         EXIT;
3556  out:
3557         ptlrpc_req_finished(req);
3558         return rc;
3559 }
3560
3561 /* Retrieve object striping information.
3562  *
3563  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
3564  * the maximum number of OST indices which will fit in the user buffer.
3565  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
3566  */
3567 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3568 {
3569         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3570         struct lov_user_md_v3 lum, *lumk;
3571         struct lov_user_ost_data_v1 *lmm_objects;
3572         int rc = 0, lum_size;
3573         ENTRY;
3574
3575         if (!lsm)
3576                 RETURN(-ENODATA);
3577
3578         /* we only need the header part from user space to get lmm_magic and
3579          * lmm_stripe_count (the header part is common to v1 and v3) */
3580         lum_size = sizeof(struct lov_user_md_v1);
3581         if (copy_from_user(&lum, lump, lum_size))
3582                 RETURN(-EFAULT);
3583
3584         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3585             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3586                 RETURN(-EINVAL);
3587
3588         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3589         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3590         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3591         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3592
3593         /* we can use lov_mds_md_size() to compute lum_size
3594          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3595         if (lum.lmm_stripe_count > 0) {
3596                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3597                 OBD_ALLOC(lumk, lum_size);
3598                 if (!lumk)
3599                         RETURN(-ENOMEM);
3600
3601                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3602                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3603                 else
3604                         lmm_objects = &(lumk->lmm_objects[0]);
3605                 lmm_objects->l_object_id = lsm->lsm_object_id;
3606         } else {
3607                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3608                 lumk = &lum;
3609         }
3610
3611         lumk->lmm_object_id = lsm->lsm_object_id;
3612         lumk->lmm_object_gr = lsm->lsm_object_gr;
3613         lumk->lmm_stripe_count = 1;
3614
3615         if (copy_to_user(lump, lumk, lum_size))
3616                 rc = -EFAULT;
3617
3618         if (lumk != &lum)
3619                 OBD_FREE(lumk, lum_size);
3620
3621         RETURN(rc);
3622 }
3623
3624
3625 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3626                          void *karg, void *uarg)
3627 {
3628         struct obd_device *obd = exp->exp_obd;
3629         struct obd_ioctl_data *data = karg;
3630         int err = 0;
3631         ENTRY;
3632
3633         if (!try_module_get(THIS_MODULE)) {
3634                 CERROR("Can't get module. Is it alive?\n");
3635                 return -EINVAL;
3636         }
3637         switch (cmd) {
3638         case OBD_IOC_LOV_GET_CONFIG: {
3639                 char *buf;
3640                 struct lov_desc *desc;
3641                 struct obd_uuid uuid;
3642
3643                 buf = NULL;
3644                 len = 0;
3645                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3646                         GOTO(out, err = -EINVAL);
3647
3648                 data = (struct obd_ioctl_data *)buf;
3649
3650                 if (sizeof(*desc) > data->ioc_inllen1) {
3651                         obd_ioctl_freedata(buf, len);
3652                         GOTO(out, err = -EINVAL);
3653                 }
3654
3655                 if (data->ioc_inllen2 < sizeof(uuid)) {
3656                         obd_ioctl_freedata(buf, len);
3657                         GOTO(out, err = -EINVAL);
3658                 }
3659
3660                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3661                 desc->ld_tgt_count = 1;
3662                 desc->ld_active_tgt_count = 1;
3663                 desc->ld_default_stripe_count = 1;
3664                 desc->ld_default_stripe_size = 0;
3665                 desc->ld_default_stripe_offset = 0;
3666                 desc->ld_pattern = 0;
3667                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3668
3669                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3670
3671                 err = copy_to_user((void *)uarg, buf, len);
3672                 if (err)
3673                         err = -EFAULT;
3674                 obd_ioctl_freedata(buf, len);
3675                 GOTO(out, err);
3676         }
3677         case LL_IOC_LOV_SETSTRIPE:
3678                 err = obd_alloc_memmd(exp, karg);
3679                 if (err > 0)
3680                         err = 0;
3681                 GOTO(out, err);
3682         case LL_IOC_LOV_GETSTRIPE:
3683                 err = osc_getstripe(karg, uarg);
3684                 GOTO(out, err);
3685         case OBD_IOC_CLIENT_RECOVER:
3686                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3687                                             data->ioc_inlbuf1);
3688                 if (err > 0)
3689                         err = 0;
3690                 GOTO(out, err);
3691         case IOC_OSC_SET_ACTIVE:
3692                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3693                                                data->ioc_offset);
3694                 GOTO(out, err);
3695         case OBD_IOC_POLL_QUOTACHECK:
3696                 err = lquota_poll_check(quota_interface, exp,
3697                                         (struct if_quotacheck *)karg);
3698                 GOTO(out, err);
3699         case OBD_IOC_PING_TARGET:
3700                 err = ptlrpc_obd_ping(obd);
3701                 GOTO(out, err);
3702         default:
3703                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3704                        cmd, cfs_curproc_comm());
3705                 GOTO(out, err = -ENOTTY);
3706         }
3707 out:
3708         module_put(THIS_MODULE);
3709         return err;
3710 }
3711
3712 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3713                         void *key, __u32 *vallen, void *val,
3714                         struct lov_stripe_md *lsm)
3715 {
3716         ENTRY;
3717         if (!vallen || !val)
3718                 RETURN(-EFAULT);
3719
3720         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3721                 __u32 *stripe = val;
3722                 *vallen = sizeof(*stripe);
3723                 *stripe = 0;
3724                 RETURN(0);
3725         } else if (KEY_IS(KEY_LAST_ID)) {
3726                 struct ptlrpc_request *req;
3727                 obd_id                *reply;
3728                 char                  *tmp;
3729                 int                    rc;
3730
3731                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3732                                            &RQF_OST_GET_INFO_LAST_ID);
3733                 if (req == NULL)
3734                         RETURN(-ENOMEM);
3735
3736                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3737                                      RCL_CLIENT, keylen);
3738                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3739                 if (rc) {
3740                         ptlrpc_request_free(req);
3741                         RETURN(rc);
3742                 }
3743
3744                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3745                 memcpy(tmp, key, keylen);
3746
3747                 req->rq_no_delay = req->rq_no_resend = 1;
3748                 ptlrpc_request_set_replen(req);
3749                 rc = ptlrpc_queue_wait(req);
3750                 if (rc)
3751                         GOTO(out, rc);
3752
3753                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3754                 if (reply == NULL)
3755                         GOTO(out, rc = -EPROTO);
3756
3757                 *((obd_id *)val) = *reply;
3758         out:
3759                 ptlrpc_req_finished(req);
3760                 RETURN(rc);
3761         } else if (KEY_IS(KEY_FIEMAP)) {
3762                 struct ptlrpc_request *req;
3763                 struct ll_user_fiemap *reply;
3764                 char *tmp;
3765                 int rc;
3766
3767                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3768                                            &RQF_OST_GET_INFO_FIEMAP);
3769                 if (req == NULL)
3770                         RETURN(-ENOMEM);
3771
3772                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3773                                      RCL_CLIENT, keylen);
3774                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3775                                      RCL_CLIENT, *vallen);
3776                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3777                                      RCL_SERVER, *vallen);
3778
3779                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3780                 if (rc) {
3781                         ptlrpc_request_free(req);
3782                         RETURN(rc);
3783                 }
3784
3785                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3786                 memcpy(tmp, key, keylen);
3787                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3788                 memcpy(tmp, val, *vallen);
3789
3790                 ptlrpc_request_set_replen(req);
3791                 rc = ptlrpc_queue_wait(req);
3792                 if (rc)
3793                         GOTO(out1, rc);
3794
3795                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3796                 if (reply == NULL)
3797                         GOTO(out1, rc = -EPROTO);
3798
3799                 memcpy(val, reply, *vallen);
3800         out1:
3801                 ptlrpc_req_finished(req);
3802
3803                 RETURN(rc);
3804         }
3805
3806         RETURN(-EINVAL);
3807 }
3808
3809 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3810 {
3811         struct llog_ctxt *ctxt;
3812         int rc = 0;
3813         ENTRY;
3814
3815         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3816         if (ctxt) {
3817                 rc = llog_initiator_connect(ctxt);
3818                 llog_ctxt_put(ctxt);
3819         } else {
3820                 /* XXX return an error? skip setting below flags? */
3821         }
3822
3823         spin_lock(&imp->imp_lock);
3824         imp->imp_server_timeout = 1;
3825         imp->imp_pingable = 1;
3826         spin_unlock(&imp->imp_lock);
3827         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3828
3829         RETURN(rc);
3830 }
3831
3832 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3833                                           struct ptlrpc_request *req,
3834                                           void *aa, int rc)
3835 {
3836         ENTRY;
3837         if (rc != 0)
3838                 RETURN(rc);
3839
3840         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
3841 }
3842
3843 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3844                               void *key, obd_count vallen, void *val,
3845                               struct ptlrpc_request_set *set)
3846 {
3847         struct ptlrpc_request *req;
3848         struct obd_device     *obd = exp->exp_obd;
3849         struct obd_import     *imp = class_exp2cliimp(exp);
3850         char                  *tmp;
3851         int                    rc;
3852         ENTRY;
3853
3854         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3855
3856         if (KEY_IS(KEY_NEXT_ID)) {
3857                 if (vallen != sizeof(obd_id))
3858                         RETURN(-ERANGE);
3859                 if (val == NULL)
3860                         RETURN(-EINVAL);
3861                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3862                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3863                        exp->exp_obd->obd_name,
3864                        obd->u.cli.cl_oscc.oscc_next_id);
3865
3866                 RETURN(0);
3867         }
3868
3869         if (KEY_IS(KEY_UNLINKED)) {
3870                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3871                 spin_lock(&oscc->oscc_lock);
3872                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3873                 spin_unlock(&oscc->oscc_lock);
3874                 RETURN(0);
3875         }
3876
3877         if (KEY_IS(KEY_INIT_RECOV)) {
3878                 if (vallen != sizeof(int))
3879                         RETURN(-EINVAL);
3880                 spin_lock(&imp->imp_lock);
3881                 imp->imp_initial_recov = *(int *)val;
3882                 spin_unlock(&imp->imp_lock);
3883                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3884                        exp->exp_obd->obd_name,
3885                        imp->imp_initial_recov);
3886                 RETURN(0);
3887         }
3888
3889         if (KEY_IS(KEY_CHECKSUM)) {
3890                 if (vallen != sizeof(int))
3891                         RETURN(-EINVAL);
3892                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3893                 RETURN(0);
3894         }
3895
3896         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3897                 sptlrpc_conf_client_adapt(obd);
3898                 RETURN(0);
3899         }
3900
3901         if (KEY_IS(KEY_FLUSH_CTX)) {
3902                 sptlrpc_import_flush_my_ctx(imp);
3903                 RETURN(0);
3904         }
3905
3906         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3907                 RETURN(-EINVAL);
3908
3909         /* We pass all other commands directly to OST. Since nobody calls osc
3910            methods directly and everybody is supposed to go through LOV, we
3911            assume lov checked invalid values for us.
3912            The only recognised values so far are evict_by_nid and mds_conn.
3913            Even if something bad goes through, we'd get a -EINVAL from OST
3914            anyway. */
3915
3916         if (KEY_IS(KEY_GRANT_SHRINK))
3917                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
3918         else
3919                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3920
3921         if (req == NULL)
3922                 RETURN(-ENOMEM);
3923
3924         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3925                              RCL_CLIENT, keylen);
3926         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3927                              RCL_CLIENT, vallen);
3928         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3929         if (rc) {
3930                 ptlrpc_request_free(req);
3931                 RETURN(rc);
3932         }
3933
3934         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3935         memcpy(tmp, key, keylen);
3936         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3937         memcpy(tmp, val, vallen);
3938
3939         if (KEY_IS(KEY_MDS_CONN)) {
3940                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3941
3942                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3943                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3944                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3945                 req->rq_no_delay = req->rq_no_resend = 1;
3946                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3947         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3948                 struct osc_grant_args *aa;
3949                 struct obdo *oa;
3950
3951                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3952                 aa = ptlrpc_req_async_args(req);
3953                 OBD_ALLOC_PTR(oa);
3954                 if (!oa) {
3955                         ptlrpc_req_finished(req);
3956                         RETURN(-ENOMEM);
3957                 }
3958                 *oa = ((struct ost_body *)val)->oa;
3959                 aa->aa_oa = oa;
3960                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3961         }
3962
3963         ptlrpc_request_set_replen(req);
3964         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3965                 LASSERT(set != NULL);
3966                 ptlrpc_set_add_req(set, req);
3967                 ptlrpc_check_set(NULL, set);
3968         } else
3969                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3970
3971         RETURN(0);
3972 }
3973
3974
3975 static struct llog_operations osc_size_repl_logops = {
3976         lop_cancel: llog_obd_repl_cancel
3977 };
3978
3979 static struct llog_operations osc_mds_ost_orig_logops;
3980
3981 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3982                            struct obd_device *tgt, struct llog_catid *catid)
3983 {
3984         int rc;
3985         ENTRY;
3986
3987         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
3988                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3989         if (rc) {
3990                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3991                 GOTO(out, rc);
3992         }
3993
3994         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
3995                         NULL, &osc_size_repl_logops);
3996         if (rc) {
3997                 struct llog_ctxt *ctxt =
3998                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3999                 if (ctxt)
4000                         llog_cleanup(ctxt);
4001                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4002         }
4003         GOTO(out, rc);
4004 out:
4005         if (rc) {
4006                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4007                        obd->obd_name, tgt->obd_name, catid, rc);
4008                 CERROR("logid "LPX64":0x%x\n",
4009                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4010         }
4011         return rc;
4012 }
4013
4014 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4015                          struct obd_device *disk_obd, int *index)
4016 {
4017         struct llog_catid catid;
4018         static char name[32] = CATLIST;
4019         int rc;
4020         ENTRY;
4021
4022         LASSERT(olg == &obd->obd_olg);
4023
4024         mutex_down(&olg->olg_cat_processing);
4025         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4026         if (rc) {
4027                 CERROR("llog_get_cat_list failed: rc = %d\n", rc);
4028                 GOTO(out, rc);
4029         }
4030
4031         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4032                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4033                catid.lci_logid.lgl_ogr, catid.lci_logid.lgl_ogen);
4034
4035         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4036         if (rc) {
4037                 CERROR("__osc_llog_init failed: rc = %d\n", rc);
4038                 GOTO(out, rc);
4039         }
4040
4041         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4042         if (rc) {
4043                 CERROR("llog_put_cat_list failed: rc = %d\n", rc);
4044                 GOTO(out, rc);
4045         }
4046
4047  out:
4048         mutex_up(&olg->olg_cat_processing);
4049
4050         return rc;
4051 }
4052
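/* Clean up both llog contexts; return the first error encountered. */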
4053 static int osc_llog_finish(struct obd_device *obd, int count)
4054 {
4055         struct llog_ctxt *ctxt;
4056         int rc = 0, rc2 = 0;
4057         ENTRY;
4058
4059         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4060         if (ctxt)
4061                 rc = llog_cleanup(ctxt);
4062
4063         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4064         if (ctxt)
4065                 rc2 = llog_cleanup(ctxt);
4066         if (!rc)
4067                 rc = rc2;
4068
4069         RETURN(rc);
4070 }
4071
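/* On reconnect, request grant equal to what we still hold (or twice the
 * maximum RPC size if we hold none) and clear the lost-grant counter. */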
4072 static int osc_reconnect(const struct lu_env *env,
4073                          struct obd_export *exp, struct obd_device *obd,
4074                          struct obd_uuid *cluuid,
4075                          struct obd_connect_data *data,
4076                          void *localdata)
4077 {
4078         struct client_obd *cli = &obd->u.cli;
4079
4080         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4081                 long lost_grant;
4082
4083                 client_obd_list_lock(&cli->cl_loi_list_lock);
4084                 data->ocd_grant = cli->cl_avail_grant ?:
4085                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4086                 lost_grant = cli->cl_lost_grant;
4087                 cli->cl_lost_grant = 0;
4088                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4089
4090                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4091                        "cl_lost_grant: %ld\n", data->ocd_grant,
4092                        cli->cl_avail_grant, lost_grant);
4093                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4094                        " ocd_grant: %d\n", data->ocd_connect_flags,
4095                        data->ocd_version, data->ocd_grant);
4096         }
4097
4098         RETURN(0);
4099 }
4100
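/* On the last disconnect, flush pending size-record cancels to the target,
 * then disconnect the export; see the BUG18662 note below for why the
 * grant-shrink list entry is removed only after the import is gone. */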
4101 static int osc_disconnect(struct obd_export *exp)
4102 {
4103         struct obd_device *obd = class_exp2obd(exp);
4104         struct llog_ctxt  *ctxt;
4105         int rc;
4106
4107         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4108         if (ctxt) {
4109                 if (obd->u.cli.cl_conn_count == 1) {
4110                         /* Flush any remaining cancel messages out to the
4111                          * target */
4112                         llog_sync(ctxt, exp);
4113                 }
4114                 llog_ctxt_put(ctxt);
4115         } else {
4116                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4117                        obd);
4118         }
4119
4120         rc = client_disconnect_export(exp);
4121         /**
4122          * Initially we put del_shrink_grant before disconnect_export, but it
4123          * causes the following problem if setup (connect) and cleanup
4124          * (disconnect) are tangled together.
4125          *      connect p1                     disconnect p2
4126          *   ptlrpc_connect_import
4127          *     ...............               class_manual_cleanup
4128          *                                     osc_disconnect
4129          *                                     del_shrink_grant
4130          *   ptlrpc_connect_interrupt
4131          *     init_grant_shrink
4132          *   add this client to shrink list
4133          *                                      cleanup_osc
4134          * Bang! The pinger triggers the shrink.
4135          * So the osc should be removed from the shrink list only after
4136          * we are sure the import has been destroyed. BUG18662
4137          */
4138         if (obd->u.cli.cl_import == NULL)
4139                 osc_del_shrink_grant(&obd->u.cli);
4140         return rc;
4141 }
4142
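/* React to import state changes: zero grants on disconnect, fail cached
 * pages on invalidate, and renegotiate grant and portal settings when the
 * connect data (OCD) arrives. */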
4143 static int osc_import_event(struct obd_device *obd,
4144                             struct obd_import *imp,
4145                             enum obd_import_event event)
4146 {
4147         struct client_obd *cli;
4148         int rc = 0;
4149
4150         ENTRY;
4151         LASSERT(imp->imp_obd == obd);
4152
4153         switch (event) {
4154         case IMP_EVENT_DISCON: {
4155                 /* Only do this on the MDS OSCs */
4156                 if (imp->imp_server_timeout) {
4157                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4158
4159                         spin_lock(&oscc->oscc_lock);
4160                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4161                         spin_unlock(&oscc->oscc_lock);
4162                 }
4163                 cli = &obd->u.cli;
4164                 client_obd_list_lock(&cli->cl_loi_list_lock);
4165                 cli->cl_avail_grant = 0;
4166                 cli->cl_lost_grant = 0;
4167                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4168                 break;
4169         }
4170         case IMP_EVENT_INACTIVE: {
4171                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4172                 break;
4173         }
4174         case IMP_EVENT_INVALIDATE: {
4175                 struct ldlm_namespace *ns = obd->obd_namespace;
4176                 struct lu_env         *env;
4177                 int                    refcheck;
4178
4179                 env = cl_env_get(&refcheck);
4180                 if (!IS_ERR(env)) {
4181                         /* Reset grants */
4182                         cli = &obd->u.cli;
4183                         client_obd_list_lock(&cli->cl_loi_list_lock);
4184                         /* All pages go to failing RPCs due to the
4185                          * invalid import. */
4186                         osc_check_rpcs(env, cli);
4187                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4188
4189                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4190                         cl_env_put(env, &refcheck);
4191                 } else
4192                         rc = PTR_ERR(env);
4193                 break;
4194         }
4195         case IMP_EVENT_ACTIVE: {
4196                 /* Only do this on the MDS OSCs */
4197                 if (imp->imp_server_timeout) {
4198                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4199
4200                         spin_lock(&oscc->oscc_lock);
4201                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4202                         spin_unlock(&oscc->oscc_lock);
4203                 }
4204                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4205                 break;
4206         }
4207         case IMP_EVENT_OCD: {
4208                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4209
4210                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4211                         osc_init_grant(&obd->u.cli, ocd);
4212
4213                 /* See bug 7198 */
4214                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4215                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4216
4217                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4218                 break;
4219         }
4220         default:
4221                 CERROR("Unknown import event %d\n", event);
4222                 LBUG();
4223         }
4224         RETURN(rc);
4225 }
4226
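/* Set up the client OBD, procfs stats, the object creator, grant-shrink
 * state, and a reserve pool of request buffers. */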
4227 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4228 {
4229         int rc;
4230         ENTRY;
4231
4233         rc = ptlrpcd_addref();
4234         if (rc)
4235                 RETURN(rc);
4236
4237         rc = client_obd_setup(obd, lcfg);
4238         if (rc) {
4239                 ptlrpcd_decref();
4240         } else {
4241                 struct lprocfs_static_vars lvars = { 0 };
4242                 struct client_obd *cli = &obd->u.cli;
4243
4244                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4245                 lprocfs_osc_init_vars(&lvars);
4246                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4247                         lproc_osc_attach_seqstat(obd);
4248                         sptlrpc_lprocfs_cliobd_attach(obd);
4249                         ptlrpc_lprocfs_register_obd(obd);
4250                 }
4251
4252                 oscc_init(obd);
4253                 /* We need to allocate a few extra requests, because
4254                    brw_interpret tries to create new requests before
4255                    freeing previous ones. Ideally we would reserve
4256                    2x max_rpcs_in_flight, but that might waste too much
4257                    RAM, so an extra 2 is a guess that should still work. */
4258                 cli->cl_import->imp_rq_pool =
4259                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4260                                             OST_MAXREQSIZE,
4261                                             ptlrpc_add_rqs_to_pool);
4262
4263                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4264                 sema_init(&cli->cl_grant_sem, 1);
4265         }
4266
4267         RETURN(rc);
4268 }
4269
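/* Two-stage teardown: OBD_CLEANUP_EARLY deactivates the import and stops
 * pinging; OBD_CLEANUP_EXPORTS destroys any remaining client import and
 * shuts down the llog contexts. */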
4270 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4271 {
4272         int rc = 0;
4273         ENTRY;
4274
4275         switch (stage) {
4276         case OBD_CLEANUP_EARLY: {
4277                 struct obd_import *imp;
4278                 imp = obd->u.cli.cl_import;
4279                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4280                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4281                 ptlrpc_deactivate_import(imp);
4282                 spin_lock(&imp->imp_lock);
4283                 imp->imp_pingable = 0;
4284                 spin_unlock(&imp->imp_lock);
4285                 break;
4286         }
4287         case OBD_CLEANUP_EXPORTS: {
4288                 /* If we set up but never connected, the
4289                    client import will not have been cleaned. */
4290                 if (obd->u.cli.cl_import) {
4291                         struct obd_import *imp;
4292                         down_write(&obd->u.cli.cl_sem);
4293                         imp = obd->u.cli.cl_import;
4294                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4295                                obd->obd_name);
4296                         ptlrpc_invalidate_import(imp);
4297                         if (imp->imp_rq_pool) {
4298                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4299                                 imp->imp_rq_pool = NULL;
4300                         }
4301                         class_destroy_import(imp);
4302                         up_write(&obd->u.cli.cl_sem);
4303                         obd->u.cli.cl_import = NULL;
4304                 }
4305                 rc = obd_llog_finish(obd, 0);
4306                 if (rc != 0)
4307                         CERROR("failed to cleanup llogging subsystems\n");
4308                 break;
4309         }
4310         }
4311         RETURN(rc);
4312 }
4313
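/* Final teardown: remove procfs entries, release the quota cache, and drop
 * our ptlrpcd reference. */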
4314 int osc_cleanup(struct obd_device *obd)
4315 {
4316         int rc;
4317
4318         ENTRY;
4319         ptlrpc_lprocfs_unregister_obd(obd);
4320         lprocfs_obd_cleanup(obd);
4321
4322         /* free memory of osc quota cache */
4323         lquota_cleanup(quota_interface, obd);
4324
4325         rc = client_obd_cleanup(obd);
4326
4327         ptlrpcd_decref();
4328         RETURN(rc);
4329 }
4330
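/* Handle config log commands; anything unrecognized is treated as a proc
 * parameter write. */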
4331 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4332 {
4333         struct lprocfs_static_vars lvars = { 0 };
4334         int rc = 0;
4335
4336         lprocfs_osc_init_vars(&lvars);
4337
4338         switch (lcfg->lcfg_command) {
4339         default:
4340                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4341                                               lcfg, obd);
4342                 if (rc > 0)
4343                         rc = 0;
4344                 break;
4345         }
4346
4347         return(rc);
4348 }
4349
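/* obd_ops wrapper around osc_process_config_base(). */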
4350 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4351 {
4352         return osc_process_config_base(obd, buf);
4353 }
4354
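/* Method table connecting the OSC to the generic OBD interface. */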
4355 struct obd_ops osc_obd_ops = {
4356         .o_owner                = THIS_MODULE,
4357         .o_setup                = osc_setup,
4358         .o_precleanup           = osc_precleanup,
4359         .o_cleanup              = osc_cleanup,
4360         .o_add_conn             = client_import_add_conn,
4361         .o_del_conn             = client_import_del_conn,
4362         .o_connect              = client_connect_import,
4363         .o_reconnect            = osc_reconnect,
4364         .o_disconnect           = osc_disconnect,
4365         .o_statfs               = osc_statfs,
4366         .o_statfs_async         = osc_statfs_async,
4367         .o_packmd               = osc_packmd,
4368         .o_unpackmd             = osc_unpackmd,
4369         .o_precreate            = osc_precreate,
4370         .o_create               = osc_create,
4371         .o_create_async         = osc_create_async,
4372         .o_destroy              = osc_destroy,
4373         .o_getattr              = osc_getattr,
4374         .o_getattr_async        = osc_getattr_async,
4375         .o_setattr              = osc_setattr,
4376         .o_setattr_async        = osc_setattr_async,
4377         .o_brw                  = osc_brw,
4378         .o_punch                = osc_punch,
4379         .o_sync                 = osc_sync,
4380         .o_enqueue              = osc_enqueue,
4381         .o_change_cbdata        = osc_change_cbdata,
4382         .o_cancel               = osc_cancel,
4383         .o_cancel_unused        = osc_cancel_unused,
4384         .o_iocontrol            = osc_iocontrol,
4385         .o_get_info             = osc_get_info,
4386         .o_set_info_async       = osc_set_info_async,
4387         .o_import_event         = osc_import_event,
4388         .o_llog_init            = osc_llog_init,
4389         .o_llog_finish          = osc_llog_finish,
4390         .o_process_config       = osc_process_config,
4391 };
4392
4393 extern struct lu_kmem_descr  osc_caches[];
4394 extern spinlock_t            osc_ast_guard;
4395 extern struct lock_class_key osc_ast_guard_class;
4396
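/* Module init: register the OSC caches, hook up the quota interface, and
 * register the obd type; originator llog ops are cloned from llog_lvfs_ops. */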
4397 int __init osc_init(void)
4398 {
4399         struct lprocfs_static_vars lvars = { 0 };
4400         int rc;
4401         ENTRY;
4402
4403         /* print an address of _any_ initialized kernel symbol from this
4404          * module, to allow debugging with gdb that doesn't support data
4405          * symbols from modules.*/
4406         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4407
4408         rc = lu_kmem_init(osc_caches);
             if (rc)
                     RETURN(rc);
4409
4410         lprocfs_osc_init_vars(&lvars);
4411
4412         request_module("lquota");
4413         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4414         lquota_init(quota_interface);
4415         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4416
4417         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4418                                  LUSTRE_OSC_NAME, &osc_device_type);
4419         if (rc) {
4420                 if (quota_interface)
4421                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4422                 lu_kmem_fini(osc_caches);
4423                 RETURN(rc);
4424         }
4425
4426         spin_lock_init(&osc_ast_guard);
4427         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4428
4429         osc_mds_ost_orig_logops = llog_lvfs_ops;
4430         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4431         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4432         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4433         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4434
4435         RETURN(rc);
4436 }
4437
4438 #ifdef __KERNEL__
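/* Module exit: unregister everything set up in osc_init(). */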
4439 static void /*__exit*/ osc_exit(void)
4440 {
4441         lu_device_type_fini(&osc_device_type);
4442
4443         lquota_exit(quota_interface);
4444         if (quota_interface)
4445                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4446
4447         class_unregister_type(LUSTRE_OSC_NAME);
4448         lu_kmem_fini(osc_caches);
4449 }
4450
4451 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4452 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4453 MODULE_LICENSE("GPL");
4454
4455 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4456 #endif