Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Calling conventions (mirror osc_packmd):
 *   lsmp == NULL             - just return the in-memory lsm size;
 *   *lsmp != NULL, lmm NULL  - free *lsmp and its single oinfo, return 0;
 *   otherwise                - allocate *lsmp on demand, fill it from
 *                              lmm (if given) and return the lsm size. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Basic sanity checks on the on-disk buffer before use. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC object always has exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free path: release the oinfo first, then the lsm. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Undo the partial allocation on failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
/* Reply handler for async OST_GETATTR: unpack the ost_body, copy the
 * attributes into the caller's obd_info, and always invoke its
 * completion callback (oi_cb_up), even on error. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* NOTE(review): this uses the legacy lustre_swab_repbuf()
         * interface while sibling interpret callbacks use
         * req_capsule_server_get() - presumably equivalent for this
         * reply layout, but worth unifying; confirm before changing. */
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
/* Create an object on the OST via a synchronous OST_CREATE RPC.
 *
 * @oa carries the requested attributes in and the new object's id/group
 * back out.  *@ea is allocated here if the caller passed none; on
 * success it points at an lsm stamped with the created object id.
 * @oti (if non-NULL) receives the reply transno and, when the OST set
 * OBD_MD_FLCOOKIE, the unlink llog cookie. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller supplied no striping md; build one locally. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free an lsm we allocated ourselves (*ea is still NULL then). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
/* Synchronous OST_SYNC: ask the OST to commit [start, end] of the object
 * described by @oa to stable storage.  The byte range is carried in the
 * obdo's size/blocks fields, which are unused for this opcode. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        /* Copy the (possibly updated) attributes back to the caller. */
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
631
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 *
 * Used for early lock cancellation (ELC): the collected cancels are
 * piggy-backed on a subsequent request instead of separate RPCs. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* The resource name is built from the object's id/group pair. */
        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
/* Try to take a slot for a destroy RPC.  Returns 1 if the request may be
 * sent now, 0 if the caller must wait on cl_destroy_waitq.
 *
 * The inc-then-maybe-dec sequence is deliberately racy: if another
 * thread released a slot between our two atomic operations we may have
 * consumed its wakeup, so the waitqueue is re-signalled to avoid a
 * lost-wakeup stall. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
686
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Collect local PW locks on the doomed object for early lock
         * cancellation; their data can be discarded. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Must drop the cancel list we collected above. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        /* Pack the collected cancels into the destroy request itself. */
        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Attach the llog cookie before the obdo is copied into the body. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
761
/* Fill the dirty/grant accounting fields of @oa so that each BRW RPC
 * reports the client's cache state to the OST: current dirty bytes, how
 * much more we may dirty (o_undirty), our available grant, and any grant
 * lost since the last report.  Sets OBD_MD_FLBLOCKS|OBD_MD_FLGRANT. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        /* The three CERROR branches below are accounting sanity checks;
         * each reports o_undirty = 0 so the OST grants nothing extra. */
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant has been reported; reset the counter. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
800
801 static void osc_update_next_shrink(struct client_obd *cli)
802 {
803         int time = GRANT_SHRINK_INTERVAL;
804         cli->cl_next_shrink_grant = cfs_time_shift(time);
805         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806                cli->cl_next_shrink_grant);
807 }
808
/* Charge one page of dirty data against the client's write grant:
 * bump the global and per-client dirty counters, deduct CFS_PAGE_SIZE
 * from the available grant, and mark the page as grant-backed.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        /* Consuming grant resets the grant-shrink idle timer. */
        osc_update_next_shrink(cli);
}
823
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 * @sent says whether the page actually went out in an RPC; if not, its
 * whole page of grant is lost.  For short writes, the part of the page
 * that still fills whole server-side blocks counts as used grant. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server block size, defaulting to 4096 when statfs has no value. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
867
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
869 {
870         return cli->cl_r_in_flight + cli->cl_w_in_flight;
871 }
872
/* caller must hold loi_list_lock */
/* Wake threads blocked waiting for cache space.  Each waiter is either
 * granted write grant (so it can cache its page) or told via -EDQUOT to
 * fall back to synchronous IO.  Waiters that cannot be satisfied yet are
 * left on the list for a later wakeup. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
914
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919         if (body->oa.o_valid & OBD_MD_FLGRANT)
920                 cli->cl_avail_grant += body->oa.o_grant;
921         /* waiters are woken in brw_interpret */
922         client_obd_list_unlock(&cli->cl_loi_list_lock);
923 }
924
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926                               void *key, obd_count vallen, void *val,
927                               struct ptlrpc_request_set *set);
928
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930                                       struct ptlrpc_request *req,
931                                       void *aa, int rc)
932 {
933         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935         struct ost_body *body;
936         
937         if (rc != 0) {
938                 client_obd_list_lock(&cli->cl_loi_list_lock);
939                 cli->cl_avail_grant += oa->o_grant;
940                 client_obd_list_unlock(&cli->cl_loi_list_lock);
941                 GOTO(out, rc);
942         }
943
944         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
945         LASSERT(body);
946         osc_update_grant(cli, body);
947 out:
948         OBD_FREE_PTR(oa);
949         return rc;        
950 }
951
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
953 {
954         client_obd_list_lock(&cli->cl_loi_list_lock);
955         oa->o_grant = cli->cl_avail_grant / 4;
956         cli->cl_avail_grant -= oa->o_grant; 
957         client_obd_list_unlock(&cli->cl_loi_list_lock);
958         oa->o_flags |= OBD_FL_SHRINK_GRANT;
959         osc_update_next_shrink(cli);
960 }
961
962 static int osc_shrink_grant(struct client_obd *cli)
963 {
964         int    rc = 0;
965         struct ost_body     *body;
966         ENTRY;
967
968         OBD_ALLOC_PTR(body);
969         if (!body)
970                 RETURN(-ENOMEM);
971
972         osc_announce_cached(cli, &body->oa, 0);
973         osc_shrink_grant_local(cli, &body->oa);
974         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976                                 sizeof(*body), body, NULL);
977         if (rc) {
978                 client_obd_list_lock(&cli->cl_loi_list_lock);
979                 cli->cl_avail_grant += body->oa.o_grant;
980                 client_obd_list_unlock(&cli->cl_loi_list_lock);
981         }
982         if (body)
983                OBD_FREE_PTR(body);
984         RETURN(rc);
985 }
986
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
988 static int osc_should_shrink_grant(struct client_obd *client)
989 {
990         cfs_time_t time = cfs_time_current();
991         cfs_time_t next_shrink = client->cl_next_shrink_grant;
992         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
995                         return 1;
996                 else
997                         osc_update_next_shrink(client);
998         }
999         return 0;
1000 }
1001
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1003 {
1004         struct client_obd *client;
1005
1006         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007                 if (osc_should_shrink_grant(client))
1008                         osc_shrink_grant(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_add_shrink_grant(struct client_obd *client)
1014 {
1015         int rc;
1016
1017         rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
1018                                          TIMEOUT_GRANT,
1019                                          osc_grant_shrink_grant_cb, NULL,
1020                                          &client->cl_grant_shrink_list);
1021         if (rc) {
1022                 CERROR("add grant client %s error %d\n", 
1023                         client->cl_import->imp_obd->obd_name, rc);
1024                 return rc;
1025         }
1026         CDEBUG(D_CACHE, "add grant client %s \n", 
1027                client->cl_import->imp_obd->obd_name);
1028         osc_update_next_shrink(client);
1029         return 0; 
1030 }
1031
1032 static int osc_del_shrink_grant(struct client_obd *client)
1033 {
1034         CDEBUG(D_CACHE, "del grant client %s \n", 
1035                client->cl_import->imp_obd->obd_name);
1036         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
1037 }
1038
/* Initialize this client's grant from the connect reply in @ocd, and, if
 * the server supports grant shrinking, register for the periodic shrink
 * callback. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* only register once; reconnects would otherwise re-add us */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
1053
1054 /* We assume that the reason this OSC got a short read is because it read
1055  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1056  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1057  * this stripe never got written at or beyond this stripe offset yet. */
1058 static void handle_short_read(int nob_read, obd_count page_count,
1059                               struct brw_page **pga)
1060 {
1061         char *ptr;
1062         int i = 0;
1063
1064         /* skip bytes read OK */
1065         while (nob_read > 0) {
1066                 LASSERT (page_count > 0);
1067
1068                 if (pga[i]->count > nob_read) {
1069                         /* EOF inside this page */
1070                         ptr = cfs_kmap(pga[i]->pg) +
1071                                 (pga[i]->off & ~CFS_PAGE_MASK);
1072                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1073                         cfs_kunmap(pga[i]->pg);
1074                         page_count--;
1075                         i++;
1076                         break;
1077                 }
1078
1079                 nob_read -= pga[i]->count;
1080                 page_count--;
1081                 i++;
1082         }
1083
1084         /* zero remaining pages */
1085         while (page_count-- > 0) {
1086                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1087                 memset(ptr, 0, pga[i]->count);
1088                 cfs_kunmap(pga[i]->pg);
1089                 i++;
1090         }
1091 }
1092
1093 static int check_write_rcs(struct ptlrpc_request *req,
1094                            int requested_nob, int niocount,
1095                            obd_count page_count, struct brw_page **pga)
1096 {
1097         int    *remote_rcs, i;
1098
1099         /* return error if any niobuf was in error */
1100         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1101                                         sizeof(*remote_rcs) * niocount, NULL);
1102         if (remote_rcs == NULL) {
1103                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1104                 return(-EPROTO);
1105         }
1106         if (lustre_msg_swabbed(req->rq_repmsg))
1107                 for (i = 0; i < niocount; i++)
1108                         __swab32s(&remote_rcs[i]);
1109
1110         for (i = 0; i < niocount; i++) {
1111                 if (remote_rcs[i] < 0)
1112                         return(remote_rcs[i]);
1113
1114                 if (remote_rcs[i] != 0) {
1115                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1116                                 i, remote_rcs[i], req);
1117                         return(-EPROTO);
1118                 }
1119         }
1120
1121         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1122                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1123                        req->rq_bulk->bd_nob_transferred, requested_nob);
1124                 return(-EPROTO);
1125         }
1126
1127         return (0);
1128 }
1129
1130 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1131 {
1132         if (p1->flag != p2->flag) {
1133                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1134                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1135
1136                 /* warn if we try to combine flags that we don't know to be
1137                  * safe to combine */
1138                 if ((p1->flag & mask) != (p2->flag & mask))
1139                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1140                                "same brw?\n", p1->flag, p2->flag);
1141                 return 0;
1142         }
1143
1144         return (p1->off + p1->count == p2->off);
1145 }
1146
/* Compute the bulk checksum over the first @nob bytes of the pages in
 * @pga using @cksum_type.  @opc selects the fault-injection behavior:
 * reads may have their data corrupted before checksumming, writes may
 * have only the checksum perturbed. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* clamp to the bytes actually remaining in the transfer */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                /* NB: nob is decremented by the full page count even when
                 * only part of the page was checksummed; on a final short
                 * page this drives nob to <= 0 and terminates the loop */
                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1182
