Whamcloud - gitweb
9e34a2c4a6ad49c2de29317abc1efd38ee0a9d68
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd() and uses the same multiplexed convention:
 *   - lsmp == NULL:              size query, return required lsm size;
 *   - *lsmp != NULL, lmm NULL:   free the lsm and its single oinfo;
 *   - otherwise:                 (re)allocate *lsmp if needed and fill
 *                                it from the wire/disk @lmm.
 * Returns lsm_size on success, 0 on free, negative errno on error. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* reject short buffers before touching any field */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* an OSC stripe md always describes exactly one stripe */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* free path: release the oinfo first, then the lsm */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* don't leak the lsm if the oinfo alloc failed */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
/* Copy the caller's obdo (and optional capability) into the OST_BODY
 * field of an already-packed request. */
static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        /* struct copy: later changes to oinfo->oi_oa do NOT reach the
         * wire message */
        body->oa = *oinfo->oi_oa;
        osc_pack_capa(req, body, oinfo->oi_capa);
}
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
/* Completion callback for async OST_GETATTR (see osc_getattr_async).
 * Copies the returned attributes into the caller's obd_info and always
 * invokes the caller's oi_cb_up upcall with the final status. */
static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        /* NOTE(review): this path uses the older lustre_swab_repbuf()
         * API while the sync path (osc_getattr) uses
         * req_capsule_server_get(); presumably equivalent here --
         * confirm before changing. */
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                /* invalidate the obdo so the caller won't trust stale data */
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
231
/* Issue an OST_GETATTR asynchronously: the request is added to @set and
 * osc_getattr_interpret() delivers the result via oinfo->oi_cb_up. */
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* shrink the capa field to zero if no capability will be sent */
        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        /* stash the caller's obd_info in the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
263
/* Synchronous OST_GETATTR: fetch object attributes into oinfo->oi_oa.
 * Returns 0 on success or a negative errno. */
static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        /* struct copy of the attributes the OST returned */
        *oinfo->oi_oa = body->oa;

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
306
/* Synchronous OST_SETATTR: push the attributes in oinfo->oi_oa to the
 * OST and copy the (possibly updated) attributes back on success. */
static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        /* if a group is supplied it must be the MDS group */
        LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
                 CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
                 "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
                 oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* struct copy of the attributes the OST returned */
        *oinfo->oi_oa = body->oa;

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
/* Create an object on the OST (synchronously).  If *ea is NULL a
 * temporary lsm is allocated here and, on success, returned via *ea;
 * on failure a locally-allocated lsm is freed again.  @oti, when
 * non-NULL, receives the transno and (if OBD_MD_FLCOOKIE is set in the
 * reply) the llog cookie. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* caller gave no lsm: allocate one to hold the new id */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* NOTE(review): o_flags is compared with '==', not '&', so
         * DELORPHAN must be the only flag set here -- confirm this is
         * the intended contract with callers. */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        /* struct copy: the OST filled in the new object id/group */
        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* only free the lsm if we allocated it above (*ea still NULL) */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
/* Send an OST_PUNCH (truncate) RPC.  The extent to punch is already
 * encoded in oa->o_size / o_blocks by the caller (see osc_punch).
 * @upcall is invoked with @cookie when the RPC completes; the request
 * goes to the ptlrpcd daemon when rqset == PTLRPCD_SET, otherwise it
 * is added to @rqset. */
int osc_punch_base(struct obd_export *exp, struct obdo *oa,
                   struct obd_capa *capa,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_punch_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        /* struct copy of the obdo (with the extent already encoded) */
        body->oa = *oa;
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);


        /* stash completion context in the request's async args */
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->pa_oa     = oa;
        aa->pa_upcall = upcall;
        aa->pa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
/* Synchronous OST_SYNC: ask the OST to flush [start, end] of the
 * object to stable storage.  The extent is carried in the oa's
 * size/blocks fields (matching the server's decoding). */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        /* struct copy of the attributes the OST returned */
        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
631
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels, ldlm_mode_t mode,
                                   int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        /* the resource name is derived from the object id + group */
        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        /* hold a debug ref across the cancel scan, then drop both refs */
        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
/* Try to reserve a slot for one more destroy RPC.  Returns 1 if the
 * caller may send (slot taken), 0 if the in-flight limit is reached.
 *
 * The inc/dec pair is not atomic as a whole: if another thread changed
 * the counter between the two operations, a waiter may have missed its
 * wakeup, so the waitq is re-signalled to be safe. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
686
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* cancel our local PW locks on the object and discard the cached
         * data; the cancels ride along inside the destroy request (ELC) */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* give back the lock refs taken by the unused-lock scan */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* copy the llog cookie in before the body is packed below */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
761
/* Fill the dirty/grant accounting fields of @oa so the OST learns how
 * much cache this client holds.  Must not be called with those bits
 * already set (asserted).  Takes cl_loi_list_lock internally. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                /* per-OSC dirty limit exceeded -- should not happen */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                /* system-wide dirty limit exceeded -- should not happen */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* guard against a nonsensical (overflowed) headroom value */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* lost grant is reported to the OST once, then reset */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
800
801 static void osc_update_next_shrink(struct client_obd *cli)
802 {
803         int time = GRANT_SHRINK_INTERVAL;
804         cli->cl_next_shrink_grant = cfs_time_shift(time);
805         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806                cli->cl_next_shrink_grant);
807 }
808
/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        /* a page must not consume grant twice */
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* mark the page so osc_release_write_grant() can undo this */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}
823
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* fall back to 4k when the OST did not report its block size */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                /* this page never consumed grant -- nothing to release */
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                /* transit pages are accounted separately; undo that too */
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* an unsent page forfeits its entire page of grant */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
867
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
869 {
870         return cli->cl_r_in_flight + cli->cl_w_in_flight;
871 }
872
/* Wake threads queued on cli->cl_cache_waiters once dirty-page room and/or
 * grant becomes available.  Each waiter is either handed a page of grant
 * (osc_consume_write_grant) or told to fall back to sync IO via
 * ocw_rc = -EDQUOT.  caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
914
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919         if (body->oa.o_valid & OBD_MD_FLGRANT)
920                 cli->cl_avail_grant += body->oa.o_grant;
921         /* waiters are woken in brw_interpret */
922         client_obd_list_unlock(&cli->cl_loi_list_lock);
923 }
924
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926                               void *key, obd_count vallen, void *val,
927                               struct ptlrpc_request_set *set);
928
/* Completion callback for the grant-shrink set_info RPC.  On failure the
 * grant we pre-deducted locally (stored in oa->o_grant) is returned to
 * cl_avail_grant; on success any grant in the server's reply is accounted
 * via osc_update_grant().  The obdo allocated by the caller is freed here
 * on both paths. */
static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                /* RPC failed: reclaim the grant we tried to hand back */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant += oa->o_grant;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_FREE_PTR(oa);
        return rc;
}
951
/* Carve off a quarter of the locally available grant into @oa so it can
 * be returned to the server (flagged with OBD_FL_SHRINK_GRANT).  The
 * amount is deducted from cl_avail_grant up front; the caller must
 * re-add it if the shrink RPC fails. */
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        /* push the next shrink time forward so we don't shrink again soon */
        osc_update_next_shrink(cli);
}
961
962 static int osc_shrink_grant(struct client_obd *cli)
963 {
964         int    rc = 0;
965         struct ost_body     *body;
966         ENTRY;
967
968         OBD_ALLOC_PTR(body);
969         if (!body)
970                 RETURN(-ENOMEM);
971
972         osc_announce_cached(cli, &body->oa, 0);
973         osc_shrink_grant_local(cli, &body->oa);
974         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976                                 sizeof(*body), body, NULL);
977         if (rc) {
978                 client_obd_list_lock(&cli->cl_loi_list_lock);
979                 cli->cl_avail_grant += body->oa.o_grant;
980                 client_obd_list_unlock(&cli->cl_loi_list_lock);
981         }
982         if (body)
983                OBD_FREE_PTR(body);
984         RETURN(rc);
985 }
986
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
988 static int osc_should_shrink_grant(struct client_obd *client)
989 {
990         cfs_time_t time = cfs_time_current();
991         cfs_time_t next_shrink = client->cl_next_shrink_grant;
992         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
995                         return 1;
996                 else
997                         osc_update_next_shrink(client);
998         }
999         return 0;
1000 }
1001
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1003 {
1004         struct client_obd *client;
1005
1006         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007                 if (osc_should_shrink_grant(client))
1008                         osc_shrink_grant(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_add_shrink_grant(struct client_obd *client)
1014 {
1015         int rc;
1016
1017         rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
1018                                          TIMEOUT_GRANT,
1019                                          osc_grant_shrink_grant_cb, NULL,
1020                                          &client->cl_grant_shrink_list);
1021         if (rc) {
1022                 CERROR("add grant client %s error %d\n", 
1023                         client->cl_import->imp_obd->obd_name, rc);
1024                 return rc;
1025         }
1026         CDEBUG(D_CACHE, "add grant client %s \n", 
1027                client->cl_import->imp_obd->obd_name);
1028         osc_update_next_shrink(client);
1029         return 0; 
1030 }
1031
1032 static int osc_del_shrink_grant(struct client_obd *client)
1033 {
1034         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
1035 }
1036
/* Initialize the client's grant accounting from the server's connect
 * reply @ocd, and enable periodic grant shrinking when the server
 * advertised OBD_CONNECT_GRANT_SHRINK support. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        /* list_empty() keeps reconnects from registering us twice */
        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        LASSERT(cli->cl_avail_grant >= 0);
}
1051
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 * Zero-fill the portions of @pga the OST returned no data for. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK; if EOF fell inside a page, zero its tail */
        while (nob_read > 0) {
                LASSERT (page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages (they received no data at all) */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
1090
/* Validate the per-niobuf return-code vector in a BRW_WRITE reply.  A
 * negative per-niobuf rc fails the whole request with that rc; any other
 * non-zero value is a protocol error.  Also verifies that the bulk
 * transfer moved exactly @requested_nob bytes.  Returns 0 on success or
 * a negative errno. */
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        /* byte-swap the rc vector if the reply came from an
         * opposite-endian peer */
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                /* positive per-niobuf rcs are not part of the protocol */
                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}
1127
1128 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1129 {
1130         if (p1->flag != p2->flag) {
1131                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1132                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1133
1134                 /* warn if we try to combine flags that we don't know to be
1135                  * safe to combine */
1136                 if ((p1->flag & mask) != (p2->flag & mask))
1137                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1138                                "same brw?\n", p1->flag, p2->flag);
1139                 return 0;
1140         }
1141
1142         return (p1->off + p1->count == p2->off);
1143 }
1144
/* Compute the bulk-data checksum over the first @nob bytes spread across
 * @pg_count pages of @pga, using algorithm @cksum_type.  Under OBD_FAIL
 * fault injection this deliberately corrupts the data (reads) or bumps
 * the checksum (writes) to exercise the checksum-retry paths. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* don't checksum past @nob on the final (partial) page */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1180
/* Build (but do not send) an OST bulk read/write request covering
 * @page_count pages in @pga for the object described by @oa/@lsm.
 *
 * @cmd:     OBD_BRW_WRITE for a write, otherwise a read
 * @reqp:    on success, *reqp holds the prepared request (caller sends it)
 * @ocapa:   optional capability packed into the request
 * @reserve: if non-zero (and @ocapa is set), take an extra capa reference
 *           and stash it in the async args
 *
 * Adjacent pages with compatible flags are merged into single remote
 * niobufs, and a client-side bulk checksum is computed for writes when
 * cl_checksum is enabled and bulk privacy isn't already protecting the
 * data.  Returns 0 or a negative errno. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                /* writes allocate from the preallocated request pool so
                 * dirty pages can be flushed even under memory pressure */
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count the remote niobufs needed after mergeable pages collapse */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                /* a brw page must not straddle a CFS page boundary */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive in strictly ascending file order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either every page is under a server lock or none is */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        /* extend the previous niobuf rather than start
                         * a new one */
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity check: we must have filled exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        /* piggy-back a grant shrink on this request if one is due */
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1367
/* Investigate a client/server checksum disagreement on a write reply by
 * re-checksumming the pages we still hold with the server's checksum
 * type.  Distinguishes "server used a different type", "page changed
 * after we checksummed it" (mmap IO false positive) and "corrupted in
 * transit", and logs a console error naming the object and extent.
 * Returns 0 when the checksums actually agree, 1 on a real mismatch
 * (the caller resends the write). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* use the checksum type from the reply; fall back to CRC32 when
         * the reply carries no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1418
/* Note rc enters this function as number of bytes transferred.
 *
 * Finish processing a completed BRW request: unpack the reply body,
 * update per-uid/gid quota flags and the grant, then verify the
 * transfer -- per-niobuf rcs and (optionally) the write checksum for
 * OST_WRITE; length checks, short-read zero-filling and (optionally)
 * the read checksum for OST_READ.  Returns 0 on success, -EAGAIN to
 * request a resend, or another negative errno. */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        const lnet_process_id_t *peer =
                        &req->rq_import->imp_connection->c_peer;
        struct client_obd *cli = aa->aa_cli;
        struct ost_body *body;
        __u32 client_cksum = 0;
        ENTRY;

        /* -EDQUOT replies still carry a body we must process below */
        if (rc < 0 && rc != -EDQUOT)
                RETURN(rc);

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                  lustre_swab_ost_body);
        if (body == NULL) {
                CDEBUG(D_INFO, "Can't unpack body\n");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
                lquota_setdq(quota_interface, cli, body->oa.o_uid,
                             body->oa.o_gid, body->oa.o_valid,
                             body->oa.o_flags);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        osc_update_grant(cli, body);

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                /* a positive byte count makes no sense for a write reply */
                if (rc > 0) {
                        CERROR("Unexpected +ve rc %d\n", rc);
                        RETURN(-EPROTO);
                }
                LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                /* a confirmed checksum mismatch asks the caller to resend */
                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa->aa_requested_nob,
                                         aa->aa_page_count, aa->aa_ppga,
                                         cksum_type_unpack(aa->aa_oa->o_flags)))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                     aa->aa_page_count, aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        if (rc < 0)
                GOTO(out, rc);

        if (rc > aa->aa_requested_nob) {
                CERROR("Unexpected rc %d (%d requested)\n", rc,
                       aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (rc != req->rq_bulk->bd_nob_transferred) {
                CERROR ("Unexpected rc %d (%d transferred)\n",
                        rc, req->rq_bulk->bd_nob_transferred);
                return (-EPROTO);
        }

        /* short read: zero-fill the unreturned tail (sparse stripe) */
        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                __u32      server_cksum = body->oa.o_cksum;
                char      *via;
                char      *router;
                cksum_type_t cksum_type;

                /* no flags in reply: fall back to CRC32 */
                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
                else
                        cksum_type = OBD_CKSUM_CRC32;
                client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
                                                 aa->aa_ppga, OST_READ,
                                                 cksum_type);

                /* note whether the bulk arrived through an LNET router */
                if (peer->nid == req->rq_bulk->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum == ~0 && rc > 0) {
                        CERROR("Protocol error: server %s set the 'checksum' "
                               "bit, but didn't send a checksum.  Not fatal, "
                               "but please notify on http://bugzilla.lustre.org/\n",
                               libcfs_nid2str(peer->nid));
                } else if (server_cksum != client_cksum) {
                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inum "LPU64"/"LPU64" object "
                                           LPU64"/"LPU64" extent "
                                           "["LPU64"-"LPU64"]\n",
                                           req->rq_import->imp_obd->obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_fid : (__u64)0,
                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                body->oa.o_generation :(__u64)0,
                                           body->oa.o_id,
                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                body->oa.o_gr : (__u64)0,
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[aa->aa_page_count-1]->off +
                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                        1);
                        CERROR("client %x, server %x, cksum_type %x\n",
                               client_cksum, server_cksum, cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                /* rate-limit: only log when cksum_missed is a power of two */
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("Checksum %u requested from %s but not sent\n",
                               cksum_missed, libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }
out:
        /* propagate the server's attributes back to the caller's obdo */
        if (rc >= 0)
                *aa->aa_oa = body->oa;

        RETURN(rc);
}
1571
/*
 * Issue one synchronous bulk read/write RPC and wait for its completion.
 *
 * \param cmd         OBD_BRW_READ or OBD_BRW_WRITE (plus modifier flags)
 * \param exp         export to the target OST
 * \param oa          obdo describing the object being read/written
 * \param lsm         stripe metadata, passed through to request prep
 * \param page_count  number of entries in \a pga
 * \param pga         array of per-page descriptors
 * \param ocapa       capability for this I/O, may be NULL
 *
 * Transparently restarts the RPC when the server asks for a bulk resend,
 * and retries recoverable errors with a linear backoff (sleep \c resends
 * seconds) until osc_should_resend() says the budget is exhausted.
 *
 * \retval 0 on success; negative errno on failure (-EIO once the resend
 *         budget is used up).
 */
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                            struct lov_stripe_md *lsm,
                            obd_count page_count, struct brw_page **pga,
                            struct obd_capa *ocapa)
{
        struct ptlrpc_request *req;
        int                    rc;
        cfs_waitq_t            waitq;
        int                    resends = 0;
        struct l_wait_info     lwi;

        ENTRY;