/* Build (but do not send) an OST_READ or OST_WRITE bulk request for the
 * pages in @pga.  On success *reqp holds the prepared request with its
 * async args (osc_brw_async_args) filled in; the caller owns the request.
 * @reserve non-zero makes the request hold a reference on @ocapa.
 * Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes allocate from the pre-allocated pool so they can always
         * proceed under memory pressure; reads allocate normally */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count remote niobufs: adjacent mergeable pages share one */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* on write the server GETs our pages; on read it PUTs into them */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* fill the bulk descriptor and the remote niobufs, merging
         * contiguous same-flag pages into a single niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                /* a brw page must not cross a CFS_PAGE_SIZE boundary */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive in strictly increasing file offset
                 * order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* all pages in one brw must agree on server-side locking */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf instead of starting a
                         * new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we emitted exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* checksum only when enabled and the transport does not
                 * already protect the bulk */
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash everything the interpret/fini callbacks will need */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1369
/* Verify a write's server-reported checksum against the client's.  If
 * they differ, recompute the checksum over the (still mapped) pages to
 * diagnose whether the data changed on the client, in transit, or the
 * server used a different checksum type.  Returns 0 when the checksums
 * match, 1 (after logging a console error) when they do not. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the type the server actually used */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* pick a diagnosis from which of the three checksums agree */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1420
/* Note rc enters this function as number of bytes transferred */
/* Finish a completed BRW request: unpack and validate the reply, update
 * grant and quota state, verify bulk checksums, and (for reads) zero-fill
 * short reads.  Returns 0 on success, -EAGAIN to trigger a resend on
 * checksum mismatch, or another negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        /* quota state updated; now the -EDQUOT error can be returned */
        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* writes report success as rc == 0; bytes written come via
                 * the per-niobuf rc vector checked below */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* checksum mismatch: -EAGAIN forces a resend of the write */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the pages beyond what the OST returned */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note if the bulk came through a router rather than
                 * directly from the server */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        /* -EAGAIN triggers a resend of the read */
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* log only on power-of-two occurrences to avoid spam */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* on success, copy the server's view of the object back to the
         * caller's obdo */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1573
/* Synchronous bulk read/write: build one BRW RPC, queue it and wait for
 * completion, retrying bulk timeouts immediately and recoverable errors
 * with a growing backoff.  Returns 0 on success or a negative errno. */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        /* the request is rebuilt from scratch on every attempt */
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        /* a bulk timeout with rq_resend set retries right away and does
         * not count against the resend limit below */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off 'resends' seconds before trying again */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1621
/* Resend a failed async BRW: build a replacement request that takes over
 * the page array and oap list from @request, and add it to the original
 * request set.  Called from brw_interpret() on a recoverable error.
 * Returns 0 on success, -EIO when the resend limit is exceeded, -EINTR if
 * a sync waiter was interrupted, or the prep-request error. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the resend if any oap was interrupted while the old request
         * was in flight; oap_interrupted is protected by the list lock */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* each oap drops its reference on the old request and takes one
         * on the replacement */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capa ownership moves to the new request's args */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1696
1697 /*
1698  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1699  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1700  * fine for our small page arrays and doesn't require allocation.  its an
1701  * insertion sort that swaps elements that are strides apart, shrinking the
1702  * stride down until its '1' and the array is sorted.
1703  */
1704 static void sort_brw_pages(struct brw_page **array, int num)
1705 {
1706         int stride, i, j;
1707         struct brw_page *tmp;
1708
1709         if (num == 1)
1710                 return;
1711         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1712                 ;
1713
1714         do {
1715                 stride /= 3;
1716                 for (i = stride ; i < num ; i++) {
1717                         tmp = array[i];
1718                         j = i;
1719                         while (j >= stride && array[j - stride]->off > tmp->off) {
1720                                 array[j] = array[j - stride];
1721                                 j -= stride;
1722                         }
1723                         array[j] = tmp;
1724                 }
1725         } while (stride > 1);
1726 }
1727
1728 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1729 {
1730         int count = 1;
1731         int offset;
1732         int i = 0;
1733
1734         LASSERT (pages > 0);
1735         offset = pg[i]->off & ~CFS_PAGE_MASK;
1736
1737         for (;;) {
1738                 pages--;
1739                 if (pages == 0)         /* that's all */
1740                         return count;
1741
1742                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1743                         return count;   /* doesn't end on page boundary */
1744
1745                 i++;
1746                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1747                 if (offset != 0)        /* doesn't start on page boundary */
1748                         return count;
1749
1750                 count++;
1751         }
1752 }
1753
1754 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1755 {
1756         struct brw_page **ppga;
1757         int i;
1758
1759         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1760         if (ppga == NULL)
1761                 return NULL;
1762
1763         for (i = 0; i < count; i++)
1764                 ppga[i] = pga + i;
1765         return ppga;
1766 }
1767
/* Free a pointer array allocated by osc_build_ppga(); @count must match
 * the count passed at allocation time (the pages themselves are untouched). */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1773
/* Synchronous BRW entry point.  Splits the flat page array into chunks no
 * larger than cl_max_pages_per_rpc (further limited to unfragmented runs)
 * and sends them one RPC at a time via osc_brw_internal().
 * OBD_BRW_CHECK only reports whether the import currently looks usable. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk to a contiguous page-aligned run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance past the chunk just sent */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1847
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is forwarded to osc_release_write_grant() — NOTE(review): the grant
 * semantics of @sent live in that helper, which is not visible here. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1856
1857
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 * Returns 1 when an RPC should be formed for @lop now, 0 otherwise. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued, nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1906
1907 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1908 {
1909         struct osc_async_page *oap;
1910         ENTRY;
1911
1912         if (list_empty(&lop->lop_urgent))
1913                 RETURN(0);
1914
1915         oap = list_entry(lop->lop_urgent.next,
1916                          struct osc_async_page, oap_urgent_item);
1917
1918         if (oap->oap_async_flags & ASYNC_HP) {
1919                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1920                 RETURN(1);
1921         }
1922
1923         RETURN(0);
1924 }
1925
/* Reconcile membership of @item on @list with the boolean @should_be_on:
 * add it when it should be listed but isn't, remove it when it shouldn't
 * be but is, and otherwise leave it alone. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1934
1935 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1936  * can find pages to build into rpcs quickly */
1937 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1938 {
1939         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1940             lop_makes_hprpc(&loi->loi_read_lop)) {
1941                 /* HP rpc */
1942                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1943                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1944         } else {
1945                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1946                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1947                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1948                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1949         }
1950
1951         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1952                 loi->loi_write_lop.lop_num_pending);
1953
1954         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1955                 loi->loi_read_lop.lop_num_pending);
1956 }
1957
/* Adjust the pending-page count of @lop by @delta (may be negative) and
 * mirror the change in the client's per-direction pending counter. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1967
/**
 * this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out.
 *
 * Returns the ap_completion result if the page was still pending and could
 * be completed here, -EBUSY otherwise.
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                /* fix up the pending accounting and list membership that the
                 * dequeue above just invalidated */
                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
2013
2014 /* this is trying to propogate async writeback errors back up to the
2015  * application.  As an async write fails we record the error code for later if
2016  * the app does an fsync.  As long as errors persist we force future rpcs to be
2017  * sync so that the app can get a sync error and break the cycle of queueing
2018  * pages for which writeback will fail. */
2019 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2020                            int rc)
2021 {
2022         if (rc) {
2023                 if (!ar->ar_rc)
2024                         ar->ar_rc = rc;
2025
2026                 ar->ar_force_sync = 1;
2027                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2028                 return;
2029
2030         }
2031
2032         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2033                 ar->ar_force_sync = 0;
2034 }
2035
2036 void osc_oap_to_pending(struct osc_async_page *oap)
2037 {
2038         struct loi_oap_pages *lop;
2039
2040         if (oap->oap_cmd & OBD_BRW_WRITE)
2041                 lop = &oap->oap_loi->loi_write_lop;
2042         else
2043                 lop = &oap->oap_loi->loi_read_lop;
2044
2045         if (oap->oap_async_flags & ASYNC_HP)
2046                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2047         else if (oap->oap_async_flags & ASYNC_URGENT)
2048                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2049         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2050         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2051 }
2052
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request.
 * Completes one oap after its rpc finished (or failed): drops the rpc ref,
 * records async write errors, refreshes cached attributes from the reply
 * obdo, and calls the caller's ap_completion. */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* capture the xid before dropping the request reference; it is
         * needed for the async-error bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record the write result both per-client and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success refresh the cached lvb attributes from the reply */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2100
/* Request-set interpret callback for async BRW rpcs: finalize the reply,
 * retry recoverable errors via osc_brw_redo_request(), then update in-flight
 * accounting and complete (or release grants for) every page under the
 * loi list lock. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* on a successful redo a replacement request has taken over the
         * pages and this one is done with them */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* an empty oap list distinguishes rpcs built by async_internal() */
        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        /* completion may have freed grant/slots: see if more rpcs can go */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2157