        cfs_waitq_init(&waitq);

restart_bulk:
        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                  page_count, pga, &req, ocapa, 0);
        if (rc != 0)
                return (rc);

        rc = ptlrpc_queue_wait(req);

        /* server timed out the bulk transfer and requested a resend */
        if (rc == -ETIMEDOUT && req->rq_resend) {
                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
                ptlrpc_req_finished(req);
                goto restart_bulk;
        }

        rc = osc_brw_fini_request(req, rc);

        ptlrpc_req_finished(req);
        if (osc_recoverable_error(rc)) {
                resends++;
                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
                        CERROR("too many resend retries, returning error\n");
                        RETURN(-EIO);
                }

                /* back off before retrying: sleep 'resends' seconds */
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
                l_wait_event(waitq, 0, &lwi);

                goto restart_bulk;
        }

        RETURN (rc);
}
1619
/*
 * Rebuild a bulk RPC that failed with a recoverable error and queue the
 * clone on the same request set.
 *
 * The new request takes ownership of the page array, the oap list and the
 * capability held in \a aa; per-oap request references are moved from the
 * old request to the new one.  Called from brw_interpret() under no lock;
 * takes cl_loi_list_lock itself to cover the oap handoff.
 *
 * \retval 0 on success (clone queued), -EIO when the resend budget is
 *         exhausted, -EINTR if a sync waiter was interrupted, or the
 *         error from osc_brw_prep_request().
 */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the redo if any sync waiter on these pages was interrupted */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* swap each oap's request reference from the old RPC to the clone */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* Using ptlrpc_set_add_req() is safe because interpret functions run
         * in check_set context.  The only path by which another thread can
         * reach this request and return -EINTR is protected by
         * cl_loi_list_lock. */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1694
/*
 * ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
1702 static void sort_brw_pages(struct brw_page **array, int num)
1703 {
1704         int stride, i, j;
1705         struct brw_page *tmp;
1706
1707         if (num == 1)
1708                 return;
1709         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1710                 ;
1711
1712         do {
1713                 stride /= 3;
1714                 for (i = stride ; i < num ; i++) {
1715                         tmp = array[i];
1716                         j = i;
1717                         while (j >= stride && array[j - stride]->off > tmp->off) {
1718                                 array[j] = array[j - stride];
1719                                 j -= stride;
1720                         }
1721                         array[j] = tmp;
1722                 }
1723         } while (stride > 1);
1724 }
1725
1726 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1727 {
1728         int count = 1;
1729         int offset;
1730         int i = 0;
1731
1732         LASSERT (pages > 0);
1733         offset = pg[i]->off & ~CFS_PAGE_MASK;
1734
1735         for (;;) {
1736                 pages--;
1737                 if (pages == 0)         /* that's all */
1738                         return count;
1739
1740                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1741                         return count;   /* doesn't end on page boundary */
1742
1743                 i++;
1744                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1745                 if (offset != 0)        /* doesn't start on page boundary */
1746                         return count;
1747
1748                 count++;
1749         }
1750 }
1751
1752 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1753 {
1754         struct brw_page **ppga;
1755         int i;
1756
1757         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1758         if (ppga == NULL)
1759                 return NULL;
1760
1761         for (i = 0; i < count; i++)
1762                 ppga[i] = pga + i;
1763         return ppga;
1764 }
1765
/* Free a pointer array built by osc_build_ppga().  Only the index array is
 * released; the brw_page structs it points at belong to the caller. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1771
/*
 * Synchronous bulk I/O entry point.  Sorts the pages by offset, then
 * splits them into chunks no larger than cl_max_pages_per_rpc (and no
 * larger than the next unfragmented run) and issues one blocking RPC
 * per chunk via osc_brw_internal().
 *
 * \param cmd         OBD_BRW_READ/OBD_BRW_WRITE; OBD_BRW_CHECK only asks
 *                    whether I/O could succeed at all
 * \param exp         export to the target OST
 * \param oinfo       obdo + stripe md + capability for the I/O
 * \param page_count  number of entries in \a pga
 * \param pga         contiguous array of page descriptors
 * \param oti         transaction info (unused here)
 *
 * \retval 0 on success, negative errno on the first failed chunk.
 */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        /* remember the full count: 'out' frees the whole array even though
         * page_count is consumed by the loop below */
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink the chunk further so it has no page-boundary gaps */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1845
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 * @sent is passed through to the grant-release accounting. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1854
1855
1856 /* This maintains the lists of pending pages to read/write for a given object
1857  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1858  * to quickly find objects that are ready to send an RPC. */
1859 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1860                          int cmd)
1861 {
1862         int optimal;
1863         ENTRY;
1864
1865         if (lop->lop_num_pending == 0)
1866                 RETURN(0);
1867
1868         /* if we have an invalid import we want to drain the queued pages
1869          * by forcing them through rpcs that immediately fail and complete
1870          * the pages.  recovery relies on this to empty the queued pages
1871          * before canceling the locks and evicting down the llite pages */
1872         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1873                 RETURN(1);
1874
1875         /* stream rpcs in queue order as long as as there is an urgent page
1876          * queued.  this is our cheap solution for good batching in the case
1877          * where writepage marks some random page in the middle of the file
1878          * as urgent because of, say, memory pressure */
1879         if (!list_empty(&lop->lop_urgent)) {
1880                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1881                 RETURN(1);
1882         }
1883         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1884         optimal = cli->cl_max_pages_per_rpc;
1885         if (cmd & OBD_BRW_WRITE) {
1886                 /* trigger a write rpc stream as long as there are dirtiers
1887                  * waiting for space.  as they're waiting, they're not going to
1888                  * create more pages to coallesce with what's waiting.. */
1889                 if (!list_empty(&cli->cl_cache_waiters)) {
1890                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1891                         RETURN(1);
1892                 }
1893                 /* +16 to avoid triggering rpcs that would want to include pages
1894                  * that are being queued but which can't be made ready until
1895                  * the queuer finishes with the page. this is a wart for
1896                  * llite::commit_write() */
1897                 optimal += 16;
1898         }
1899         if (lop->lop_num_pending >= optimal)
1900                 RETURN(1);
1901
1902         RETURN(0);
1903 }
1904
1905 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1906 {
1907         struct osc_async_page *oap;
1908         ENTRY;
1909
1910         if (list_empty(&lop->lop_urgent))
1911                 RETURN(0);
1912
1913         oap = list_entry(lop->lop_urgent.next,
1914                          struct osc_async_page, oap_urgent_item);
1915
1916         if (oap->oap_async_flags & ASYNC_HP) {
1917                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1918                 RETURN(1);
1919         }
1920
1921         RETURN(0);
1922 }
1923
/* Make @item's membership of @list agree with @should_be_on:
 * link it at the tail when it should be on but isn't, unlink it
 * when it shouldn't be but is.  No-op when already consistent. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1932
1933 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1934  * can find pages to build into rpcs quickly */
1935 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1936 {
1937         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1938             lop_makes_hprpc(&loi->loi_read_lop)) {
1939                 /* HP rpc */
1940                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1941                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1942         } else {
1943                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1944                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1945                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1946                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1947         }
1948
1949         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1950                 loi->loi_write_lop.lop_num_pending);
1951
1952         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1953                 loi->loi_read_lop.lop_num_pending);
1954 }
1955
1956 static void lop_update_pending(struct client_obd *cli,
1957                                struct loi_oap_pages *lop, int cmd, int delta)
1958 {
1959         lop->lop_num_pending += delta;
1960         if (cmd & OBD_BRW_WRITE)
1961                 cli->cl_pending_w_pages += delta;
1962         else
1963                 cli->cl_pending_r_pages += delta;
1964 }
1965
/**
 * This is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out.
 *
 * \retval 0 if the page was dequeued and completed with -EINTR,
 *         -EBUSY if it was already in an RPC and could only be marked.
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                /* keep the pending accounting and list membership in sync */
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                /* complete the page back to the caller with -EINTR */
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
2011
2012 /* this is trying to propogate async writeback errors back up to the
2013  * application.  As an async write fails we record the error code for later if
2014  * the app does an fsync.  As long as errors persist we force future rpcs to be
2015  * sync so that the app can get a sync error and break the cycle of queueing
2016  * pages for which writeback will fail. */
2017 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2018                            int rc)
2019 {
2020         if (rc) {
2021                 if (!ar->ar_rc)
2022                         ar->ar_rc = rc;
2023
2024                 ar->ar_force_sync = 1;
2025                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2026                 return;
2027
2028         }
2029
2030         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2031                 ar->ar_force_sync = 0;
2032 }
2033
2034 void osc_oap_to_pending(struct osc_async_page *oap)
2035 {
2036         struct loi_oap_pages *lop;
2037
2038         if (oap->oap_cmd & OBD_BRW_WRITE)
2039                 lop = &oap->oap_loi->loi_write_lop;
2040         else
2041                 lop = &oap->oap_loi->loi_read_lop;
2042
2043         if (oap->oap_async_flags & ASYNC_HP)
2044                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2045         else if (oap->oap_async_flags & ASYNC_URGENT)
2046                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2047         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2048         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2049 }
2050
/* This must be called holding the loi list lock to give coverage to
 * exit_cache, async_flag maintenance, and oap_request.
 *
 * Completes @oap back to its caller with result @rc: drops the oap's
 * request reference, updates write error/force-sync state, copies fresh
 * size/time attributes from @oa into the loi's lvb on success, and either
 * re-queues the page (when the upper layer rejects the completion) or
 * releases its cache grant. */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write result for both client- and object-level
                 * async error tracking (fsync error propagation) */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2098