/* Build one BRW rpc covering the oaps on @rpc_list; the oaps are spliced
 * into the request's async args on success.  On failure every oap is
 * completed with the error and the list is drained.  Returns the request
 * or an ERR_PTR().
 * NOTE(review): on the error path the loi list lock is re-acquired below
 * and there is no matching unlock before RETURN — the caller apparently
 * resumes holding the lock; confirm against osc_send_oap_rpc(). */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* fill pga[] from the oaps; the first oap supplies the caller ops,
         * caller data and ldlm lock, and the cl_req for the transfer */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
        }
        RETURN(req);
}
2279
2280 /**
2281  * prepare pages for ASYNC io and put pages in send queue.
2282  *
2283  * \param cli -
2284  * \param loi -
2285  * \param cmd - OBD_BRW_* macroses
2286  * \param lop - pending pages
2287  *
2288  * \return zero if pages successfully add to send queue.
2289  * \return not zere if error occurring.
2290  */
2291 static int
2292 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2293                  struct lov_oinfo *loi,
2294                  int cmd, struct loi_oap_pages *lop)
2295 {
2296         struct ptlrpc_request *req;
2297         obd_count page_count = 0;
2298         struct osc_async_page *oap = NULL, *tmp;
2299         struct osc_brw_async_args *aa;
2300         const struct obd_async_page_ops *ops;
2301         CFS_LIST_HEAD(rpc_list);
2302         unsigned int ending_offset;
2303         unsigned  starting_offset = 0;
2304         int srvlock = 0;
2305         struct cl_object *clob = NULL;
2306         ENTRY;
2307
2308         /* If there are HP OAPs we need to handle at least 1 of them,
2309          * move it the beginning of the pending list for that. */
2310         if (!list_empty(&lop->lop_urgent)) {
2311                 oap = list_entry(lop->lop_urgent.next,
2312                                  struct osc_async_page, oap_urgent_item);
2313                 if (oap->oap_async_flags & ASYNC_HP)
2314                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2315         }
2316
2317         /* first we find the pages we're allowed to work with */
2318         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2319                                  oap_pending_item) {
2320                 ops = oap->oap_caller_ops;
2321
2322                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2323                          "magic 0x%x\n", oap, oap->oap_magic);
2324
2325                 if (clob == NULL) {
2326                         /* pin object in memory, so that completion call-backs
2327                          * can be safely called under client_obd_list lock. */
2328                         clob = osc_oap2cl_page(oap)->cp_obj;
2329                         cl_object_get(clob);
2330                 }
2331
2332                 if (page_count != 0 &&
2333                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2334                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2335                                " oap %p, page %p, srvlock %u\n",
2336                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2337                         break;
2338                 }
2339                 /* in llite being 'ready' equates to the page being locked
2340                  * until completion unlocks it.  commit_write submits a page
2341                  * as not ready because its unlock will happen unconditionally
2342                  * as the call returns.  if we race with commit_write giving
2343                  * us that page we dont' want to create a hole in the page
2344                  * stream, so we stop and leave the rpc to be fired by
2345                  * another dirtier or kupdated interval (the not ready page
2346                  * will still be on the dirty list).  we could call in
2347                  * at the end of ll_file_write to process the queue again. */
2348                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2349                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2350                                                     cmd);
2351                         if (rc < 0)
2352                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2353                                                 "instead of ready\n", oap,
2354                                                 oap->oap_page, rc);
2355                         switch (rc) {
2356                         case -EAGAIN:
2357                                 /* llite is telling us that the page is still
2358                                  * in commit_write and that we should try
2359                                  * and put it in an rpc again later.  we
2360                                  * break out of the loop so we don't create
2361                                  * a hole in the sequence of pages in the rpc
2362                                  * stream.*/
2363                                 oap = NULL;
2364                                 break;
2365                         case -EINTR:
2366                                 /* the io isn't needed.. tell the checks
2367                                  * below to complete the rpc with EINTR */
2368                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2369                                 oap->oap_count = -EINTR;
2370                                 break;
2371                         case 0:
2372                                 oap->oap_async_flags |= ASYNC_READY;
2373                                 break;
2374                         default:
2375                                 LASSERTF(0, "oap %p page %p returned %d "
2376                                             "from make_ready\n", oap,
2377                                             oap->oap_page, rc);
2378                                 break;
2379                         }
2380                 }
2381                 if (oap == NULL)
2382                         break;
2383                 /*
2384                  * Page submitted for IO has to be locked. Either by
2385                  * ->ap_make_ready() or by higher layers.
2386                  */
2387 #if defined(__KERNEL__) && defined(__linux__)
2388                 {
2389                         struct cl_page *page;
2390
2391                         page = osc_oap2cl_page(oap);
2392
2393                         if (page->cp_type == CPT_CACHEABLE &&
2394                             !(PageLocked(oap->oap_page) &&
2395                               (CheckWriteback(oap->oap_page, cmd)))) {
2396                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2397                                        oap->oap_page,
2398                                        (long)oap->oap_page->flags,
2399                                        oap->oap_async_flags);
2400                                 LBUG();
2401                         }
2402                 }
2403 #endif
2404                 /* If there is a gap at the start of this page, it can't merge
2405                  * with any previous page, so we'll hand the network a
2406                  * "fragmented" page array that it can't transfer in 1 RDMA */
2407                 if (page_count != 0 && oap->oap_page_off != 0)
2408                         break;
2409
2410                 /* take the page out of our book-keeping */
2411                 list_del_init(&oap->oap_pending_item);
2412                 lop_update_pending(cli, lop, cmd, -1);
2413                 list_del_init(&oap->oap_urgent_item);
2414
2415                 if (page_count == 0)
2416                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2417                                           (PTLRPC_MAX_BRW_SIZE - 1);
2418
2419                 /* ask the caller for the size of the io as the rpc leaves. */
2420                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2421                         oap->oap_count =
2422                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2423                                                       cmd);
2424                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2425                 }
2426                 if (oap->oap_count <= 0) {
2427                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2428                                oap->oap_count);
2429                         osc_ap_completion(env, cli, NULL,
2430                                           oap, 0, oap->oap_count);
2431                         continue;
2432                 }
2433
2434                 /* now put the page back in our accounting */
2435                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2436                 if (page_count == 0)
2437                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2438                 if (++page_count >= cli->cl_max_pages_per_rpc)
2439                         break;
2440
2441                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2442                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2443                  * have the same alignment as the initial writes that allocated
2444                  * extents on the server. */
2445                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2446                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2447                 if (ending_offset == 0)
2448                         break;
2449
2450                 /* If there is a gap at the end of this page, it can't merge
2451                  * with any subsequent pages, so we'll hand the network a
2452                  * "fragmented" page array that it can't transfer in 1 RDMA */
2453                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2454                         break;
2455         }
2456
2457         osc_wake_cache_waiters(cli);
2458
2459         loi_list_maint(cli, loi);
2460
2461         client_obd_list_unlock(&cli->cl_loi_list_lock);
2462
2463         if (clob != NULL)
2464                 cl_object_put(env, clob);
2465
2466         if (page_count == 0) {
2467                 client_obd_list_lock(&cli->cl_loi_list_lock);
2468                 RETURN(0);
2469         }
2470
2471         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2472         if (IS_ERR(req)) {
2473                 LASSERT(list_empty(&rpc_list));
2474                 loi_list_maint(cli, loi);
2475                 RETURN(PTR_ERR(req));
2476         }
2477
2478         aa = ptlrpc_req_async_args(req);
2479
2480         if (cmd == OBD_BRW_READ) {
2481                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2482                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2483                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2484                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2485         } else {
2486                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2487                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2488                                  cli->cl_w_in_flight);
2489                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2490                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2491         }
2492         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2493
2494         client_obd_list_lock(&cli->cl_loi_list_lock);
2495
2496         if (cmd == OBD_BRW_READ)
2497                 cli->cl_r_in_flight++;
2498         else
2499                 cli->cl_w_in_flight++;
2500
2501         /* queued sync pages can be torn down while the pages
2502          * were between the pending list and the rpc */
2503         tmp = NULL;
2504         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2505                 /* only one oap gets a request reference */
2506                 if (tmp == NULL)
2507                         tmp = oap;
2508                 if (oap->oap_interrupted && !req->rq_intr) {
2509                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2510                                oap, req);
2511                         ptlrpc_mark_interrupted(req);
2512                 }
2513         }
2514         if (tmp != NULL)
2515                 tmp->oap_request = ptlrpc_request_addref(req);
2516
2517         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2518                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2519
2520         req->rq_interpret_reply = brw_interpret;
2521         ptlrpcd_add_req(req, PSCOPE_BRW);
2522         RETURN(1);
2523 }
2524
/* Log (at D_INODE) a lov_oinfo's scheduling state: whether it sits on a
 * ready (or HP-ready) list, then the pending count and urgent-list flag
 * for both the write and read lop lists, followed by the caller's own
 * format string STR and its args. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

2535 /* This is called by osc_check_rpcs() to find which objects have pages that
2536  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2537 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2538 {
2539         ENTRY;
2540
2541         /* First return objects that have blocked locks so that they
2542          * will be flushed quickly and other clients can get the lock,
2543          * then objects which have pages ready to be stuffed into RPCs */
2544         if (!list_empty(&cli->cl_loi_hp_ready_list))
2545                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2546                                   struct lov_oinfo, loi_hp_ready_item));
2547         if (!list_empty(&cli->cl_loi_ready_list))
2548                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2549                                   struct lov_oinfo, loi_ready_item));
2550
2551         /* then if we have cache waiters, return all objects with queued
2552          * writes.  This is especially important when many small files
2553          * have filled up the cache and not been fired into rpcs because
2554          * they don't pass the nr_pending/object threshhold */
2555         if (!list_empty(&cli->cl_cache_waiters) &&
2556             !list_empty(&cli->cl_loi_write_list))
2557                 RETURN(list_entry(cli->cl_loi_write_list.next,
2558                                   struct lov_oinfo, loi_write_item));
2559
2560         /* then return all queued objects when we have an invalid import
2561          * so that they get flushed */
2562         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2563                 if (!list_empty(&cli->cl_loi_write_list))
2564                         RETURN(list_entry(cli->cl_loi_write_list.next,
2565                                           struct lov_oinfo, loi_write_item));
2566                 if (!list_empty(&cli->cl_loi_read_list))
2567                         RETURN(list_entry(cli->cl_loi_read_list.next,
2568                                           struct lov_oinfo, loi_read_item));
2569         }
2570         RETURN(NULL);
2571 }
2572
2573 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2574 {
2575         struct osc_async_page *oap;
2576         int hprpc = 0;
2577
2578         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2579                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2580                                  struct osc_async_page, oap_urgent_item);
2581                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2582         }
2583
2584         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2585                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2586                                  struct osc_async_page, oap_urgent_item);
2587                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2588         }
2589
2590         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2591 }
2592
2593 /* called with the loi list lock held */
2594 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2595 {
2596         struct lov_oinfo *loi;
2597         int rc = 0, race_counter = 0;
2598         ENTRY;
2599
2600         while ((loi = osc_next_loi(cli)) != NULL) {
2601                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2602
2603                 if (osc_max_rpc_in_flight(cli, loi))
2604                         break;
2605
2606                 /* attempt some read/write balancing by alternating between
2607                  * reads and writes in an object.  The makes_rpc checks here
2608                  * would be redundant if we were getting read/write work items
2609                  * instead of objects.  we don't want send_oap_rpc to drain a
2610                  * partial read pending queue when we're given this object to
2611                  * do io on writes while there are cache waiters */
2612                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2613                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2614                                               &loi->loi_write_lop);
2615                         if (rc < 0)
2616                                 break;
2617                         if (rc > 0)
2618                                 race_counter = 0;
2619                         else
2620                                 race_counter++;
2621                 }
2622                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2623                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2624                                               &loi->loi_read_lop);
2625                         if (rc < 0)
2626                                 break;
2627                         if (rc > 0)
2628                                 race_counter = 0;
2629                         else
2630                                 race_counter++;
2631                 }
2632
2633                 /* attempt some inter-object balancing by issueing rpcs
2634                  * for each object in turn */
2635                 if (!list_empty(&loi->loi_hp_ready_item))
2636                         list_del_init(&loi->loi_hp_ready_item);
2637                 if (!list_empty(&loi->loi_ready_item))
2638                         list_del_init(&loi->loi_ready_item);
2639                 if (!list_empty(&loi->loi_write_item))
2640                         list_del_init(&loi->loi_write_item);
2641                 if (!list_empty(&loi->loi_read_item))
2642                         list_del_init(&loi->loi_read_item);
2643
2644                 loi_list_maint(cli, loi);
2645
2646                 /* send_oap_rpc fails with 0 when make_ready tells it to
2647                  * back off.  llite's make_ready does this when it tries
2648                  * to lock a page queued for write that is already locked.
2649                  * we want to try sending rpcs from many objects, but we
2650                  * don't want to spin failing with 0.  */
2651                 if (race_counter == 10)
2652                         break;
2653         }
2654         EXIT;
2655 }
2656
2657 /* we're trying to queue a page in the osc so we're subject to the
2658  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2659  * If the osc's queued pages are already at that limit, then we want to sleep
2660  * until there is space in the osc's queue for us.  We also may be waiting for
2661  * write credits from the OST if there are RPCs in flight that may return some
2662  * before we fall back to sync writes.
2663  *
2664  * We need this know our allocation was granted in the presence of signals */
2665 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2666 {
2667         int rc;
2668         ENTRY;
2669         client_obd_list_lock(&cli->cl_loi_list_lock);
2670         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2671         client_obd_list_unlock(&cli->cl_loi_list_lock);
2672         RETURN(rc);
2673 };
2674
2675 /**
2676  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2677  * is available.
2678  */
2679 int osc_enter_cache_try(const struct lu_env *env,
2680                         struct client_obd *cli, struct lov_oinfo *loi,
2681                         struct osc_async_page *oap, int transient)
2682 {
2683         int has_grant;
2684
2685         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2686         if (has_grant) {
2687                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2688                 if (transient) {
2689                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2690                         atomic_inc(&obd_dirty_transit_pages);
2691                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2692                 }
2693         }
2694         return has_grant;
2695 }
2696
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * \return 0 when the page fits under cl_dirty_max / obd_max_dirty_pages and
 *         grant was consumed; -EDQUOT to force the caller to fall back to
 *         sync io; -EINTR when woken without being granted; otherwise the
 *         rc a write completion stored in the waiter. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                /* queue ourselves as a cache waiter and kick RPCs so a
                 * completion can eventually wake us */
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                /* drop the lock while sleeping; ocw_granted() re-takes it
                 * to sample the waiter state */
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list: woken without being granted
                 * (e.g. no RPCs left in flight) - remove ourselves */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2751