/* Interpret callback for an async bulk RPC.  Finalizes the reply, redoes
 * the request on recoverable errors, updates the in-flight accounting,
 * completes each oap (or releases write grants for the async_internal()
 * path), kicks the RPC engine, and frees the page index array. */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        /* redo queued; this request's resources moved over */
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        /* an empty oap list marks a request built by async_internal() */
        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2155
/* Build a bulk RPC from the oaps on @rpc_list: allocate the page index
 * array and obdo, attach each page to a new cl_req, fill the obdo
 * attributes via cl_req_attr_set(), sort the pages and prep the ptlrpc
 * request, then move the oaps into the request's async args.
 *
 * On success the oaps are spliced off @rpc_list into the request.  On
 * failure every queued oap is completed with the error and an ERR_PTR
 * is returned. */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                /* the first oap provides the caller ops, the cl_req and
                 * the ldlm lock shared by the whole RPC */
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
        }
        RETURN(req);
}
2277
/**
 * Prepare pages for ASYNC I/O and put them in the send queue.
 *
 * \param cli - client obd
 * \param loi - object info
 * \param cmd - OBD_BRW_* macros
 * \param lop - pending pages
 *
 * \return zero if pages were successfully added to the send queue.
 * \return nonzero if an error occurred.
 */
/* NOTE(review): caller must hold cli->cl_loi_list_lock - it is dropped
 * below while the request is built and re-taken before returning. */
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 struct lov_oinfo *loi,
                 int cmd, struct loi_oap_pages *lop)
{
        struct ptlrpc_request *req;
        obd_count page_count = 0;
        struct osc_async_page *oap = NULL, *tmp;
        struct osc_brw_async_args *aa;
        const struct obd_async_page_ops *ops;
        CFS_LIST_HEAD(rpc_list);
        unsigned int ending_offset;
        unsigned  starting_offset = 0;
        int srvlock = 0;
        struct cl_object *clob = NULL;
        ENTRY;

        /* If there are HP OAPs we need to handle at least 1 of them,
         * move it the beginning of the pending list for that. */
        if (!list_empty(&lop->lop_urgent)) {
                oap = list_entry(lop->lop_urgent.next,
                                 struct osc_async_page, oap_urgent_item);
                if (oap->oap_async_flags & ASYNC_HP)
                        list_move(&oap->oap_pending_item, &lop->lop_pending);
        }

        /* first we find the pages we're allowed to work with */
        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
                                 oap_pending_item) {
                ops = oap->oap_caller_ops;

                LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
                         "magic 0x%x\n", oap, oap->oap_magic);

                if (clob == NULL) {
                        /* pin object in memory, so that completion call-backs
                         * can be safely called under client_obd_list lock. */
                        clob = osc_oap2cl_page(oap)->cp_obj;
                        cl_object_get(clob);
                }

                /* SRVLOCK and non-SRVLOCK pages can't be mixed in one RPC:
                 * stop collecting at the first mismatch. */
                if (page_count != 0 &&
                    srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
                        CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
                               " oap %p, page %p, srvlock %u\n",
                               oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
                        break;
                }
                /* in llite being 'ready' equates to the page being locked
                 * until completion unlocks it.  commit_write submits a page
                 * as not ready because its unlock will happen unconditionally
                 * as the call returns.  if we race with commit_write giving
                 * us that page we dont' want to create a hole in the page
                 * stream, so we stop and leave the rpc to be fired by
                 * another dirtier or kupdated interval (the not ready page
                 * will still be on the dirty list).  we could call in
                 * at the end of ll_file_write to process the queue again. */
                if (!(oap->oap_async_flags & ASYNC_READY)) {
                        int rc = ops->ap_make_ready(env, oap->oap_caller_data,
                                                    cmd);
                        if (rc < 0)
                                CDEBUG(D_INODE, "oap %p page %p returned %d "
                                                "instead of ready\n", oap,
                                                oap->oap_page, rc);
                        switch (rc) {
                        case -EAGAIN:
                                /* llite is telling us that the page is still
                                 * in commit_write and that we should try
                                 * and put it in an rpc again later.  we
                                 * break out of the loop so we don't create
                                 * a hole in the sequence of pages in the rpc
                                 * stream.*/
                                oap = NULL;
                                break;
                        case -EINTR:
                                /* the io isn't needed.. tell the checks
                                 * below to complete the rpc with EINTR */
                                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                                oap->oap_count = -EINTR;
                                break;
                        case 0:
                                oap->oap_async_flags |= ASYNC_READY;
                                break;
                        default:
                                LASSERTF(0, "oap %p page %p returned %d "
                                            "from make_ready\n", oap,
                                            oap->oap_page, rc);
                                break;
                        }
                }
                if (oap == NULL)
                        break;
                /*
                 * Page submitted for IO has to be locked. Either by
                 * ->ap_make_ready() or by higher layers.
                 */
#if defined(__KERNEL__) && defined(__linux__)
                {
                        struct cl_page *page;

                        page = osc_oap2cl_page(oap);

                        if (page->cp_type == CPT_CACHEABLE &&
                            !(PageLocked(oap->oap_page) &&
                              (CheckWriteback(oap->oap_page, cmd)))) {
                                CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
                                       oap->oap_page,
                                       (long)oap->oap_page->flags,
                                       oap->oap_async_flags);
                                LBUG();
                        }
                }
#endif
                /* If there is a gap at the start of this page, it can't merge
                 * with any previous page, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (page_count != 0 && oap->oap_page_off != 0)
                        break;

                /* take the page out of our book-keeping */
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, cmd, -1);
                list_del_init(&oap->oap_urgent_item);

                /* offset of the first byte within its PTLRPC_MAX_BRW_SIZE
                 * window; used for the offset histograms below */
                if (page_count == 0)
                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
                                          (PTLRPC_MAX_BRW_SIZE - 1);

                /* ask the caller for the size of the io as the rpc leaves. */
                if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                        oap->oap_count =
                                ops->ap_refresh_count(env, oap->oap_caller_data,
                                                      cmd);
                        LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
                }
                /* zero/negative count (e.g. -EINTR set above): complete the
                 * page immediately instead of putting it in the RPC */
                if (oap->oap_count <= 0) {
                        CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
                               oap->oap_count);
                        osc_ap_completion(env, cli, NULL,
                                          oap, 0, oap->oap_count);
                        continue;
                }

                /* now put the page back in our accounting */
                list_add_tail(&oap->oap_rpc_item, &rpc_list);
                if (page_count == 0)
                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                if (++page_count >= cli->cl_max_pages_per_rpc)
                        break;

                /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                 * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                 * have the same alignment as the initial writes that allocated
                 * extents on the server. */
                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
                if (ending_offset == 0)
                        break;

                /* If there is a gap at the end of this page, it can't merge
                 * with any subsequent pages, so we'll hand the network a
                 * "fragmented" page array that it can't transfer in 1 RDMA */
                if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
                        break;
        }

        osc_wake_cache_waiters(cli);

        loi_list_maint(cli, loi);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        if (clob != NULL)
                cl_object_put(env, clob);

        /* nothing collected: re-take the lock for the caller and report
         * "no RPC sent" */
        if (page_count == 0) {
                client_obd_list_lock(&cli->cl_loi_list_lock);
                RETURN(0);
        }

        req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
        if (IS_ERR(req)) {
                LASSERT(list_empty(&rpc_list));
                loi_list_maint(cli, loi);
                RETURN(PTR_ERR(req));
        }

        /* async args - presumably filled in by osc_build_req(); aa_oaps and
         * aa_requested_nob are read below (verify against osc_build_req) */
        aa = ptlrpc_req_async_args(req);

        if (cmd == OBD_BRW_READ) {
                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        } else {
                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                 cli->cl_w_in_flight);
                lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                      (starting_offset >> CFS_PAGE_SHIFT) + 1);
        }
        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (cmd == OBD_BRW_READ)
                cli->cl_r_in_flight++;
        else
                cli->cl_w_in_flight++;

        /* queued sync pages can be torn down while the pages
         * were between the pending list and the rpc */
        tmp = NULL;
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                /* only one oap gets a request reference */
                if (tmp == NULL)
                        tmp = oap;
                if (oap->oap_interrupted && !req->rq_intr) {
                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
                               oap, req);
                        ptlrpc_mark_interrupted(req);
                }
        }
        if (tmp != NULL)
                tmp->oap_request = ptlrpc_request_addref(req);

        DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
                  page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);

        req->rq_interpret_reply = brw_interpret;
        ptlrpcd_add_req(req, PSCOPE_BRW);
        RETURN(1);
}
2522
/* Debugging helper: print an loi's state - whether it is on a ready list,
 * its pending write/read page counts and whether either urgent list is
 * non-empty - followed by a caller-supplied format and args. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)                                                     \

/* This is called by osc_check_rpcs() to find which objects have pages that
 * we could be sending.  These lists are maintained by lop_makes_rpc().
 * Returns the next object to service, in priority order, or NULL when
 * nothing is ready. */
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
        ENTRY;

        /* First return objects that have blocked locks so that they
         * will be flushed quickly and other clients can get the lock,
         * then objects which have pages ready to be stuffed into RPCs */
        if (!list_empty(&cli->cl_loi_hp_ready_list))
                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
                                  struct lov_oinfo, loi_hp_ready_item));
        if (!list_empty(&cli->cl_loi_ready_list))
                RETURN(list_entry(cli->cl_loi_ready_list.next,
                                  struct lov_oinfo, loi_ready_item));

        /* then if we have cache waiters, return all objects with queued
         * writes.  This is especially important when many small files
         * have filled up the cache and not been fired into rpcs because
         * they don't pass the nr_pending/object threshhold */
        if (!list_empty(&cli->cl_cache_waiters) &&
            !list_empty(&cli->cl_loi_write_list))
                RETURN(list_entry(cli->cl_loi_write_list.next,
                                  struct lov_oinfo, loi_write_item));

        /* then return all queued objects when we have an invalid import
         * so that they get flushed */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
                if (!list_empty(&cli->cl_loi_write_list))
                        RETURN(list_entry(cli->cl_loi_write_list.next,
                                          struct lov_oinfo, loi_write_item));
                if (!list_empty(&cli->cl_loi_read_list))
                        RETURN(list_entry(cli->cl_loi_read_list.next,
                                          struct lov_oinfo, loi_read_item));
        }
        RETURN(NULL);
}
2570
2571 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2572 {
2573         struct osc_async_page *oap;
2574         int hprpc = 0;
2575
2576         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2577                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2578                                  struct osc_async_page, oap_urgent_item);
2579                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2580         }
2581
2582         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2583                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2584                                  struct osc_async_page, oap_urgent_item);
2585                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2586         }
2587
2588         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2589 }
2590
/* called with the loi list lock held.  Iterates over objects returned by
 * osc_next_loi() and fires read/write RPCs for them until the in-flight
 * limit is reached or the ready lists drain.  Note osc_send_oap_rpc()
 * drops and re-takes cl_loi_list_lock internally. */
void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the RPC concurrency cap is reached */
                if (osc_max_rpc_in_flight(cli, loi))
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_hp_ready_item))
                        list_del_init(&loi->loi_hp_ready_item);
                if (!list_empty(&loi->loi_ready_item))
                        list_del_init(&loi->loi_ready_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2654
2655 /* we're trying to queue a page in the osc so we're subject to the
2656  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2657  * If the osc's queued pages are already at that limit, then we want to sleep
2658  * until there is space in the osc's queue for us.  We also may be waiting for
2659  * write credits from the OST if there are RPCs in flight that may return some
2660  * before we fall back to sync writes.
2661  *
2662  * We need this know our allocation was granted in the presence of signals */
2663 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2664 {
2665         int rc;
2666         ENTRY;
2667         client_obd_list_lock(&cli->cl_loi_list_lock);
2668         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2669         client_obd_list_unlock(&cli->cl_loi_list_lock);
2670         RETURN(rc);
2671 };
2672
2673 /**
2674  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2675  * is available.
2676  */
2677 int osc_enter_cache_try(const struct lu_env *env,
2678                         struct client_obd *cli, struct lov_oinfo *loi,
2679                         struct osc_async_page *oap, int transient)
2680 {
2681         int has_grant;
2682
2683         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2684         if (has_grant) {
2685                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2686                 if (transient) {
2687                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2688                         atomic_inc(&obd_dirty_transit_pages);
2689                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2690                 }
2691         }
2692         return has_grant;
2693 }
2694
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 when grant was consumed, -EDQUOT to force the caller to fall
 * back to sync IO, -EINTR if the wait was interrupted before the grant
 * arrived, or the rc a completing RPC stored in the waiter. */
static int osc_enter_cache(const struct lu_env *env,
                           struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        /* on-stack waiter; queued on cli->cl_cache_waiters below */
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
            atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
            osc_enter_cache_try(env, cli, loi, oap, 0))
                RETURN(0);

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick RPCs so the in-flight writes can return grant */
                loi_list_maint(cli, loi);
                osc_check_rpcs(env, cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means the grant never arrived
                 * (woken by interrupt / no RPCs left in flight) */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2749
2750
2751 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2752                         struct lov_oinfo *loi, cfs_page_t *page,
2753                         obd_off offset, const struct obd_async_page_ops *ops,
2754                         void *data, void **res, int nocache,
2755                         struct lustre_handle *lockh)
2756 {
2757         struct osc_async_page *oap;
2758
2759         ENTRY;
2760
2761         if (!page)
2762                 return size_round(sizeof(*oap));
2763
2764         oap = *res;
2765         oap->oap_magic = OAP_MAGIC;
2766         oap->oap_cli = &exp->exp_obd->u.cli;
2767         oap->oap_loi = loi;
2768
2769         oap->oap_caller_ops = ops;
2770         oap->oap_caller_data = data;
2771
2772         oap->oap_page = page;
2773         oap->oap_obj_off = offset;
2774         if (!client_is_remote(exp) &&
2775             cfs_capable(CFS_CAP_SYS_RESOURCE))
2776                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2777
2778         LASSERT(!(offset & ~CFS_PAGE_MASK));
2779
2780         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2781         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2782         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2783         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2784
2785         spin_lock_init(&oap->oap_lock);
2786         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2787         RETURN(0);
2788 }
2789
2790 struct osc_async_page *oap_from_cookie(void *cookie)
2791 {
2792         struct osc_async_page *oap = cookie;
2793         if (oap->oap_magic != OAP_MAGIC)
2794                 return ERR_PTR(-EINVAL);
2795         return oap;
2796 };
2797
/* Queue a prepared async page for IO: validate the cookie, check quota
 * for writes, reserve grant/cache space (osc_enter_cache() may block),
 * put the page on its object's pending list and kick osc_check_rpcs().
 * Returns 0 on success or a negative errno. */
int osc_queue_async_io(const struct lu_env *env,
                       struct obd_export *exp, struct lov_stripe_md *lsm,
                       struct lov_oinfo *loi, void *cookie,
                       int cmd, obd_off off, int count,
                       obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* refuse a page that is already queued or in flight */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
                struct cl_object *obj;
                struct cl_attr    attr; /* XXX put attr into thread info */

                obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);

                cl_object_attr_lock(obj);
                rc = cl_object_attr_get(env, obj, &attr);
                cl_object_attr_unlock(obj);

                if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
                                            attr.cat_gid) == NO_QUOTA)
                        rc = -EDQUOT;
                if (rc)
                        RETURN(rc);
        }

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* the IO must not cross a page boundary */
        LASSERT(off + count <= CFS_PAGE_SIZE);
        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and re-take cl_loi_list_lock while waiting */
                rc = osc_enter_cache(env, cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2870
/* True iff 'flag' is being newly set: clear in 'was' but set in 'now'.
 * aka (~was & now & flag), but this is more clear :) */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2873
/* Apply newly-requested async flags (ASYNC_READY / ASYNC_URGENT) to a
 * pending page, moving it onto its lop's urgent list when it becomes
 * urgent.  Returns 0 on success, -EIO if the import is invalid, -EINVAL
 * if the page is not on a pending list. */
int osc_set_async_flags_base(struct client_obd *cli,
                             struct lov_oinfo *loi, struct osc_async_page *oap,
                             obd_flag async_flags)
{
        struct loi_oap_pages *lop;
        ENTRY;

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        if (list_empty(&oap->oap_pending_item))
                RETURN(-EINVAL);

        /* nothing new to set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                RETURN(0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
            list_empty(&oap->oap_rpc_item)) {
                /* high-priority pages go to the head of the urgent list */
                if (oap->oap_async_flags & ASYNC_HP)
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                else
                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
                oap->oap_async_flags |= ASYNC_URGENT;
                loi_list_maint(cli, loi);
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
        RETURN(0);
}
2913
/* Remove a queued page from osc book-keeping before it is sent: release
 * its cache/grant accounting and unlink it from the urgent and pending
 * lists.  Returns -EBUSY if the page is already part of an RPC. */
int osc_teardown_async_page(struct obd_export *exp,
                            struct lov_stripe_md *lsm,
                            struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* already picked up by osc_send_oap_rpc(); too late to tear down */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2959
2960 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2961                                          struct ldlm_enqueue_info *einfo,
2962                                          int flags)
2963 {
2964         void *data = einfo->ei_cbdata;
2965
2966         LASSERT(lock != NULL);
2967         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2968         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2969         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2970         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2971
2972         lock_res_and_lock(lock);
2973         spin_lock(&osc_ast_guard);
2974         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2975         lock->l_ast_data = data;
2976         spin_unlock(&osc_ast_guard);
2977         unlock_res_and_lock(lock);
2978 }
2979
2980 static void osc_set_data_with_check(struct lustre_handle *lockh,
2981                                     struct ldlm_enqueue_info *einfo,
2982                                     int flags)
2983 {
2984         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2985
2986         if (lock != NULL) {
2987                 osc_set_lock_data_with_check(lock, einfo, flags);
2988                 LDLM_LOCK_PUT(lock);
2989         } else
2990                 CERROR("lockh %p, data %p - client evicted?\n",
2991                        lockh, einfo->ei_cbdata);
2992 }
2993
2994 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2995                              ldlm_iterator_t replace, void *data)
2996 {
2997         struct ldlm_res_id res_id;
2998         struct obd_device *obd = class_exp2obd(exp);
2999
3000         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3001         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3002         return 0;
3003 }
3004
/* Finish an osc lock enqueue: translate an intent-aborted reply into the
 * server's policy result, mark the LVB ready on success, and invoke the
 * caller's update callback with the final rc.  Returns the callback's
 * return value. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            int *flags, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* server's real result is carried in the reply */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