2752
2753 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2754                         struct lov_oinfo *loi, cfs_page_t *page,
2755                         obd_off offset, const struct obd_async_page_ops *ops,
2756                         void *data, void **res, int nocache,
2757                         struct lustre_handle *lockh)
2758 {
2759         struct osc_async_page *oap;
2760
2761         ENTRY;
2762
2763         if (!page)
2764                 return size_round(sizeof(*oap));
2765
2766         oap = *res;
2767         oap->oap_magic = OAP_MAGIC;
2768         oap->oap_cli = &exp->exp_obd->u.cli;
2769         oap->oap_loi = loi;
2770
2771         oap->oap_caller_ops = ops;
2772         oap->oap_caller_data = data;
2773
2774         oap->oap_page = page;
2775         oap->oap_obj_off = offset;
2776         if (!client_is_remote(exp) &&
2777             cfs_capable(CFS_CAP_SYS_RESOURCE))
2778                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2779
2780         LASSERT(!(offset & ~CFS_PAGE_MASK));
2781
2782         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2783         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2784         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2785         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2786
2787         spin_lock_init(&oap->oap_lock);
2788         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2789         RETURN(0);
2790 }
2791
2792 struct osc_async_page *oap_from_cookie(void *cookie)
2793 {
2794         struct osc_async_page *oap = cookie;
2795         if (oap->oap_magic != OAP_MAGIC)
2796                 return ERR_PTR(-EINVAL);
2797         return oap;
2798 };
2799
/**
 * Queue a single async page for cmd (OBD_BRW_READ/WRITE) io covering
 * [off, off + count) within the page.
 *
 * \retval 0       page queued (and RPC generation kicked)
 * \retval -EIO    import is missing or invalid
 * \retval -EBUSY  page is already on a pending/urgent/rpc list
 * \retval -EDQUOT owner/group over quota, or no cache space for a write
 */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* the page must not already be queued anywhere */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
                                            attr.cat_gid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* io must stay within one page */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and re-take cl_loi_list_lock while waiting for
                 * grant or cache space */
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2872
/* aka (~was & now & flag), but this is more clear :)
 * True iff flag is newly set: absent in 'was', present in 'now'.
 * Arguments are fully parenthesized so callers may pass expressions
 * without precedence surprises (CERT PRE01-C). */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2875
2876 int osc_set_async_flags_base(struct client_obd *cli,
2877                              struct lov_oinfo *loi, struct osc_async_page *oap,
2878                              obd_flag async_flags)
2879 {
2880         struct loi_oap_pages *lop;
2881         ENTRY;
2882
2883         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2884                 RETURN(-EIO);
2885
2886         if (oap->oap_cmd & OBD_BRW_WRITE) {
2887                 lop = &loi->loi_write_lop;
2888         } else {
2889                 lop = &loi->loi_read_lop;
2890         }
2891
2892         if (list_empty(&oap->oap_pending_item))
2893                 RETURN(-EINVAL);
2894
2895         if ((oap->oap_async_flags & async_flags) == async_flags)
2896                 RETURN(0);
2897
2898         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2899                 oap->oap_async_flags |= ASYNC_READY;
2900
2901         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2902             list_empty(&oap->oap_rpc_item)) {
2903                 if (oap->oap_async_flags & ASYNC_HP)
2904                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2905                 else
2906                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2907                 oap->oap_async_flags |= ASYNC_URGENT;
2908                 loi_list_maint(cli, loi);
2909         }
2910
2911         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2912                         oap->oap_async_flags);
2913         RETURN(0);
2914 }
2915
2916 int osc_teardown_async_page(struct obd_export *exp,
2917                             struct lov_stripe_md *lsm,
2918                             struct lov_oinfo *loi, void *cookie)
2919 {
2920         struct client_obd *cli = &exp->exp_obd->u.cli;
2921         struct loi_oap_pages *lop;
2922         struct osc_async_page *oap;
2923         int rc = 0;
2924         ENTRY;
2925
2926         oap = oap_from_cookie(cookie);
2927         if (IS_ERR(oap))
2928                 RETURN(PTR_ERR(oap));
2929
2930         if (loi == NULL)
2931                 loi = lsm->lsm_oinfo[0];
2932
2933         if (oap->oap_cmd & OBD_BRW_WRITE) {
2934                 lop = &loi->loi_write_lop;
2935         } else {
2936                 lop = &loi->loi_read_lop;
2937         }
2938
2939         client_obd_list_lock(&cli->cl_loi_list_lock);
2940
2941         if (!list_empty(&oap->oap_rpc_item))
2942                 GOTO(out, rc = -EBUSY);
2943
2944         osc_exit_cache(cli, oap, 0);
2945         osc_wake_cache_waiters(cli);
2946
2947         if (!list_empty(&oap->oap_urgent_item)) {
2948                 list_del_init(&oap->oap_urgent_item);
2949                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2950         }
2951         if (!list_empty(&oap->oap_pending_item)) {
2952                 list_del_init(&oap->oap_pending_item);
2953                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2954         }
2955         loi_list_maint(cli, loi);
2956         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2957 out:
2958         client_obd_list_unlock(&cli->cl_loi_list_lock);
2959         RETURN(rc);
2960 }
2961
/* Attach einfo->ei_cbdata to lock->l_ast_data, after asserting that the
 * lock's callbacks and resource type match what the enqueue expected.
 * l_ast_data is written under both the resource lock and osc_ast_guard,
 * and may only go from NULL to data (or be re-set to the same value).
 * The flags argument is currently unused. */