3035
/* Interpret callback for an asynchronous extent-lock enqueue.  Completes
 * the DLM side via ldlm_cli_enqueue_fini(), runs the OSC upcall via
 * osc_enqueue_fini(), and balances every lock reference taken along the
 * way (the enqueue reference, the extra reference taken below, and the
 * ldlm_handle2lock() reference). */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
                                   &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);

        /* Fault-injection point used to exercise the enqueue/cancel race. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above and the ldlm_handle2lock()
         * reference on the lock itself. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3085
/* Record the outcome of a stripe lock enqueue into the per-stripe state.
 * On a granted lock (rc == ELDLM_OK) the server's LVB is cached and KMS
 * (known minimum size) is extended, but never past the end of the granted
 * extent.  On an aborted glimpse (intent) only the LVB is cached.
 *
 * \param lov_lockhp  handle of the granted lock; must resolve when
 *                    rc == ELDLM_OK
 * \param loi         per-stripe lov_oinfo that receives LVB/KMS updates
 * \param flags       enqueue flags; LDLM_FL_HAS_INTENT marks a glimpse
 * \param lvb         size/blocks/mtime values returned by the server
 * \param mode        granted lock mode (currently unused here)
 * \param rc          enqueue result code
 */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        if (rc == ELDLM_OK) {
                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                /* The lock may now be matched by ldlm_lock_match() users. */
                ldlm_lock_allow_match(lock);
                LDLM_LOCK_PUT(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                loi->loi_lvb = *lvb;
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                /* NOTE(review): this assignment is dead code -- rc is passed
                 * by value and the function returns void; presumably callers
                 * are expected to treat an aborted glimpse as success
                 * themselves.  Kept to avoid changing behavior. */
                rc = ELDLM_OK;
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
3121
/* Sentinel request-set pointer: callers pass PTLRPCD_SET instead of a real
 * set to mean "hand the request to the ptlrpcd daemon" (see
 * osc_enqueue_base()).  Never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3123
3124 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3125  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3126  * other synchronous requests, however keeping some locks and trying to obtain
3127  * others may take a considerable amount of time in a case of ost failure; and
3128  * when other sync requests do not get released lock from a client, the client
3129  * is excluded from the cluster -- such scenarious make the life difficult, so
3130  * release locks just after they are obtained. */
3131 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3132                      int *flags, ldlm_policy_data_t *policy,
3133                      struct ost_lvb *lvb, int kms_valid,
3134                      obd_enqueue_update_f upcall, void *cookie,
3135                      struct ldlm_enqueue_info *einfo,
3136                      struct lustre_handle *lockh,
3137                      struct ptlrpc_request_set *rqset, int async)
3138 {
3139         struct obd_device *obd = exp->exp_obd;
3140         struct ptlrpc_request *req = NULL;
3141         int intent = *flags & LDLM_FL_HAS_INTENT;
3142         ldlm_mode_t mode;
3143         int rc;
3144         ENTRY;
3145
3146         /* Filesystem lock extents are extended to page boundaries so that
3147          * dealing with the page cache is a little smoother.  */
3148         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3149         policy->l_extent.end |= ~CFS_PAGE_MASK;
3150
3151         /*
3152          * kms is not valid when either object is completely fresh (so that no
3153          * locks are cached), or object was evicted. In the latter case cached
3154          * lock cannot be used, because it would prime inode state with
3155          * potentially stale LVB.
3156          */
3157         if (!kms_valid)
3158                 goto no_match;
3159
3160         /* Next, search for already existing extent locks that will cover us */
3161         /* If we're trying to read, we also search for an existing PW lock.  The
3162          * VFS and page cache already protect us locally, so lots of readers/
3163          * writers can share a single PW lock.
3164          *
3165          * There are problems with conversion deadlocks, so instead of
3166          * converting a read lock to a write lock, we'll just enqueue a new
3167          * one.
3168          *
3169          * At some point we should cancel the read lock instead of making them
3170          * send us a blocking callback, but there are problems with canceling
3171          * locks out from other users right now, too. */
3172         mode = einfo->ei_mode;
3173         if (einfo->ei_mode == LCK_PR)
3174                 mode |= LCK_PW;
3175         mode = ldlm_lock_match(obd->obd_namespace,
3176                                *flags | LDLM_FL_LVB_READY, res_id,
3177                                einfo->ei_type, policy, mode, lockh, 0);
3178         if (mode) {
3179                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3180
3181                 if (matched->l_ast_data == NULL ||
3182                     matched->l_ast_data == einfo->ei_cbdata) {
3183                         /* addref the lock only if not async requests and PW
3184                          * lock is matched whereas we asked for PR. */
3185                         if (!rqset && einfo->ei_mode != mode)
3186                                 ldlm_lock_addref(lockh, LCK_PR);
3187                         osc_set_lock_data_with_check(matched, einfo, *flags);
3188                         if (intent) {
3189                                 /* I would like to be able to ASSERT here that
3190                                  * rss <= kms, but I can't, for reasons which
3191                                  * are explained in lov_enqueue() */
3192                         }
3193
3194                         /* We already have a lock, and it's referenced */
3195                         (*upcall)(cookie, ELDLM_OK);
3196
3197                         /* For async requests, decref the lock. */
3198                         if (einfo->ei_mode != mode)
3199                                 ldlm_lock_decref(lockh, LCK_PW);
3200                         else if (rqset)
3201                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3202                         LDLM_LOCK_PUT(matched);
3203                         RETURN(ELDLM_OK);
3204                 } else
3205                         ldlm_lock_decref(lockh, mode);
3206                 LDLM_LOCK_PUT(matched);
3207         }
3208
3209  no_match:
3210         if (intent) {
3211                 CFS_LIST_HEAD(cancels);
3212                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3213                                            &RQF_LDLM_ENQUEUE_LVB);
3214                 if (req == NULL)
3215                         RETURN(-ENOMEM);
3216
3217                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3218                 if (rc)
3219                         RETURN(rc);
3220
3221                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3222                                      sizeof *lvb);
3223                 ptlrpc_request_set_replen(req);
3224         }
3225
3226         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3227         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3228
3229         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3230                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3231         if (rqset) {
3232                 if (!rc) {
3233                         struct osc_enqueue_args *aa;
3234                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3235                         aa = ptlrpc_req_async_args(req);
3236                         aa->oa_ei = einfo;
3237                         aa->oa_exp = exp;
3238                         aa->oa_flags  = flags;
3239                         aa->oa_upcall = upcall;
3240                         aa->oa_cookie = cookie;
3241                         aa->oa_lvb    = lvb;
3242                         aa->oa_lockh  = lockh;
3243
3244                         req->rq_interpret_reply =
3245                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3246                         if (rqset == PTLRPCD_SET)
3247                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3248                         else
3249                                 ptlrpc_set_add_req(rqset, req);
3250                 } else if (intent) {
3251                         ptlrpc_req_finished(req);
3252                 }
3253                 RETURN(rc);
3254         }
3255
3256         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3257         if (intent)
3258                 ptlrpc_req_finished(req);
3259
3260         RETURN(rc);
3261 }
3262
3263 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3264                        struct ldlm_enqueue_info *einfo,
3265                        struct ptlrpc_request_set *rqset)
3266 {
3267         struct ldlm_res_id res_id;
3268         int rc;
3269         ENTRY;
3270
3271         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3272                            oinfo->oi_md->lsm_object_gr, &res_id);
3273
3274         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3275                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3276                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3277                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3278                               rqset, rqset != NULL);
3279         RETURN(rc);
3280 }
3281
3282 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3283                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3284                    int *flags, void *data, struct lustre_handle *lockh,
3285                    int unref)
3286 {
3287         struct obd_device *obd = exp->exp_obd;
3288         int lflags = *flags;
3289         ldlm_mode_t rc;
3290         ENTRY;
3291
3292         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3293                 RETURN(-EIO);
3294
3295         /* Filesystem lock extents are extended to page boundaries so that
3296          * dealing with the page cache is a little smoother */
3297         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3298         policy->l_extent.end |= ~CFS_PAGE_MASK;
3299
3300         /* Next, search for already existing extent locks that will cover us */
3301         /* If we're trying to read, we also search for an existing PW lock.  The
3302          * VFS and page cache already protect us locally, so lots of readers/
3303          * writers can share a single PW lock. */
3304         rc = mode;
3305         if (mode == LCK_PR)
3306                 rc |= LCK_PW;
3307         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3308                              res_id, type, policy, rc, lockh, unref);
3309         if (rc) {
3310                 if (data != NULL)
3311                         osc_set_data_with_check(lockh, data, lflags);
3312                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3313                         ldlm_lock_addref(lockh, LCK_PR);
3314                         ldlm_lock_decref(lockh, LCK_PW);
3315                 }
3316                 RETURN(rc);
3317         }
3318         RETURN(rc);
3319 }
3320
3321 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3322 {
3323         ENTRY;
3324
3325         if (unlikely(mode == LCK_GROUP))
3326                 ldlm_lock_decref_and_cancel(lockh, mode);
3327         else
3328                 ldlm_lock_decref(lockh, mode);
3329
3330         RETURN(0);
3331 }
3332
3333 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3334                       __u32 mode, struct lustre_handle *lockh)
3335 {
3336         ENTRY;
3337         RETURN(osc_cancel_base(lockh, mode));
3338 }
3339
3340 static int osc_cancel_unused(struct obd_export *exp,
3341                              struct lov_stripe_md *lsm, int flags,
3342                              void *opaque)
3343 {
3344         struct obd_device *obd = class_exp2obd(exp);
3345         struct ldlm_res_id res_id, *resp = NULL;
3346
3347         if (lsm != NULL) {
3348                 resp = osc_build_res_name(lsm->lsm_object_id,
3349                                           lsm->lsm_object_gr, &res_id);
3350         }
3351
3352         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3353 }
3354
3355 static int osc_statfs_interpret(const struct lu_env *env,
3356                                 struct ptlrpc_request *req,
3357                                 struct osc_async_args *aa, int rc)
3358 {
3359         struct obd_statfs *msfs;
3360         ENTRY;
3361
3362         if (rc != 0)
3363                 GOTO(out, rc);
3364
3365         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3366         if (msfs == NULL) {
3367                 GOTO(out, rc = -EPROTO);
3368         }
3369
3370         *aa->aa_oi->oi_osfs = *msfs;
3371 out:
3372         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3373         RETURN(rc);
3374 }
3375
/* Issue an OST_STATFS asynchronously on @rqset.  The reply is delivered
 * through osc_statfs_interpret(), which fills oinfo->oi_osfs and invokes
 * oinfo->oi_cb_up.  @max_age is currently unused (see comment below).
 *
 * \retval 0        request queued on @rqset
 * \retval -ENOMEM  request allocation failed
 * \retval other    negative errno from request packing
 */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is handled on the OST's create portal. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3417