static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
                                         struct ldlm_enqueue_info *einfo,
                                         int flags)
{
        void *data = einfo->ei_cbdata;

        LASSERT(lock != NULL);
        LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
        LASSERT(lock->l_resource->lr_type == einfo->ei_type);
        LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
        LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

        lock_res_and_lock(lock);
        spin_lock(&osc_ast_guard);
        /* must be unset, or already carrying the same data */
        LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
        lock->l_ast_data = data;
        spin_unlock(&osc_ast_guard);
        unlock_res_and_lock(lock);
}
2981
2982 static void osc_set_data_with_check(struct lustre_handle *lockh,
2983                                     struct ldlm_enqueue_info *einfo,
2984                                     int flags)
2985 {
2986         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2987
2988         if (lock != NULL) {
2989                 osc_set_lock_data_with_check(lock, einfo, flags);
2990                 LDLM_LOCK_PUT(lock);
2991         } else
2992                 CERROR("lockh %p, data %p - client evicted?\n",
2993                        lockh, einfo->ei_cbdata);
2994 }
2995
2996 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2997                              ldlm_iterator_t replace, void *data)
2998 {
2999         struct ldlm_res_id res_id;
3000         struct obd_device *obd = class_exp2obd(exp);
3001
3002         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3003         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3004         return 0;
3005 }
3006
3007 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3008                             obd_enqueue_update_f upcall, void *cookie,
3009                             int *flags, int rc)
3010 {
3011         int intent = *flags & LDLM_FL_HAS_INTENT;
3012         ENTRY;
3013
3014         if (intent) {
3015                 /* The request was created before ldlm_cli_enqueue call. */
3016                 if (rc == ELDLM_LOCK_ABORTED) {
3017                         struct ldlm_reply *rep;
3018                         rep = req_capsule_server_get(&req->rq_pill,
3019                                                      &RMF_DLM_REP);
3020
3021                         LASSERT(rep != NULL);
3022                         if (rep->lock_policy_res1)
3023                                 rc = rep->lock_policy_res1;
3024                 }
3025         }
3026
3027         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3028                 *flags |= LDLM_FL_LVB_READY;
3029                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3030                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3031         }
3032
3033         /* Call the update callback. */
3034         rc = (*upcall)(cookie, rc);
3035         RETURN(rc);
3036 }
3037
/* Async-enqueue reply interpreter.  Finishes the LDLM enqueue, runs the
 * caller's upcall via osc_enqueue_fini(), and balances the lock reference
 * counts.  The reference dance is deliberate: an extra reference is taken
 * up front so a blocking AST triggered by a failed enqueue cannot be
 * processed before the upcall has run. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
                                   &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        /* NOTE(review): lock is only validated here, after it was already
         * addref'd through the handle above -- presumably the handle lookup
         * and the lock lookup can only fail together; confirm. */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken at the top of this function. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3084
3085 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3086                         struct lov_oinfo *loi, int flags,
3087                         struct ost_lvb *lvb, __u32 mode, int rc)
3088 {
3089         if (rc == ELDLM_OK) {
3090                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3091                 __u64 tmp;
3092
3093                 LASSERT(lock != NULL);
3094                 loi->loi_lvb = *lvb;
3095                 tmp = loi->loi_lvb.lvb_size;
3096                 /* Extend KMS up to the end of this lock and no further
3097                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3098                 if (tmp > lock->l_policy_data.l_extent.end)
3099                         tmp = lock->l_policy_data.l_extent.end + 1;
3100                 if (tmp >= loi->loi_kms) {
3101                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3102                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3103                         loi_kms_set(loi, tmp);
3104                 } else {
3105                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3106                                    LPU64"; leaving kms="LPU64", end="LPU64,
3107                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3108                                    lock->l_policy_data.l_extent.end);
3109                 }
3110                 ldlm_lock_allow_match(lock);
3111                 LDLM_LOCK_PUT(lock);
3112         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3113                 loi->loi_lvb = *lvb;
3114                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3115                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3116                 rc = ELDLM_OK;
3117         }
3118 }
3119 EXPORT_SYMBOL(osc_update_enqueue);
3120
3121 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3122
3123 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3124  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3125  * other synchronous requests, however keeping some locks and trying to obtain
3126  * others may take a considerable amount of time in a case of ost failure; and
3127  * when other sync requests do not get released lock from a client, the client
3128  * is excluded from the cluster -- such scenarious make the life difficult, so
3129  * release locks just after they are obtained. */
3130 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3131                      int *flags, ldlm_policy_data_t *policy,
3132                      struct ost_lvb *lvb, int kms_valid,
3133                      obd_enqueue_update_f upcall, void *cookie,
3134                      struct ldlm_enqueue_info *einfo,
3135                      struct lustre_handle *lockh,
3136                      struct ptlrpc_request_set *rqset, int async)
3137 {
3138         struct obd_device *obd = exp->exp_obd;
3139         struct ptlrpc_request *req = NULL;
3140         int intent = *flags & LDLM_FL_HAS_INTENT;
3141         ldlm_mode_t mode;
3142         int rc;
3143         ENTRY;
3144
3145         /* Filesystem lock extents are extended to page boundaries so that
3146          * dealing with the page cache is a little smoother.  */
3147         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3148         policy->l_extent.end |= ~CFS_PAGE_MASK;
3149
3150         /*
3151          * kms is not valid when either object is completely fresh (so that no
3152          * locks are cached), or object was evicted. In the latter case cached
3153          * lock cannot be used, because it would prime inode state with
3154          * potentially stale LVB.
3155          */
3156         if (!kms_valid)
3157                 goto no_match;
3158
3159         /* Next, search for already existing extent locks that will cover us */
3160         /* If we're trying to read, we also search for an existing PW lock.  The
3161          * VFS and page cache already protect us locally, so lots of readers/
3162          * writers can share a single PW lock.
3163          *
3164          * There are problems with conversion deadlocks, so instead of
3165          * converting a read lock to a write lock, we'll just enqueue a new
3166          * one.
3167          *
3168          * At some point we should cancel the read lock instead of making them
3169          * send us a blocking callback, but there are problems with canceling
3170          * locks out from other users right now, too. */
3171         mode = einfo->ei_mode;
3172         if (einfo->ei_mode == LCK_PR)
3173                 mode |= LCK_PW;
3174         mode = ldlm_lock_match(obd->obd_namespace,
3175                                *flags | LDLM_FL_LVB_READY, res_id,
3176                                einfo->ei_type, policy, mode, lockh, 0);
3177         if (mode) {
3178                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3179
3180                 if (matched->l_ast_data == NULL ||
3181                     matched->l_ast_data == einfo->ei_cbdata) {
3182                         /* addref the lock only if not async requests and PW
3183                          * lock is matched whereas we asked for PR. */
3184                         if (!rqset && einfo->ei_mode != mode)
3185                                 ldlm_lock_addref(lockh, LCK_PR);
3186                         osc_set_lock_data_with_check(matched, einfo, *flags);
3187                         if (intent) {
3188                                 /* I would like to be able to ASSERT here that
3189                                  * rss <= kms, but I can't, for reasons which
3190                                  * are explained in lov_enqueue() */
3191                         }
3192
3193                         /* We already have a lock, and it's referenced */
3194                         (*upcall)(cookie, ELDLM_OK);
3195
3196                         /* For async requests, decref the lock. */
3197                         if (einfo->ei_mode != mode)
3198                                 ldlm_lock_decref(lockh, LCK_PW);
3199                         else if (rqset)
3200                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3201                         LDLM_LOCK_PUT(matched);
3202                         RETURN(ELDLM_OK);
3203                 } else
3204                         ldlm_lock_decref(lockh, mode);
3205                 LDLM_LOCK_PUT(matched);
3206         }
3207
3208  no_match:
3209         if (intent) {
3210                 CFS_LIST_HEAD(cancels);
3211                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3212                                            &RQF_LDLM_ENQUEUE_LVB);
3213                 if (req == NULL)
3214                         RETURN(-ENOMEM);
3215
3216                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3217                 if (rc)
3218                         RETURN(rc);
3219
3220                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3221                                      sizeof *lvb);
3222                 ptlrpc_request_set_replen(req);
3223         }
3224
3225         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3226         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3227
3228         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3229                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3230         if (rqset) {
3231                 if (!rc) {
3232                         struct osc_enqueue_args *aa;
3233                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3234                         aa = ptlrpc_req_async_args(req);
3235                         aa->oa_ei = einfo;
3236                         aa->oa_exp = exp;
3237                         aa->oa_flags  = flags;
3238                         aa->oa_upcall = upcall;
3239                         aa->oa_cookie = cookie;
3240                         aa->oa_lvb    = lvb;
3241                         aa->oa_lockh  = lockh;
3242
3243                         req->rq_interpret_reply =
3244                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3245                         if (rqset == PTLRPCD_SET)
3246                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3247                         else
3248                                 ptlrpc_set_add_req(rqset, req);
3249                 } else if (intent) {
3250                         ptlrpc_req_finished(req);
3251                 }
3252                 RETURN(rc);
3253         }
3254
3255         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3256         if (intent)
3257                 ptlrpc_req_finished(req);
3258
3259         RETURN(rc);
3260 }
3261
3262 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3263                        struct ldlm_enqueue_info *einfo,
3264                        struct ptlrpc_request_set *rqset)
3265 {
3266         struct ldlm_res_id res_id;
3267         int rc;
3268         ENTRY;
3269
3270         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3271                            oinfo->oi_md->lsm_object_gr, &res_id);
3272
3273         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3274                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3275                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3276                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3277                               rqset, rqset != NULL);
3278         RETURN(rc);
3279 }
3280
3281 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3282                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3283                    int *flags, void *data, struct lustre_handle *lockh,
3284                    int unref)
3285 {
3286         struct obd_device *obd = exp->exp_obd;
3287         int lflags = *flags;
3288         ldlm_mode_t rc;
3289         ENTRY;
3290
3291         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3292                 RETURN(-EIO);
3293
3294         /* Filesystem lock extents are extended to page boundaries so that
3295          * dealing with the page cache is a little smoother */
3296         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3297         policy->l_extent.end |= ~CFS_PAGE_MASK;
3298
3299         /* Next, search for already existing extent locks that will cover us */
3300         /* If we're trying to read, we also search for an existing PW lock.  The
3301          * VFS and page cache already protect us locally, so lots of readers/
3302          * writers can share a single PW lock. */
3303         rc = mode;
3304         if (mode == LCK_PR)
3305                 rc |= LCK_PW;
3306         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3307                              res_id, type, policy, rc, lockh, unref);
3308         if (rc) {
3309                 if (data != NULL)
3310                         osc_set_data_with_check(lockh, data, lflags);
3311                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3312                         ldlm_lock_addref(lockh, LCK_PR);
3313                         ldlm_lock_decref(lockh, LCK_PW);
3314                 }
3315                 RETURN(rc);
3316         }
3317         RETURN(rc);
3318 }
3319
3320 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3321 {
3322         ENTRY;
3323
3324         if (unlikely(mode == LCK_GROUP))
3325                 ldlm_lock_decref_and_cancel(lockh, mode);
3326         else
3327                 ldlm_lock_decref(lockh, mode);
3328
3329         RETURN(0);
3330 }
3331
/* obd_ops cancel entry point; @md is unused, simply drop the lock
 * reference via osc_cancel_base(). */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;
        RETURN(osc_cancel_base(lockh, mode));
}
3338
3339 static int osc_cancel_unused(struct obd_export *exp,
3340                              struct lov_stripe_md *lsm, int flags,
3341                              void *opaque)
3342 {
3343         struct obd_device *obd = class_exp2obd(exp);
3344         struct ldlm_res_id res_id, *resp = NULL;
3345
3346         if (lsm != NULL) {
3347                 resp = osc_build_res_name(lsm->lsm_object_id,
3348                                           lsm->lsm_object_gr, &res_id);
3349         }
3350
3351         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3352 }
3353
3354 static int osc_statfs_interpret(const struct lu_env *env,
3355                                 struct ptlrpc_request *req,
3356                                 struct osc_async_args *aa, int rc)
3357 {
3358         struct obd_statfs *msfs;
3359         ENTRY;
3360
3361         if (rc != 0)
3362                 GOTO(out, rc);
3363
3364         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3365         if (msfs == NULL) {
3366                 GOTO(out, rc = -EPROTO);
3367         }
3368
3369         *aa->aa_oi->oi_osfs = *msfs;
3370 out:
3371         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3372         RETURN(rc);
3373 }
3374
/* Issue an asynchronous OST_STATFS request and add it to @rqset.
 *
 * The result is delivered through osc_statfs_interpret(), which fills
 * oinfo->oi_osfs and calls oinfo->oi_cb_up.  Returns 0 once the request
 * is queued, or a negative errno if it could not be built.
 */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served by the OST's create portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3416
/* Synchronous OST_STATFS: fills @osfs from the target's reply.
 *
 * Returns 0 on success, -ENODEV when no import is available (e.g. during
 * disconnect), or a negative errno from request setup / the RPC itself.
 */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The extra import reference was only needed across the alloc. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is served by the OST's create portal */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3479
/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we only use
 * 1 slot here).
 */
/* Copy striping information for @lsm out to the user buffer @lump.
 *
 * OSC exposes exactly one stripe, so only the header plus one object
 * entry are meaningful.  Returns 0 on success, -ENODATA when there is no
 * striping, -EINVAL on an unrecognised magic, -EFAULT on copy errors,
 * -ENOMEM if the temporary kernel buffer cannot be allocated.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* NOTE(review): lmm_stripe_count comes straight from user
                 * space and is not bounded here before sizing the
                 * allocation; confirm lov_mds_md_size() is safe for large
                 * counts (no overflow / unbounded kernel alloc). */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* V1 and V3 lay out lmm_objects at different offsets */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* caller left no room for objects: return the header only */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        /* an OSC object always has exactly one stripe */
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3542
3543
/* ioctl dispatcher for the OSC device.
 *
 * Pins the module for the duration of the call so it cannot be unloaded
 * while an ioctl is in flight.  Returns 0 or a negative errno; -ENOTTY
 * for unrecognised commands.
 */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* copy the full ioctl payload in from user space */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* validate the user-supplied buffer sizes before writing */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* an OSC is a degenerate LOV with a single target */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                /* obd_alloc_memmd returns the md size on success */
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3630
/* obd_get_info handler for the OSC.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE - maps a lock to its stripe; always stripe 0 here.
 *   KEY_LAST_ID        - fetches the last allocated object id from the OST
 *                        via a synchronous OST_GET_INFO RPC.
 *   KEY_FIEMAP         - forwards a fiemap query to the OST; @val is used
 *                        both as the request payload and the reply buffer.
 * Any other key returns -EINVAL.
 */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* the OSC has exactly one stripe, index 0 */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* the fiemap value buffer travels in both directions */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3726
3727 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3728                                           struct ptlrpc_request *req,
3729                                           void *aa, int rc)
3730 {
3731         struct llog_ctxt *ctxt;
3732         struct obd_import *imp = req->rq_import;
3733         ENTRY;
3734
3735         if (rc != 0)
3736                 RETURN(rc);
3737
3738         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3739         if (ctxt) {
3740                 if (rc == 0)
3741                         rc = llog_initiator_connect(ctxt);
3742                 else
3743                         CERROR("cannot establish connection for "
3744                                "ctxt %p: %d\n", ctxt, rc);
3745         }
3746
3747         llog_ctxt_put(ctxt);
3748         spin_lock(&imp->imp_lock);
3749         imp->imp_server_timeout = 1;
3750         imp->imp_pingable = 1;
3751         spin_unlock(&imp->imp_lock);
3752         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3753
3754         RETURN(rc);
3755 }
3756
/**
 * obd_ops o_set_info_async handler for the OSC.
 *
 * Handles a set of locally-interpreted keys (next-id, unlinked, recovery,
 * checksum, sptlrpc) without any RPC, and forwards everything else to the
 * OST as an OST_SET_INFO request queued on \a set (or on the ptlrpcd for
 * KEY_GRANT_SHRINK).
 *
 * \param exp     export to the target OST
 * \param keylen  length of \a key in bytes
 * \param key     key name (compared via KEY_IS)
 * \param vallen  length of \a val in bytes
 * \param val     value buffer; interpretation depends on \a key
 * \param set     request set the RPC is added to; may be NULL only for
 *                KEY_GRANT_SHRINK, which goes through ptlrpcd instead
 *
 * \retval 0 on success (for async paths: the RPC was queued, not completed)
 * \retval negative errno on validation or allocation failure
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the last object id it created; start allocating
         * from the next one. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Orphan cleanup done: clear the no-space flag so precreation
         * may resume. */
        if (KEY_IS(KEY_UNLINKED)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Toggle client-side bulk checksumming; any nonzero value
         * enables it. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* From here on an RPC is built; a request set is mandatory except
         * for grant shrink, which is dispatched via ptlrpcd below. */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        if (KEY_IS(KEY_GRANT_SHRINK))
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
        else
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);

        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Copy key and value into the request buffers. */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                /* val carries the MDS group number for object creation */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                /* the obdo must outlive this call; the interpreter
                 * (osc_shrink_grant_interpret) owns and frees it */
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                /* grant shrink has no caller-owned set; let ptlrpcd drive it */
                ptlrpcd_add_req(req, PSCOPE_OTHER);

        RETURN(0);
}
3886
3887
3888 static struct llog_operations osc_size_repl_logops = {
3889         lop_cancel: llog_obd_repl_cancel
3890 };
3891
/* Populated lazily (from llog_lvfs_ops plus origin handlers) on first
 * osc_llog_init() call; zero-initialized until then. */