/* Synchronous OST_STATFS: fetch filesystem statistics from the OST into
 * @osfs.  OBD_STATFS_NODELAY in @flags makes the request fail fast rather
 * than waiting through recovery (used by procfs readers).  @max_age is
 * currently unused (see comment below).
 *
 * \retval 0 on success, negative errno on failure. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed across the allocation. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* statfs is handled on the OST's create portal. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                /* Reply buffer missing or short: protocol error. */
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3480
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 *
 * Copies a single-stripe striping descriptor for @lsm back to user space
 * at @lump.  Returns 0 on success, -ENODATA/-EINVAL/-EFAULT/-ENOMEM on
 * failure. */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* NOTE(review): lmm_stripe_count comes straight from user
                 * space and is not bounded here before it sizes this kernel
                 * allocation -- confirm callers/ioctl layer sanitize it. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* Only the first object slot is populated (an OSC has a
                 * single stripe); OBD_ALLOC zeroed the remainder. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* No room for objects in the user buffer: return just the
                 * header, reusing the on-stack copy. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3543
3544
/* obd_iocontrol() handler: dispatch userspace ioctls aimed at this OSC.
 * A module reference is held for the duration of the call so the module
 * cannot be unloaded while an ioctl is in flight. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                /* NOTE(review): message lacks a trailing '\n'. */
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller's inline buffers must hold the descriptor and the
                 * uuid respectively. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a single-target LOV config. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size (>0) on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3631
/* obd_get_info() handler.  Supported keys:
 *  - KEY_LOCK_TO_STRIPE: an OSC always maps a lock to stripe 0.
 *  - KEY_LAST_ID:        query the OST for the last allocated object id.
 *  - KEY_FIEMAP:         forward a fiemap request/reply through OST_GET_INFO.
 * Any other key returns -EINVAL; NULL @vallen/@val return -EFAULT. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                /* A single OSC has exactly one stripe, index 0. */
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                /* Ship the key string to the OST. */
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* The fiemap value buffer is both sent (extent request) and
                 * received (filled extents), hence sized in both directions. */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3727
3728 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3729                                           struct ptlrpc_request *req,
3730                                           void *aa, int rc)
3731 {
3732         struct llog_ctxt *ctxt;
3733         struct obd_import *imp = req->rq_import;
3734         ENTRY;
3735
3736         if (rc != 0)
3737                 RETURN(rc);
3738
3739         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3740         if (ctxt) {
3741                 if (rc == 0)
3742                         rc = llog_initiator_connect(ctxt);
3743                 else
3744                         CERROR("cannot establish connection for "
3745                                "ctxt %p: %d\n", ctxt, rc);
3746         }
3747
3748         llog_ctxt_put(ctxt);
3749         spin_lock(&imp->imp_lock);
3750         imp->imp_server_timeout = 1;
3751         imp->imp_pingable = 1;
3752         spin_unlock(&imp->imp_lock);
3753         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3754
3755         RETURN(rc);
3756 }
3757
/* Set a named key/value parameter on this OSC, possibly forwarding it
 * to the OST as an asynchronous OST_SET_INFO RPC.
 *
 * \param exp     export to the target OST
 * \param keylen  length of \a key in bytes
 * \param key     parameter name (matched with KEY_IS())
 * \param vallen  length of \a val in bytes
 * \param val     parameter value
 * \param set     request set the RPC is added to; may be NULL only for
 *                KEY_GRANT_SHRINK, which goes through ptlrpcd instead
 *
 * Keys handled locally return without any RPC; all other keys are
 * packed into an OST_SET_INFO request and queued.
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* Local-only key: record the next object id the MDS will use.
         * The stored value is last-used + 1. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Local-only key: an unlink freed space, so clear the
         * no-space flag on the object creator. */
        if (KEY_IS(KEY_UNLINKED)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* Local-only key: toggle initial-recovery behaviour on the
         * import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Local-only key: enable/disable bulk checksums. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* Local-only key: re-read security flavour configuration. */
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        /* Local-only key: flush this client's security contexts. */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Everything past this point needs an RPC; a request set is
         * mandatory except for grant shrink, which uses ptlrpcd. */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        if (KEY_IS(KEY_GRANT_SHRINK))
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
        else
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);

        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Copy key and value into the request buffers. */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                /* val carries the MDS group number for precreation. */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
                struct osc_grant_args *aa;
                struct obdo *oa;

                /* Stash a copy of the obdo in the async args so the
                 * interpret callback can update grant accounting;
                 * osc_shrink_grant_interpret is expected to free it. */
                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }

        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else
                /* grant shrink is driven by ptlrpcd, not the caller */
                ptlrpcd_add_req(req, PSCOPE_OTHER);

        RETURN(0);
}
3887
3888
3889 static struct llog_operations osc_size_repl_logops = {
3890         lop_cancel: llog_obd_repl_cancel
3891 };
3892
3893 static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses on the MDS side:
 * LLOG_MDS_OST_ORIG_CTXT (originator for MDS->OST unlink records) and
 * LLOG_SIZE_REPL_CTXT (size replication).  On failure of the second
 * setup the first context is torn down again.
 *
 * \param obd    this OSC device
 * \param olg    must be the device's own obd_olg (asserted)
 * \param tgt    target device the logs are stored on
 * \param count  number of log ids
 * \param catid  catalog id holding the originator log id
 * \param uuid   unused here
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time lazy initialization of the shared originator ops
         * table; lop_setup doubles as the "already initialized" flag.
         * NOTE(review): osc_mds_ost_orig_logops is a file-scope global
         * but is guarded by a per-obd lock — presumably only one OSC
         * races here in practice; confirm if multiple devices can
         * reach this concurrently. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                /* undo the first setup so we don't leak the context */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}