static struct llog_operations osc_mds_ost_orig_logops;
/**
 * obd_ops o_llog_init handler: set up the two llog contexts the OSC uses.
 *
 * Initializes LLOG_MDS_OST_ORIG_CTXT (MDS->OST originator log, backed by
 * lvfs ops with origin setup/cleanup/add/connect handlers) and
 * LLOG_SIZE_REPL_CTXT (size replication, cancel-only).
 *
 * \param obd    this OSC device; \a olg must be its obd_olg
 * \param tgt    target obd the contexts log against
 * \param count  number of log ids
 * \param catid  catalog id for the originator context
 * \param uuid   unused here
 *
 * \retval 0 on success, negative errno otherwise (errors are logged)
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time lazy initialization of the shared static ops table;
         * lop_setup doubles as the "already initialized" marker.
         * NOTE(review): guarded by a per-device lock although the table
         * is global — presumably safe because the assignment is
         * idempotent; confirm. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                /* undo the first context before reporting the failure */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}
3937
3938 static int osc_llog_finish(struct obd_device *obd, int count)
3939 {
3940         struct llog_ctxt *ctxt;
3941         int rc = 0, rc2 = 0;
3942         ENTRY;
3943
3944         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3945         if (ctxt)
3946                 rc = llog_cleanup(ctxt);
3947
3948         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3949         if (ctxt)
3950                 rc2 = llog_cleanup(ctxt);
3951         if (!rc)
3952                 rc = rc2;
3953
3954         RETURN(rc);
3955 }
3956
3957 static int osc_reconnect(const struct lu_env *env,
3958                          struct obd_export *exp, struct obd_device *obd,
3959                          struct obd_uuid *cluuid,
3960                          struct obd_connect_data *data,
3961                          void *localdata)
3962 {
3963         struct client_obd *cli = &obd->u.cli;
3964
3965         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3966                 long lost_grant;
3967
3968                 client_obd_list_lock(&cli->cl_loi_list_lock);
3969                 data->ocd_grant = cli->cl_avail_grant ?:
3970                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3971                 lost_grant = cli->cl_lost_grant;
3972                 cli->cl_lost_grant = 0;
3973                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3974
3975                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3976                        "cl_lost_grant: %ld\n", data->ocd_grant,
3977                        cli->cl_avail_grant, lost_grant);
3978                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3979                        " ocd_grant: %d\n", data->ocd_connect_flags,
3980                        data->ocd_version, data->ocd_grant);
3981         }
3982
3983         RETURN(0);
3984 }
3985
3986 static int osc_disconnect(struct obd_export *exp)
3987 {
3988         struct obd_device *obd = class_exp2obd(exp);
3989         struct llog_ctxt  *ctxt;
3990         int rc;
3991
3992         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3993         if (ctxt) {
3994                 if (obd->u.cli.cl_conn_count == 1) {
3995                         /* Flush any remaining cancel messages out to the
3996                          * target */
3997                         llog_sync(ctxt, exp);
3998                 }
3999                 llog_ctxt_put(ctxt);
4000         } else {
4001                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4002                        obd);
4003         }
4004
4005         osc_del_shrink_grant(&obd->u.cli);
4006         rc = client_disconnect_export(exp);
4007         return rc;
4008 }
4009
/**
 * obd_ops o_import_event handler: react to import state transitions.
 *
 * Handles disconnect (mark recovering, drop grants), inactive/active
 * (notify the observer), invalidate (fail queued pages, wipe local
 * locks) and OCD (re-read connect data) events.  Unknown events LBUG.
 *
 * \param obd    the OSC device owning \a imp
 * \param imp    the import that changed state
 * \param event  which transition occurred
 *
 * \retval 0 or an error from the observer notification / env allocation
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* grants are no longer valid across a disconnect */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* drop local locks without talking to the server */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4093
4094 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4095 {
4096         int rc;
4097         ENTRY;
4098
4099         ENTRY;
4100         rc = ptlrpcd_addref();
4101         if (rc)
4102                 RETURN(rc);
4103
4104         rc = client_obd_setup(obd, lcfg);
4105         if (rc) {
4106                 ptlrpcd_decref();
4107         } else {
4108                 struct lprocfs_static_vars lvars = { 0 };
4109                 struct client_obd *cli = &obd->u.cli;
4110
4111                 lprocfs_osc_init_vars(&lvars);
4112                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4113                         lproc_osc_attach_seqstat(obd);
4114                         sptlrpc_lprocfs_cliobd_attach(obd);
4115                         ptlrpc_lprocfs_register_obd(obd);
4116                 }
4117
4118                 oscc_init(obd);
4119                 /* We need to allocate a few requests more, because
4120                    brw_interpret tries to create new requests before freeing
4121                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4122                    reserved, but I afraid that might be too much wasted RAM
4123                    in fact, so 2 is just my guess and still should work. */
4124                 cli->cl_import->imp_rq_pool =
4125                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4126                                             OST_MAXREQSIZE,
4127                                             ptlrpc_add_rqs_to_pool);
4128                 
4129                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4130                 sema_init(&cli->cl_grant_sem, 1);
4131         }
4132
4133         RETURN(rc);
4134 }
4135
/**
 * obd_ops o_precleanup handler: staged teardown before osc_cleanup().
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging;
 * OBD_CLEANUP_EXPORTS invalidates and destroys the client import (with
 * its request pool) and finishes the llog contexts.
 *
 * \retval 0, or the obd_llog_finish() error (which is only logged here)
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message is emitted whenever the
                         * import still exists, connected or not — confirm
                         * whether it should be conditional. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
4179
/**
 * obd_ops o_cleanup handler: final OSC device teardown.
 *
 * Unregisters procfs entries, flags the object creator as exiting,
 * releases the quota cache, runs generic client cleanup and drops the
 * ptlrpcd reference taken in osc_setup().
 *
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* stop any further precreation attempts */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        ptlrpcd_decref();
        RETURN(rc);
}
4202
4203 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4204 {
4205         struct lprocfs_static_vars lvars = { 0 };
4206         int rc = 0;
4207
4208         lprocfs_osc_init_vars(&lvars);
4209
4210         switch (lcfg->lcfg_command) {
4211         default:
4212                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4213                                               lcfg, obd);
4214                 if (rc > 0)
4215                         rc = 0;
4216                 break;
4217         }
4218
4219         return(rc);
4220 }
4221
/* obd_ops o_process_config adapter: the generic interface passes the
 * config record as an opaque (len, buf) pair; forward it to the typed
 * helper. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
4226
/* Method table registered for the OSC obd type: mixes OSC-specific
 * handlers with generic client_* implementations for connection
 * management. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client handlers + OSC hooks) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs / striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* I/O */
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* control / info / events */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4263
4264 extern struct lu_kmem_descr  osc_caches[];
4265 extern spinlock_t            osc_ast_guard;
4266 extern struct lock_class_key osc_ast_guard_class;
4267
4268 int __init osc_init(void)
4269 {
4270         struct lprocfs_static_vars lvars = { 0 };
4271         int rc;
4272         ENTRY;
4273
4274         /* print an address of _any_ initialized kernel symbol from this
4275          * module, to allow debugging with gdb that doesn't support data
4276          * symbols from modules.*/
4277         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4278
4279         rc = lu_kmem_init(osc_caches);
4280
4281         lprocfs_osc_init_vars(&lvars);
4282
4283         request_module("lquota");
4284         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4285         lquota_init(quota_interface);
4286         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4287
4288         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4289                                  LUSTRE_OSC_NAME, &osc_device_type);
4290         if (rc) {
4291                 if (quota_interface)
4292                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4293                 lu_kmem_fini(osc_caches);
4294                 RETURN(rc);
4295         }
4296
4297         spin_lock_init(&osc_ast_guard);
4298         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4299
4300         RETURN(rc);
4301 }
4302
4303 #ifdef __KERNEL__
/* Module exit: undo osc_init() in reverse order — device type, quota
 * hooks, obd type registration, then the kmem caches. */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
4315
4316 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4317 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4318 MODULE_LICENSE("GPL");
4319
4320 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4321 #endif