3938
3939 static int osc_llog_finish(struct obd_device *obd, int count)
3940 {
3941         struct llog_ctxt *ctxt;
3942         int rc = 0, rc2 = 0;
3943         ENTRY;
3944
3945         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3946         if (ctxt)
3947                 rc = llog_cleanup(ctxt);
3948
3949         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3950         if (ctxt)
3951                 rc2 = llog_cleanup(ctxt);
3952         if (!rc)
3953                 rc = rc2;
3954
3955         RETURN(rc);
3956 }
3957
3958 static int osc_reconnect(const struct lu_env *env,
3959                          struct obd_export *exp, struct obd_device *obd,
3960                          struct obd_uuid *cluuid,
3961                          struct obd_connect_data *data,
3962                          void *localdata)
3963 {
3964         struct client_obd *cli = &obd->u.cli;
3965
3966         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3967                 long lost_grant;
3968
3969                 client_obd_list_lock(&cli->cl_loi_list_lock);
3970                 data->ocd_grant = cli->cl_avail_grant ?:
3971                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3972                 lost_grant = cli->cl_lost_grant;
3973                 cli->cl_lost_grant = 0;
3974                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3975
3976                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3977                        "cl_lost_grant: %ld\n", data->ocd_grant,
3978                        cli->cl_avail_grant, lost_grant);
3979                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3980                        " ocd_grant: %d\n", data->ocd_connect_flags,
3981                        data->ocd_version, data->ocd_grant);
3982         }
3983
3984         RETURN(0);
3985 }
3986
3987 static int osc_disconnect(struct obd_export *exp)
3988 {
3989         struct obd_device *obd = class_exp2obd(exp);
3990         struct llog_ctxt  *ctxt;
3991         int rc;
3992
3993         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3994         if (ctxt) {
3995                 if (obd->u.cli.cl_conn_count == 1) {
3996                         /* Flush any remaining cancel messages out to the
3997                          * target */
3998                         llog_sync(ctxt, exp);
3999                 }
4000                 llog_ctxt_put(ctxt);
4001         } else {
4002                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4003                        obd);
4004         }
4005
4006         rc = client_disconnect_export(exp);
4007         /**
4008          * Initially we put del_shrink_grant before disconnect_export, but it
4009          * causes the following problem if setup (connect) and cleanup
4010          * (disconnect) are tangled together.
4011          *      connect p1                     disconnect p2
4012          *   ptlrpc_connect_import 
4013          *     ...............               class_manual_cleanup
4014          *                                     osc_disconnect
4015          *                                     del_shrink_grant
4016          *   ptlrpc_connect_interrupt
4017          *     init_grant_shrink
4018          *   add this client to shrink list                 
4019          *                                      cleanup_osc
4020          * Bang! pinger trigger the shrink.
4021          * So the osc should be disconnected from the shrink list, after we
4022          * are sure the import has been destroyed. BUG18662 
4023          */
4024         if (obd->u.cli.cl_import == NULL)
4025                 osc_del_shrink_grant(&obd->u.cli);
4026         return rc;
4027 }
4028
/* React to state changes of this OSC's import.
 *
 * \param obd    the OSC device owning \a imp (asserted)
 * \param imp    the import whose state changed
 * \param event  which transition occurred
 *
 * \retval 0 or the observer/cl_env error for the given event.
 * Unknown events are a bug (LBUG).
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* stop precreating objects while disconnected */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* server-held grants are meaningless once disconnected */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* drop all DLM locks held under this import */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* connection is back; allow precreates again */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4112
4113 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4114 {
4115         int rc;
4116         ENTRY;
4117
4118         ENTRY;
4119         rc = ptlrpcd_addref();
4120         if (rc)
4121                 RETURN(rc);
4122
4123         rc = client_obd_setup(obd, lcfg);
4124         if (rc) {
4125                 ptlrpcd_decref();
4126         } else {
4127                 struct lprocfs_static_vars lvars = { 0 };
4128                 struct client_obd *cli = &obd->u.cli;
4129
4130                 lprocfs_osc_init_vars(&lvars);
4131                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4132                         lproc_osc_attach_seqstat(obd);
4133                         sptlrpc_lprocfs_cliobd_attach(obd);
4134                         ptlrpc_lprocfs_register_obd(obd);
4135                 }
4136
4137                 oscc_init(obd);
4138                 /* We need to allocate a few requests more, because
4139                    brw_interpret tries to create new requests before freeing
4140                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4141                    reserved, but I afraid that might be too much wasted RAM
4142                    in fact, so 2 is just my guess and still should work. */
4143                 cli->cl_import->imp_rq_pool =
4144                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4145                                             OST_MAXREQSIZE,
4146                                             ptlrpc_add_rqs_to_pool);
4147                 
4148                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4149                 sema_init(&cli->cl_grant_sem, 1);
4150         }
4151
4152         RETURN(rc);
4153 }
4154
/* Staged pre-cleanup for an OSC device.
 *
 * OBD_CLEANUP_EARLY: deactivate the import and stop pinging so
 * in-flight RPCs (e.g. mds_lov_synchronize) are aborted early.
 * OBD_CLEANUP_EXPORTS: invalidate and destroy the client import,
 * free its request pool, then shut down the llog contexts.
 * Other stages are ignored (rc stays 0).
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever
                         * cl_import is still set, not only in the
                         * never-connected case — confirm intent. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
4198
4199 int osc_cleanup(struct obd_device *obd)
4200 {
4201         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4202         int rc;
4203
4204         ENTRY;
4205         ptlrpc_lprocfs_unregister_obd(obd);
4206         lprocfs_obd_cleanup(obd);
4207
4208         spin_lock(&oscc->oscc_lock);
4209         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4210         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4211         spin_unlock(&oscc->oscc_lock);
4212
4213         /* free memory of osc quota cache */
4214         lquota_cleanup(quota_interface, obd);
4215
4216         rc = client_obd_cleanup(obd);
4217
4218         ptlrpcd_decref();
4219         RETURN(rc);
4220 }
4221
4222 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4223 {
4224         struct lprocfs_static_vars lvars = { 0 };
4225         int rc = 0;
4226
4227         lprocfs_osc_init_vars(&lvars);
4228
4229         switch (lcfg->lcfg_command) {
4230         default:
4231                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4232                                               lcfg, obd);
4233                 if (rc > 0)
4234                         rc = 0;
4235                 break;
4236         }
4237
4238         return(rc);
4239 }
4240
4241 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4242 {
4243         return osc_process_config_base(obd, buf);
4244 }
4245
/* Method table exported by the OSC: lifecycle hooks come from this
 * file, connection management is shared with the generic client code,
 * and the data-path entry points (brw, enqueue, ...) are the OSC
 * implementations defined earlier in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client helpers + local hooks) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statistics and metadata packing */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* I/O and locking */
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* control plane */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4282
4283 extern struct lu_kmem_descr  osc_caches[];
4284 extern spinlock_t            osc_ast_guard;
4285 extern struct lock_class_key osc_ast_guard_class;
4286
4287 int __init osc_init(void)
4288 {
4289         struct lprocfs_static_vars lvars = { 0 };
4290         int rc;
4291         ENTRY;
4292
4293         /* print an address of _any_ initialized kernel symbol from this
4294          * module, to allow debugging with gdb that doesn't support data
4295          * symbols from modules.*/
4296         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4297
4298         rc = lu_kmem_init(osc_caches);
4299
4300         lprocfs_osc_init_vars(&lvars);
4301
4302         request_module("lquota");
4303         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4304         lquota_init(quota_interface);
4305         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4306
4307         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4308                                  LUSTRE_OSC_NAME, &osc_device_type);
4309         if (rc) {
4310                 if (quota_interface)
4311                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4312                 lu_kmem_fini(osc_caches);
4313                 RETURN(rc);
4314         }
4315
4316         spin_lock_init(&osc_ast_guard);
4317         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4318
4319         RETURN(rc);
4320 }
4321
4322 #ifdef __KERNEL__
/* Module exit: undo osc_init() in reverse order — device type first,
 * then the quota interface, the registered obd type, and finally the
 * slab caches. */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}
4334
4335 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4336 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4337 MODULE_LICENSE("GPL");
4338
4339 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4340 #endif