Whamcloud - gitweb
6c9fcab35af947e268401b2f8a29ee7148d75d56
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
/* Create one object on the OST synchronously.  @oa carries the creation
 * attributes and is overwritten with the server's reply; *@ea is the
 * stripe MD for the object (allocated here when the caller passes none).
 * If @oti is given, the reply transno and any llog cookie are copied back
 * for the caller's recovery bookkeeping. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller had no stripe MD yet: allocate one here; it is
                 * freed again below if the create ultimately fails. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        /* Adopt the attributes (incl. object id/group) the OST assigned. */
        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Free the stripe MD only if we allocated it above (caller's *ea
         * was NULL, so it was never published) and the create failed. */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         body->oa = *oa;
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         *oa = body->oa;
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
/* Reply callback for OST_DESTROY: release the in-flight slot taken by
 * osc_can_send_destroy() and wake any thread throttled in osc_destroy(). */
static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}
668
/* Try to reserve a slot for one destroy RPC.  Returns 1 when the count of
 * destroys in flight (after our increment) is still within
 * cl_max_rpcs_in_flight; otherwise the increment is undone and 0 is
 * returned so the caller waits on cl_destroy_waitq. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        /* Over the limit: give the slot back.  If the post-decrement value
         * is already below the limit, another destroy completed between our
         * two atomic ops, so wake a waiter that might otherwise stall. */
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
686
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel our local PW locks on the object up front so their pages
         * are discarded, and piggyback the cancels on the destroy (ELC). */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* Locks were only gathered, not cancelled: drop the refs. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* Ship the MDS's llog cookie so the OST can cancel the unlink
         * record once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = { 0 };

                        /*
                         * Wait until the number of on-going destroy RPCs drops
                         * under max_rpc_in_flight
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}
761
/* Fill the dirty/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) under cl_loi_list_lock so the OST can rebalance our
 * grant.  o_undirty is how much more this client could still dirty; it is
 * reported as 0 whenever the accounting looks inconsistent. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these bits already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                /* Per-OSC dirty accounting got out of range — complain and
                 * claim no headroom. */
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) -
                   atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
                /* System-wide dirty page limit exceeded. */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages),
                       atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Headroom would overflow the 32-bit o_undirty field. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        /* Lost grant has been reported; reset the counter. */
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);

}
800
801 static void osc_update_next_shrink(struct client_obd *cli)
802 {
803         int time = GRANT_SHRINK_INTERVAL;
804         cli->cl_next_shrink_grant = cfs_time_shift(time);
805         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806                cli->cl_next_shrink_grant);
807 }
808
/* caller must hold loi_list_lock */
/* Charge one page of dirty data against the available grant: bump the
 * global and per-client dirty counters and mark the page as covered by
 * grant so osc_release_write_grant() knows to credit it back. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        /* A page must not be charged twice. */
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* Caller is expected to have checked grant before consuming it. */
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}
823
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
/* Undo the dirty accounting for @pga.  @sent says whether the page was
 * actually written: unsent pages forfeit a whole page of grant, and short
 * writes forfeit the part of the page the OST rounds up to full blocks. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Fall back to 4096 if the OST never reported a block size. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Nothing to do for pages never charged against grant. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                /* Page was dropped without being written: its whole page of
                 * grant is lost and will be reported via o_dropped. */
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
867
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
869 {
870         return cli->cl_r_in_flight + cli->cl_w_in_flight;
871 }
872
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        /* walk the waiter list; _safe because entries are removed in-loop */
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* grant is available: charge this waiter's page now */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
914
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919         if (body->oa.o_valid & OBD_MD_FLGRANT)
920                 cli->cl_avail_grant += body->oa.o_grant;
921         /* waiters are woken in brw_interpret */
922         client_obd_list_unlock(&cli->cl_loi_list_lock);
923 }
924
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926                               void *key, obd_count vallen, void *val,
927                               struct ptlrpc_request_set *set);
928
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930                                       struct ptlrpc_request *req,
931                                       void *aa, int rc)
932 {
933         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935         struct ost_body *body;
936         
937         if (rc != 0) {
938                 client_obd_list_lock(&cli->cl_loi_list_lock);
939                 cli->cl_avail_grant += oa->o_grant;
940                 client_obd_list_unlock(&cli->cl_loi_list_lock);
941                 GOTO(out, rc);
942         }
943
944         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
945         LASSERT(body);
946         osc_update_grant(cli, body);
947 out:
948         OBD_FREE_PTR(oa);
949         return rc;        
950 }
951
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
953 {
954         client_obd_list_lock(&cli->cl_loi_list_lock);
955         oa->o_grant = cli->cl_avail_grant / 4;
956         cli->cl_avail_grant -= oa->o_grant; 
957         client_obd_list_unlock(&cli->cl_loi_list_lock);
958         oa->o_flags |= OBD_FL_SHRINK_GRANT;
959         osc_update_next_shrink(cli);
960 }
961
962 static int osc_shrink_grant(struct client_obd *cli)
963 {
964         int    rc = 0;
965         struct ost_body     *body;
966         ENTRY;
967
968         OBD_ALLOC_PTR(body);
969         if (!body)
970                 RETURN(-ENOMEM);
971
972         osc_announce_cached(cli, &body->oa, 0);
973         osc_shrink_grant_local(cli, &body->oa);
974         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976                                 sizeof(*body), body, NULL);
977         if (rc) {
978                 client_obd_list_lock(&cli->cl_loi_list_lock);
979                 cli->cl_avail_grant += body->oa.o_grant;
980                 client_obd_list_unlock(&cli->cl_loi_list_lock);
981         }
982         if (body)
983                OBD_FREE_PTR(body);
984         RETURN(rc);
985 }
986
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
988 static int osc_should_shrink_grant(struct client_obd *client)
989 {
990         cfs_time_t time = cfs_time_current();
991         cfs_time_t next_shrink = client->cl_next_shrink_grant;
992         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
995                         return 1;
996                 else
997                         osc_update_next_shrink(client);
998         }
999         return 0;
1000 }
1001
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1003 {
1004         struct client_obd *client;
1005
1006         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007                 if (osc_should_shrink_grant(client))
1008                         osc_shrink_grant(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_add_shrink_grant(struct client_obd *client)
1014 {
1015         int rc;
1016
1017         rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
1018                                          TIMEOUT_GRANT,
1019                                          osc_grant_shrink_grant_cb, NULL,
1020                                          &client->cl_grant_shrink_list);
1021         if (rc) {
1022                 CERROR("add grant client %s error %d\n", 
1023                         client->cl_import->imp_obd->obd_name, rc);
1024                 return rc;
1025         }
1026         CDEBUG(D_CACHE, "add grant client %s \n", 
1027                client->cl_import->imp_obd->obd_name);
1028         osc_update_next_shrink(client);
1029         return 0; 
1030 }
1031
1032 static int osc_del_shrink_grant(struct client_obd *client)
1033 {
1034         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
1035 }
1036
1037 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1038 {
1039         client_obd_list_lock(&cli->cl_loi_list_lock);
1040         cli->cl_avail_grant = ocd->ocd_grant;
1041         client_obd_list_unlock(&cli->cl_loi_list_lock);
1042
1043         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1044             list_empty(&cli->cl_grant_shrink_list))
1045                 osc_add_shrink_grant(cli);
1046
1047         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1048                cli->cl_avail_grant, cli->cl_lost_grant);
1049         LASSERT(cli->cl_avail_grant >= 0);
1050 }
1051
1052 /* We assume that the reason this OSC got a short read is because it read
1053  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1054  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1055  * this stripe never got written at or beyond this stripe offset yet. */
1056 static void handle_short_read(int nob_read, obd_count page_count,
1057                               struct brw_page **pga)
1058 {
1059         char *ptr;
1060         int i = 0;
1061
1062         /* skip bytes read OK */
1063         while (nob_read > 0) {
1064                 LASSERT (page_count > 0);
1065
1066                 if (pga[i]->count > nob_read) {
1067                         /* EOF inside this page */
1068                         ptr = cfs_kmap(pga[i]->pg) +
1069                                 (pga[i]->off & ~CFS_PAGE_MASK);
1070                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1071                         cfs_kunmap(pga[i]->pg);
1072                         page_count--;
1073                         i++;
1074                         break;
1075                 }
1076
1077                 nob_read -= pga[i]->count;
1078                 page_count--;
1079                 i++;
1080         }
1081
1082         /* zero remaining pages */
1083         while (page_count-- > 0) {
1084                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1085                 memset(ptr, 0, pga[i]->count);
1086                 cfs_kunmap(pga[i]->pg);
1087                 i++;
1088         }
1089 }
1090
/* Validate the per-niobuf return codes in a BRW_WRITE reply and check the
 * bulk transfer moved exactly the requested number of bytes.  Returns 0 on
 * success, the first negative niobuf rc, or -EPROTO on a malformed reply. */
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int    *remote_rcs, i;

        /* return error if any niobuf was in error */
        remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
                                        sizeof(*remote_rcs) * niocount, NULL);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return(-EPROTO);
        }
        /* byte-swap each rc if the reply came from an opposite-endian peer */
        if (lustre_msg_swabbed(req->rq_repmsg))
                for (i = 0; i < niocount; i++)
                        __swab32s(&remote_rcs[i]);

        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return(remote_rcs[i]);

                /* a positive rc is a protocol violation for writes */
                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                                i, remote_rcs[i], req);
                        return(-EPROTO);
                }
        }

        /* the bulk descriptor must account for every requested byte */
        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return(-EPROTO);
        }

        return (0);
}
1127
1128 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1129 {
1130         if (p1->flag != p2->flag) {
1131                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1132                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1133
1134                 /* warn if we try to combine flags that we don't know to be
1135                  * safe to combine */
1136                 if ((p1->flag & mask) != (p2->flag & mask))
1137                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1138                                "same brw?\n", p1->flag, p2->flag);
1139                 return 0;
1140         }
1141
1142         return (p1->off + p1->count == p2->off);
1143 }
1144
/* Compute the bulk-data checksum over the first @nob bytes of up to
 * @pg_count pages.  @opc selects the fault-injection behaviour used by
 * the checksum tests: reads corrupt the data itself, writes corrupt only
 * the checksum so a resend still carries correct data. */
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT (pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                /* only the bytes inside [0, nob) count toward the checksum */
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}
1180
/* Build (but do not send) a bulk read/write RPC for @page_count pages.
 *
 * Allocates the request (writes come from the pre-allocated import pool so
 * they can proceed under memory pressure), merges contiguous same-flag
 * pages into niobufs, attaches the bulk descriptor, announces cached dirty
 * state, and optionally computes a bulk checksum.  On success *reqp owns
 * the prepared request; @oa is stashed in the async args for completion.
 *
 * Returns 0 or a negative errno; on failure no request is returned. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        /* count niobufs: adjacent mergeable pages share one niobuf */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        /* direction of the bulk: the server GETs write data from us, and
         * PUTs read data into our sink pages */
        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                /* each brw_page must fit inside one page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive in strictly ascending offset order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                /* extend the previous niobuf when pages merge; otherwise
                 * start a new one */
                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we filled exactly the niocount niobufs we packed */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1367
/* Compare the client's write checksum against the server's.  When they
 * disagree, recompute the checksum over the (still mapped) pages to decide
 * whether the data changed on the client, in transit, or both, and log a
 * console error accordingly.  Returns 0 when the checksums match, 1 when
 * they do not (caller treats this as a resend trigger). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute using the type the server actually used */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1418
1419 /* Note rc enters this function as number of bytes transferred */
1420 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1421 {
1422         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1423         const lnet_process_id_t *peer =
1424                         &req->rq_import->imp_connection->c_peer;
1425         struct client_obd *cli = aa->aa_cli;
1426         struct ost_body *body;
1427         __u32 client_cksum = 0;
1428         ENTRY;
1429
1430         if (rc < 0 && rc != -EDQUOT)
1431                 RETURN(rc);
1432
1433         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1434         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1435                                   lustre_swab_ost_body);
1436         if (body == NULL) {
1437                 CDEBUG(D_INFO, "Can't unpack body\n");
1438                 RETURN(-EPROTO);
1439         }
1440
1441         /* set/clear over quota flag for a uid/gid */
1442         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1443             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1444                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1445                              body->oa.o_gid, body->oa.o_valid,
1446                              body->oa.o_flags);
1447
1448         if (rc < 0)
1449                 RETURN(rc);
1450
1451         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1452                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1453
1454         osc_update_grant(cli, body);
1455
1456         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1457                 if (rc > 0) {
1458                         CERROR("Unexpected +ve rc %d\n", rc);
1459                         RETURN(-EPROTO);
1460                 }
1461                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1462
1463                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1464                         RETURN(-EAGAIN);
1465
1466                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1467                     check_write_checksum(&body->oa, peer, client_cksum,
1468                                          body->oa.o_cksum, aa->aa_requested_nob,
1469                                          aa->aa_page_count, aa->aa_ppga,
1470                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1471                         RETURN(-EAGAIN);
1472
1473                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1474                                      aa->aa_page_count, aa->aa_ppga);
1475                 GOTO(out, rc);
1476         }
1477
1478         /* The rest of this function executes only for OST_READs */
1479
1480         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1481         if (rc < 0)
1482                 GOTO(out, rc);
1483
1484         if (rc > aa->aa_requested_nob) {
1485                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1486                        aa->aa_requested_nob);
1487                 RETURN(-EPROTO);
1488         }
1489
1490         if (rc != req->rq_bulk->bd_nob_transferred) {
1491                 CERROR ("Unexpected rc %d (%d transferred)\n",
1492                         rc, req->rq_bulk->bd_nob_transferred);
1493                 return (-EPROTO);
1494         }
1495
1496         if (rc < aa->aa_requested_nob)
1497                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1498
1499         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1500                 static int cksum_counter;
1501                 __u32      server_cksum = body->oa.o_cksum;
1502                 char      *via;
1503                 char      *router;
1504                 cksum_type_t cksum_type;
1505
1506                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1507                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1508                 else
1509                         cksum_type = OBD_CKSUM_CRC32;
1510                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1511                                                  aa->aa_ppga, OST_READ,
1512                                                  cksum_type);
1513
1514                 if (peer->nid == req->rq_bulk->bd_sender) {
1515                         via = router = "";
1516                 } else {
1517                         via = " via ";
1518                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1519                 }
1520
1521                 if (server_cksum == ~0 && rc > 0) {
1522                         CERROR("Protocol error: server %s set the 'checksum' "
1523                                "bit, but didn't send a checksum.  Not fatal, "
1524                                "but please notify on http://bugzilla.lustre.org/\n",
1525                                libcfs_nid2str(peer->nid));
1526                 } else if (server_cksum != client_cksum) {
1527                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1528                                            "%s%s%s inum "LPU64"/"LPU64" object "
1529                                            LPU64"/"LPU64" extent "
1530                                            "["LPU64"-"LPU64"]\n",
1531                                            req->rq_import->imp_obd->obd_name,
1532                                            libcfs_nid2str(peer->nid),
1533                                            via, router,
1534                                            body->oa.o_valid & OBD_MD_FLFID ?
1535                                                 body->oa.o_fid : (__u64)0,
1536                                            body->oa.o_valid & OBD_MD_FLFID ?
1537                                                 body->oa.o_generation :(__u64)0,
1538                                            body->oa.o_id,
1539                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1540                                                 body->oa.o_gr : (__u64)0,
1541                                            aa->aa_ppga[0]->off,
1542                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1543                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1544                                                                         1);
1545                         CERROR("client %x, server %x, cksum_type %x\n",
1546                                client_cksum, server_cksum, cksum_type);
1547                         cksum_counter = 0;
1548                         aa->aa_oa->o_cksum = client_cksum;
1549                         rc = -EAGAIN;
1550                 } else {
1551                         cksum_counter++;
1552                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1553                         rc = 0;
1554                 }
1555         } else if (unlikely(client_cksum)) {
1556                 static int cksum_missed;
1557
1558                 cksum_missed++;
1559                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1560                         CERROR("Checksum %u requested from %s but not sent\n",
1561                                cksum_missed, libcfs_nid2str(peer->nid));
1562         } else {
1563                 rc = 0;
1564         }
1565 out:
1566         if (rc >= 0)
1567                 *aa->aa_oa = body->oa;
1568
1569         RETURN(rc);
1570 }
1571
1572 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1573                             struct lov_stripe_md *lsm,
1574                             obd_count page_count, struct brw_page **pga,
1575                             struct obd_capa *ocapa)
1576 {
1577         struct ptlrpc_request *req;
1578         int                    rc;
1579         cfs_waitq_t            waitq;
1580         int                    resends = 0;
1581         struct l_wait_info     lwi;
1582
1583         ENTRY;
1584
1585         cfs_waitq_init(&waitq);
1586
1587 restart_bulk:
1588         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1589                                   page_count, pga, &req, ocapa, 0);
1590         if (rc != 0)
1591                 return (rc);
1592
1593         rc = ptlrpc_queue_wait(req);
1594
1595         if (rc == -ETIMEDOUT && req->rq_resend) {
1596                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1597                 ptlrpc_req_finished(req);
1598                 goto restart_bulk;
1599         }
1600
1601         rc = osc_brw_fini_request(req, rc);
1602
1603         ptlrpc_req_finished(req);
1604         if (osc_recoverable_error(rc)) {
1605                 resends++;
1606                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1607                         CERROR("too many resend retries, returning error\n");
1608                         RETURN(-EIO);
1609                 }
1610
1611                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1612                 l_wait_event(waitq, 0, &lwi);
1613
1614                 goto restart_bulk;
1615         }
1616
1617         RETURN (rc);
1618 }
1619
/* Rebuild and re-queue a bulk RPC that failed with a recoverable error.
 * A new request is prepared from the pages tracked in @aa; it takes
 * over the pga/oap state and the capability from the old @request and
 * is added to the original request set.  Returns 0 on success, -EIO
 * when the resend budget is exhausted, or -EINTR if a waiting sync
 * page was interrupted meanwhile. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");

        /* keep the same direction as the original request: OST_WRITE
         * opcode means this was a write */
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, aa->aa_ocapa, 0);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* give up if any oap holding a reference on the old request was
         * interrupted while the new request was being prepared */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend by aa_resends seconds as a simple backoff */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* move each oap's request reference from the old rpc to the new */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* capability ownership moves to the new request as well */
        new_aa->aa_ocapa = aa->aa_ocapa;
        aa->aa_ocapa = NULL;

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1694
1695 /*
1696  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1697  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1698  * fine for our small page arrays and doesn't require allocation.  its an
1699  * insertion sort that swaps elements that are strides apart, shrinking the
1700  * stride down until its '1' and the array is sorted.
1701  */
1702 static void sort_brw_pages(struct brw_page **array, int num)
1703 {
1704         int stride, i, j;
1705         struct brw_page *tmp;
1706
1707         if (num == 1)
1708                 return;
1709         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1710                 ;
1711
1712         do {
1713                 stride /= 3;
1714                 for (i = stride ; i < num ; i++) {
1715                         tmp = array[i];
1716                         j = i;
1717                         while (j >= stride && array[j - stride]->off > tmp->off) {
1718                                 array[j] = array[j - stride];
1719                                 j -= stride;
1720                         }
1721                         array[j] = tmp;
1722                 }
1723         } while (stride > 1);
1724 }
1725
1726 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1727 {
1728         int count = 1;
1729         int offset;
1730         int i = 0;
1731
1732         LASSERT (pages > 0);
1733         offset = pg[i]->off & ~CFS_PAGE_MASK;
1734
1735         for (;;) {
1736                 pages--;
1737                 if (pages == 0)         /* that's all */
1738                         return count;
1739
1740                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1741                         return count;   /* doesn't end on page boundary */
1742
1743                 i++;
1744                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1745                 if (offset != 0)        /* doesn't start on page boundary */
1746                         return count;
1747
1748                 count++;
1749         }
1750 }
1751
1752 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1753 {
1754         struct brw_page **ppga;
1755         int i;
1756
1757         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1758         if (ppga == NULL)
1759                 return NULL;
1760
1761         for (i = 0; i < count; i++)
1762                 ppga[i] = pga + i;
1763         return ppga;
1764 }
1765
/* Free a pointer array built by osc_build_ppga(); @count must be the
 * count the array was built with.  The brw_pages themselves are owned
 * by the caller and are not touched. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1771
/* Synchronous bulk read/write entry point.  Splits the page array into
 * chunks of at most cl_max_pages_per_rpc contiguous pages and issues
 * one RPC per chunk through osc_brw_internal().  The oa is copied
 * aside before the first multi-chunk RPC and restored before each
 * following one, because every brw clobbers it. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli;
        int rc, page_count_orig;
        ENTRY;

        LASSERT((imp != NULL) && (imp->imp_obd != NULL));
        cli = &imp->imp_obd->u.cli;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        /* remember the original count: ppga is advanced per chunk but
         * the whole array must be freed at the end */
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                /* cap each RPC at the client's per-rpc page limit */
                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink further so the chunk stays page-contiguous */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1845
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* give back the write grant this page was holding */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1854
1855
1856 /* This maintains the lists of pending pages to read/write for a given object
1857  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1858  * to quickly find objects that are ready to send an RPC. */
1859 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1860                          int cmd)
1861 {
1862         int optimal;
1863         ENTRY;
1864
1865         if (lop->lop_num_pending == 0)
1866                 RETURN(0);
1867
1868         /* if we have an invalid import we want to drain the queued pages
1869          * by forcing them through rpcs that immediately fail and complete
1870          * the pages.  recovery relies on this to empty the queued pages
1871          * before canceling the locks and evicting down the llite pages */
1872         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1873                 RETURN(1);
1874
1875         /* stream rpcs in queue order as long as as there is an urgent page
1876          * queued.  this is our cheap solution for good batching in the case
1877          * where writepage marks some random page in the middle of the file
1878          * as urgent because of, say, memory pressure */
1879         if (!list_empty(&lop->lop_urgent)) {
1880                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1881                 RETURN(1);
1882         }
1883         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1884         optimal = cli->cl_max_pages_per_rpc;
1885         if (cmd & OBD_BRW_WRITE) {
1886                 /* trigger a write rpc stream as long as there are dirtiers
1887                  * waiting for space.  as they're waiting, they're not going to
1888                  * create more pages to coallesce with what's waiting.. */
1889                 if (!list_empty(&cli->cl_cache_waiters)) {
1890                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1891                         RETURN(1);
1892                 }
1893                 /* +16 to avoid triggering rpcs that would want to include pages
1894                  * that are being queued but which can't be made ready until
1895                  * the queuer finishes with the page. this is a wart for
1896                  * llite::commit_write() */
1897                 optimal += 16;
1898         }
1899         if (lop->lop_num_pending >= optimal)
1900                 RETURN(1);
1901
1902         RETURN(0);
1903 }
1904
1905 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1906 {
1907         struct osc_async_page *oap;
1908         ENTRY;
1909
1910         if (list_empty(&lop->lop_urgent))
1911                 RETURN(0);
1912
1913         oap = list_entry(lop->lop_urgent.next,
1914                          struct osc_async_page, oap_urgent_item);
1915
1916         if (oap->oap_async_flags & ASYNC_HP) {
1917                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1918                 RETURN(1);
1919         }
1920
1921         RETURN(0);
1922 }
1923
/* Bring @item's membership of @list in line with @should_be_on:
 * add it when it should be listed but isn't, remove it when it is
 * listed but shouldn't be, and do nothing otherwise. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int on = !list_empty(item);

        if (should_be_on && !on)
                list_add_tail(item, list);
        else if (!should_be_on && on)
                list_del_init(item);
}
1932
1933 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1934  * can find pages to build into rpcs quickly */
1935 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1936 {
1937         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1938             lop_makes_hprpc(&loi->loi_read_lop)) {
1939                 /* HP rpc */
1940                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1941                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1942         } else {
1943                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1944                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1945                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1946                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1947         }
1948
1949         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1950                 loi->loi_write_lop.lop_num_pending);
1951
1952         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1953                 loi->loi_read_lop.lop_num_pending);
1954 }
1955
/* Adjust the pending-page count on @lop by @delta and keep the
 * client-wide per-direction pending counter in sync. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1965
/**
 * this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out.
 *
 * Returns the ap_completion result when the page was still pending,
 * -EBUSY otherwise.  NOTE(review): the list and pending-count updates
 * below suggest this runs under cl_loi_list_lock - confirm at callers.
 */
int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
{
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        int rc = -EBUSY;
        ENTRY;

        LASSERT(!oap->oap_interrupted);
        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        /*
         * page completion may be called only if ->cpo_prep() method was
         * executed by osc_io_submit(), that also adds page the to pending list
         */
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);
                /* complete the page towards the caller with -EINTR */
                rc = oap->oap_caller_ops->ap_completion(env,
                                          oap->oap_caller_data,
                                          oap->oap_cmd, NULL, -EINTR);
        }

        RETURN(rc);
}
2011
2012 /* this is trying to propogate async writeback errors back up to the
2013  * application.  As an async write fails we record the error code for later if
2014  * the app does an fsync.  As long as errors persist we force future rpcs to be
2015  * sync so that the app can get a sync error and break the cycle of queueing
2016  * pages for which writeback will fail. */
2017 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2018                            int rc)
2019 {
2020         if (rc) {
2021                 if (!ar->ar_rc)
2022                         ar->ar_rc = rc;
2023
2024                 ar->ar_force_sync = 1;
2025                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2026                 return;
2027
2028         }
2029
2030         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2031                 ar->ar_force_sync = 0;
2032 }
2033
2034 void osc_oap_to_pending(struct osc_async_page *oap)
2035 {
2036         struct loi_oap_pages *lop;
2037
2038         if (oap->oap_cmd & OBD_BRW_WRITE)
2039                 lop = &oap->oap_loi->loi_write_lop;
2040         else
2041                 lop = &oap->oap_loi->loi_read_lop;
2042
2043         if (oap->oap_async_flags & ASYNC_HP)
2044                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2045         else if (oap->oap_async_flags & ASYNC_URGENT)
2046                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2047         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2048         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2049 }
2050
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Finishes one async page: drops its rpc reference, records async
 * write errors, mirrors attributes from @oa into the cached lvb, and
 * invokes the upper layer's ap_completion hook.  A non-zero hook
 * result requeues the page; otherwise its cache grant is released. */
static void osc_ap_completion(const struct lu_env *env,
                              struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        if (oap->oap_request != NULL) {
                /* grab the xid for osc_process_ar() below before the
                 * page's reference on the request is dropped */
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* track write errors both client-wide and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* copy size/time attributes returned by the server into the
         * object's cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2098
/* Interpret callback for async bulk rpcs.  Finishes the request,
 * retries recoverable errors via osc_brw_redo_request(), updates the
 * in-flight rpc accounting and completes either the oaps (rpcs built
 * by osc_send_oap_rpc()) or the raw page grants (rpcs coming from
 * async_internal, recognized by an empty aa_oaps list). */
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        int async;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* a successful redo takes over the aa state; nothing more to do */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        if (aa->aa_ocapa) {
                capa_put(aa->aa_ocapa);
                aa->aa_ocapa = NULL;
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        async = list_empty(&aa->aa_oaps);
        if (!async) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        /* the freed rpc slot may let another rpc go out */
        osc_check_rpcs(env, cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!async)
                cl_req_completion(env, aa->aa_clerq, rc);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2155
/* Build one bulk rpc covering the oaps queued on @rpc_list.  On
 * success the oaps are moved into the new request's async args and the
 * request is returned ready for the set.  On failure every oap is
 * completed with the error and an ERR_PTR() is returned. */
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                                            struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        const struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct osc_async_page *oap;
        struct osc_async_page *tmp;
        struct ost_body *body;
        struct cl_req *clerq = NULL;
        enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
        struct ldlm_lock *lock = NULL;
        struct cl_req_attr crattr;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        memset(&crattr, 0, sizeof crattr);
        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* fill pga[] from the oaps; the first oap also supplies the
         * caller ops, the cl_req and the ldlm lock for the whole rpc */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                struct cl_page *page = osc_oap2cl_page(oap);
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;

                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, req = (void *)clerq);
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
                cl_req_page_add(env, clerq, page);
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        crattr.cra_oa = oa;
        crattr.cra_capa = NULL;
        cl_req_attr_set(env, clerq, &crattr, ~0ULL);
        if (lock) {
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        rc = cl_req_prep(env, clerq);
        if (rc != 0) {
                CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, crattr.cra_capa, 1);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        cl_req_attr_set(env, clerq, &crattr,
                        OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

        /* the oaps now belong to the request's async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);
        aa->aa_clerq = clerq;
out:
        capa_put(crattr.cra_capa);
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
                /* this should happen rarely and is pretty bad, it makes the
                 * pending list not follow the dirty order */
                /* NOTE(review): cl_loi_list_lock is taken here and not
                 * released before returning - this looks deliberate
                 * (restoring the caller's locking state on the error
                 * path), but confirm against osc_send_oap_rpc() */
                client_obd_list_lock(&cli->cl_loi_list_lock);
                list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);

                        /* queued sync pages can be torn down while the pages
                         * were between the pending list and the rpc */
                        if (oap->oap_interrupted) {
                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
                                osc_ap_completion(env, cli, NULL, oap, 0,
                                                  oap->oap_count);
                                continue;
                        }
                        osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, PTR_ERR(req));
        }
        RETURN(req);
}
2277
/**
 * Prepare pages for ASYNC I/O and put them in the send queue.
 *
 * \param cli - client obd the RPC is sent through
 * \param loi - object whose pages are being sent
 * \param cmd - OBD_BRW_* macros selecting read or write
 * \param lop - pending pages
 *
 * \return zero if pages were successfully added to the send queue.
 * \return non-zero if an error occurred.
 */
2289 static int
2290 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2291                  struct lov_oinfo *loi,
2292                  int cmd, struct loi_oap_pages *lop)
2293 {
2294         struct ptlrpc_request *req;
2295         obd_count page_count = 0;
2296         struct osc_async_page *oap = NULL, *tmp;
2297         struct osc_brw_async_args *aa;
2298         const struct obd_async_page_ops *ops;
2299         CFS_LIST_HEAD(rpc_list);
2300         unsigned int ending_offset;
2301         unsigned  starting_offset = 0;
2302         int srvlock = 0;
2303         struct cl_object *clob = NULL;
2304         ENTRY;
2305
         /* NOTE(review): appears to be entered with cli->cl_loi_list_lock
          * held -- the lock is dropped below once pages are pulled off the
          * lists and re-taken before returning; confirm against callers
          * (osc_check_rpcs). */
2306         /* If there are HP OAPs we need to handle at least 1 of them,
2307          * move it the beginning of the pending list for that. */
2308         if (!list_empty(&lop->lop_urgent)) {
2309                 oap = list_entry(lop->lop_urgent.next,
2310                                  struct osc_async_page, oap_urgent_item);
2311                 if (oap->oap_async_flags & ASYNC_HP)
2312                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2313         }
2314
2315         /* first we find the pages we're allowed to work with */
2316         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2317                                  oap_pending_item) {
2318                 ops = oap->oap_caller_ops;
2319
2320                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2321                          "magic 0x%x\n", oap, oap->oap_magic);
2322
2323                 if (clob == NULL) {
2324                         /* pin object in memory, so that completion call-backs
2325                          * can be safely called under client_obd_list lock. */
2326                         clob = osc_oap2cl_page(oap)->cp_obj;
2327                         cl_object_get(clob);
2328                 }
2329
                 /* a single RPC may not mix SRVLOCK and non-SRVLOCK pages */
2330                 if (page_count != 0 &&
2331                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2332                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2333                                " oap %p, page %p, srvlock %u\n",
2334                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2335                         break;
2336                 }
2337                 /* in llite being 'ready' equates to the page being locked
2338                  * until completion unlocks it.  commit_write submits a page
2339                  * as not ready because its unlock will happen unconditionally
2340                  * as the call returns.  if we race with commit_write giving
2341                  * us that page we dont' want to create a hole in the page
2342                  * stream, so we stop and leave the rpc to be fired by
2343                  * another dirtier or kupdated interval (the not ready page
2344                  * will still be on the dirty list).  we could call in
2345                  * at the end of ll_file_write to process the queue again. */
2346                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2347                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2348                                                     cmd);
2349                         if (rc < 0)
2350                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2351                                                 "instead of ready\n", oap,
2352                                                 oap->oap_page, rc);
2353                         switch (rc) {
2354                         case -EAGAIN:
2355                                 /* llite is telling us that the page is still
2356                                  * in commit_write and that we should try
2357                                  * and put it in an rpc again later.  we
2358                                  * break out of the loop so we don't create
2359                                  * a hole in the sequence of pages in the rpc
2360                                  * stream.*/
2361                                 oap = NULL;
2362                                 break;
2363                         case -EINTR:
2364                                 /* the io isn't needed.. tell the checks
2365                                  * below to complete the rpc with EINTR */
2366                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2367                                 oap->oap_count = -EINTR;
2368                                 break;
2369                         case 0:
2370                                 oap->oap_async_flags |= ASYNC_READY;
2371                                 break;
2372                         default:
2373                                 LASSERTF(0, "oap %p page %p returned %d "
2374                                             "from make_ready\n", oap,
2375                                             oap->oap_page, rc);
2376                                 break;
2377                         }
2378                 }
                 /* oap == NULL marks the -EAGAIN case above: stop collecting */
2379                 if (oap == NULL)
2380                         break;
2381                 /*
2382                  * Page submitted for IO has to be locked. Either by
2383                  * ->ap_make_ready() or by higher layers.
2384                  */
2385 #if defined(__KERNEL__) && defined(__linux__)
2386                 {
2387                         struct cl_page *page;
2388
2389                         page = osc_oap2cl_page(oap);
2390
2391                         if (page->cp_type == CPT_CACHEABLE &&
2392                             !(PageLocked(oap->oap_page) &&
2393                               (CheckWriteback(oap->oap_page, cmd)))) {
2394                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2395                                        oap->oap_page,
2396                                        (long)oap->oap_page->flags,
2397                                        oap->oap_async_flags);
2398                                 LBUG();
2399                         }
2400                 }
2401 #endif
2402                 /* If there is a gap at the start of this page, it can't merge
2403                  * with any previous page, so we'll hand the network a
2404                  * "fragmented" page array that it can't transfer in 1 RDMA */
2405                 if (page_count != 0 && oap->oap_page_off != 0)
2406                         break;
2407
2408                 /* take the page out of our book-keeping */
2409                 list_del_init(&oap->oap_pending_item);
2410                 lop_update_pending(cli, lop, cmd, -1);
2411                 list_del_init(&oap->oap_urgent_item);
2412
2413                 if (page_count == 0)
2414                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2415                                           (PTLRPC_MAX_BRW_SIZE - 1);
2416
2417                 /* ask the caller for the size of the io as the rpc leaves. */
2418                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2419                         oap->oap_count =
2420                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2421                                                       cmd);
2422                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2423                 }
                 /* count <= 0 (e.g. -EINTR from make_ready, or a truncated
                  * page) means no IO is needed; complete it immediately */
2424                 if (oap->oap_count <= 0) {
2425                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2426                                oap->oap_count);
2427                         osc_ap_completion(env, cli, NULL,
2428                                           oap, 0, oap->oap_count);
2429                         continue;
2430                 }
2431
2432                 /* now put the page back in our accounting */
2433                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2434                 if (page_count == 0)
2435                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2436                 if (++page_count >= cli->cl_max_pages_per_rpc)
2437                         break;
2438
2439                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2440                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2441                  * have the same alignment as the initial writes that allocated
2442                  * extents on the server. */
2443                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2444                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2445                 if (ending_offset == 0)
2446                         break;
2447
2448                 /* If there is a gap at the end of this page, it can't merge
2449                  * with any subsequent pages, so we'll hand the network a
2450                  * "fragmented" page array that it can't transfer in 1 RDMA */
2451                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2452                         break;
2453         }
2454
2455         osc_wake_cache_waiters(cli);
2456
2457         loi_list_maint(cli, loi);
2458
         /* drop the list lock while building and sending the RPC */
2459         client_obd_list_unlock(&cli->cl_loi_list_lock);
2460
2461         if (clob != NULL)
2462                 cl_object_put(env, clob);
2463
2464         if (page_count == 0) {
2465                 client_obd_list_lock(&cli->cl_loi_list_lock);
2466                 RETURN(0);
2467         }
2468
         /* assemble the bulk read/write request from the collected pages */
2469         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2470         if (IS_ERR(req)) {
2471                 LASSERT(list_empty(&rpc_list));
2472                 loi_list_maint(cli, loi);
2473                 RETURN(PTR_ERR(req));
2474         }
2475
2476         aa = ptlrpc_req_async_args(req);
2477
2478         if (cmd == OBD_BRW_READ) {
2479                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2480                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2481                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2482                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2483         } else {
2484                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2485                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2486                                  cli->cl_w_in_flight);
2487                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2488                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2489         }
2490         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2491
2492         client_obd_list_lock(&cli->cl_loi_list_lock);
2493
2494         if (cmd == OBD_BRW_READ)
2495                 cli->cl_r_in_flight++;
2496         else
2497                 cli->cl_w_in_flight++;
2498
2499         /* queued sync pages can be torn down while the pages
2500          * were between the pending list and the rpc */
2501         tmp = NULL;
2502         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2503                 /* only one oap gets a request reference */
2504                 if (tmp == NULL)
2505                         tmp = oap;
2506                 if (oap->oap_interrupted && !req->rq_intr) {
2507                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2508                                oap, req);
2509                         ptlrpc_mark_interrupted(req);
2510                 }
2511         }
2512         if (tmp != NULL)
2513                 tmp->oap_request = ptlrpc_request_addref(req);
2514
2515         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2516                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2517
2518         req->rq_interpret_reply = brw_interpret;
2519         ptlrpcd_add_req(req, PSCOPE_BRW);
         /* 1 tells the caller that an RPC was actually queued */
2520         RETURN(1);
2521 }
2522
/* Emit a one-line D_INODE summary of an object's pending-IO state:
 * whether it is on a ready list, plus pending/urgent counts for the
 * write and read page lists. */
2523 #define LOI_DEBUG(LOI, STR, args...)                                     \
2524         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2525                !list_empty(&(LOI)->loi_ready_item) ||                    \
2526                !list_empty(&(LOI)->loi_hp_ready_item),                   \
2527                (LOI)->loi_write_lop.lop_num_pending,                     \
2528                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
2529                (LOI)->loi_read_lop.lop_num_pending,                      \
2530                !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
2531                args)                                                     \
2532
2533 /* This is called by osc_check_rpcs() to find which objects have pages that
2534  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2535 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2536 {
2537         ENTRY;
2538
2539         /* First return objects that have blocked locks so that they
2540          * will be flushed quickly and other clients can get the lock,
2541          * then objects which have pages ready to be stuffed into RPCs */
2542         if (!list_empty(&cli->cl_loi_hp_ready_list))
2543                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2544                                   struct lov_oinfo, loi_hp_ready_item));
2545         if (!list_empty(&cli->cl_loi_ready_list))
2546                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2547                                   struct lov_oinfo, loi_ready_item));
2548
2549         /* then if we have cache waiters, return all objects with queued
2550          * writes.  This is especially important when many small files
2551          * have filled up the cache and not been fired into rpcs because
2552          * they don't pass the nr_pending/object threshhold */
2553         if (!list_empty(&cli->cl_cache_waiters) &&
2554             !list_empty(&cli->cl_loi_write_list))
2555                 RETURN(list_entry(cli->cl_loi_write_list.next,
2556                                   struct lov_oinfo, loi_write_item));
2557
2558         /* then return all queued objects when we have an invalid import
2559          * so that they get flushed */
2560         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2561                 if (!list_empty(&cli->cl_loi_write_list))
2562                         RETURN(list_entry(cli->cl_loi_write_list.next,
2563                                           struct lov_oinfo, loi_write_item));
2564                 if (!list_empty(&cli->cl_loi_read_list))
2565                         RETURN(list_entry(cli->cl_loi_read_list.next,
2566                                           struct lov_oinfo, loi_read_item));
2567         }
2568         RETURN(NULL);
2569 }
2570
2571 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2572 {
2573         struct osc_async_page *oap;
2574         int hprpc = 0;
2575
2576         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2577                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2578                                  struct osc_async_page, oap_urgent_item);
2579                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2580         }
2581
2582         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2583                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2584                                  struct osc_async_page, oap_urgent_item);
2585                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2586         }
2587
2588         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2589 }
2590
2591 /* called with the loi list lock held */
2592 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2593 {
2594         struct lov_oinfo *loi;
2595         int rc = 0, race_counter = 0;
2596         ENTRY;
2597
         /* Keep picking objects (highest priority first, see osc_next_loi)
          * and firing RPCs until the in-flight limit is reached or no
          * object has work to do. */
2598         while ((loi = osc_next_loi(cli)) != NULL) {
2599                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2600
2601                 if (osc_max_rpc_in_flight(cli, loi))
2602                         break;
2603
2604                 /* attempt some read/write balancing by alternating between
2605                  * reads and writes in an object.  The makes_rpc checks here
2606                  * would be redundant if we were getting read/write work items
2607                  * instead of objects.  we don't want send_oap_rpc to drain a
2608                  * partial read pending queue when we're given this object to
2609                  * do io on writes while there are cache waiters */
2610                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2611                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2612                                               &loi->loi_write_lop);
2613                         if (rc < 0)
2614                                 break;
2615                         if (rc > 0)
2616                                 race_counter = 0;
2617                         else
2618                                 race_counter++;
2619                 }
2620                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2621                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2622                                               &loi->loi_read_lop);
2623                         if (rc < 0)
2624                                 break;
2625                         if (rc > 0)
2626                                 race_counter = 0;
2627                         else
2628                                 race_counter++;
2629                 }
2630
2631                 /* attempt some inter-object balancing by issueing rpcs
2632                  * for each object in turn */
2633                 if (!list_empty(&loi->loi_hp_ready_item))
2634                         list_del_init(&loi->loi_hp_ready_item);
2635                 if (!list_empty(&loi->loi_ready_item))
2636                         list_del_init(&loi->loi_ready_item);
2637                 if (!list_empty(&loi->loi_write_item))
2638                         list_del_init(&loi->loi_write_item);
2639                 if (!list_empty(&loi->loi_read_item))
2640                         list_del_init(&loi->loi_read_item);
2641
2642                 loi_list_maint(cli, loi);
2643
2644                 /* send_oap_rpc fails with 0 when make_ready tells it to
2645                  * back off.  llite's make_ready does this when it tries
2646                  * to lock a page queued for write that is already locked.
2647                  * we want to try sending rpcs from many objects, but we
2648                  * don't want to spin failing with 0.  */
2649                 if (race_counter == 10)
2650                         break;
2651         }
2652         EXIT;
2653 }
2654
2655 /* we're trying to queue a page in the osc so we're subject to the
2656  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2657  * If the osc's queued pages are already at that limit, then we want to sleep
2658  * until there is space in the osc's queue for us.  We also may be waiting for
2659  * write credits from the OST if there are RPCs in flight that may return some
2660  * before we fall back to sync writes.
2661  *
2662  * We need this know our allocation was granted in the presence of signals */
2663 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2664 {
2665         int rc;
2666         ENTRY;
2667         client_obd_list_lock(&cli->cl_loi_list_lock);
2668         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2669         client_obd_list_unlock(&cli->cl_loi_list_lock);
2670         RETURN(rc);
2671 };
2672
2673 /**
2674  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2675  * is available.
2676  */
2677 int osc_enter_cache_try(const struct lu_env *env,
2678                         struct client_obd *cli, struct lov_oinfo *loi,
2679                         struct osc_async_page *oap, int transient)
2680 {
2681         int has_grant;
2682
2683         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2684         if (has_grant) {
2685                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2686                 if (transient) {
2687                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2688                         atomic_inc(&obd_dirty_transit_pages);
2689                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2690                 }
2691         }
2692         return has_grant;
2693 }
2694
2695 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2696  * grant or cache space. */
2697 static int osc_enter_cache(const struct lu_env *env,
2698                            struct client_obd *cli, struct lov_oinfo *loi,
2699                            struct osc_async_page *oap)
2700 {
2701         struct osc_cache_waiter ocw;
2702         struct l_wait_info lwi = { 0 };
2703
2704         ENTRY;
2705
2706         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2707                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2708                cli->cl_dirty_max, obd_max_dirty_pages,
2709                cli->cl_lost_grant, cli->cl_avail_grant);
2710
2711         /* force the caller to try sync io.  this can jump the list
2712          * of queued writes and create a discontiguous rpc stream */
2713         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2714             loi->loi_ar.ar_force_sync)
2715                 RETURN(-EDQUOT);
2716
2717         /* Hopefully normal case - cache space and write credits available */
2718         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2719             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2720             osc_enter_cache_try(env, cli, loi, oap, 0))
2721                 RETURN(0);
2722
2723         /* Make sure that there are write rpcs in flight to wait for.  This
2724          * is a little silly as this object may not have any pending but
2725          * other objects sure might. */
2726         if (cli->cl_w_in_flight) {
                 /* queue ourselves as a cache waiter, kick RPCs to free up
                  * grant, then sleep (lock dropped) until granted or until
                  * no RPCs remain in flight -- see ocw_granted() */
2727                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2728                 cfs_waitq_init(&ocw.ocw_waitq);
2729                 ocw.ocw_oap = oap;
2730                 ocw.ocw_rc = 0;
2731
2732                 loi_list_maint(cli, loi);
2733                 osc_check_rpcs(env, cli);
2734                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2735
2736                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2737                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2738
2739                 client_obd_list_lock(&cli->cl_loi_list_lock);
                 /* still linked means we were never granted: interrupted */
2740                 if (!list_empty(&ocw.ocw_entry)) {
2741                         list_del(&ocw.ocw_entry);
2742                         RETURN(-EINTR);
2743                 }
2744                 RETURN(ocw.ocw_rc);
2745         }
2746
2747         RETURN(-EDQUOT);
2748 }
2749
2750
2751 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2752                         struct lov_oinfo *loi, cfs_page_t *page,
2753                         obd_off offset, const struct obd_async_page_ops *ops,
2754                         void *data, void **res, int nocache,
2755                         struct lustre_handle *lockh)
2756 {
2757         struct osc_async_page *oap;
2758
2759         ENTRY;
2760
2761         if (!page)
2762                 return size_round(sizeof(*oap));
2763
2764         oap = *res;
2765         oap->oap_magic = OAP_MAGIC;
2766         oap->oap_cli = &exp->exp_obd->u.cli;
2767         oap->oap_loi = loi;
2768
2769         oap->oap_caller_ops = ops;
2770         oap->oap_caller_data = data;
2771
2772         oap->oap_page = page;
2773         oap->oap_obj_off = offset;
2774         if (!client_is_remote(exp) &&
2775             cfs_capable(CFS_CAP_SYS_RESOURCE))
2776                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2777
2778         LASSERT(!(offset & ~CFS_PAGE_MASK));
2779
2780         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2781         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2782         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2783         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2784
2785         spin_lock_init(&oap->oap_lock);
2786         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2787         RETURN(0);
2788 }
2789
2790 struct osc_async_page *oap_from_cookie(void *cookie)
2791 {
2792         struct osc_async_page *oap = cookie;
2793         if (oap->oap_magic != OAP_MAGIC)
2794                 return ERR_PTR(-EINVAL);
2795         return oap;
2796 };
2797
2798 int osc_queue_async_io(const struct lu_env *env,
2799                        struct obd_export *exp, struct lov_stripe_md *lsm,
2800                        struct lov_oinfo *loi, void *cookie,
2801                        int cmd, obd_off off, int count,
2802                        obd_flag brw_flags, enum async_flags async_flags)
2803 {
2804         struct client_obd *cli = &exp->exp_obd->u.cli;
2805         struct osc_async_page *oap;
2806         int rc = 0;
2807         ENTRY;
2808
2809         oap = oap_from_cookie(cookie);
2810         if (IS_ERR(oap))
2811                 RETURN(PTR_ERR(oap));
2812
2813         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2814                 RETURN(-EIO);
2815
         /* a page already on a pending/urgent/rpc list is in use */
2816         if (!list_empty(&oap->oap_pending_item) ||
2817             !list_empty(&oap->oap_urgent_item) ||
2818             !list_empty(&oap->oap_rpc_item))
2819                 RETURN(-EBUSY);
2820
2821         /* check if the file's owner/group is over quota */
2822         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2823                 struct cl_object *obj;
2824                 struct cl_attr    attr; /* XXX put attr into thread info */
2825
2826                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2827
2828                 cl_object_attr_lock(obj);
2829                 rc = cl_object_attr_get(env, obj, &attr);
2830                 cl_object_attr_unlock(obj);
2831
2832                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2833                                             attr.cat_gid) == NO_QUOTA)
2834                         rc = -EDQUOT;
2835                 if (rc)
2836                         RETURN(rc);
2837         }
2838
2839         if (loi == NULL)
2840                 loi = lsm->lsm_oinfo[0];
2841
2842         client_obd_list_lock(&cli->cl_loi_list_lock);
2843
         /* the IO must fit within a single page */
2844         LASSERT(off + count <= CFS_PAGE_SIZE);
2845         oap->oap_cmd = cmd;
2846         oap->oap_page_off = off;
2847         oap->oap_count = count;
2848         oap->oap_brw_flags = brw_flags;
2849         oap->oap_async_flags = async_flags;
2850
         /* writes must first obtain cache space / grant; this may drop and
          * re-take the list lock while waiting (see osc_enter_cache) */
2851         if (cmd & OBD_BRW_WRITE) {
2852                 rc = osc_enter_cache(env, cli, loi, oap);
2853                 if (rc) {
2854                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2855                         RETURN(rc);
2856                 }
2857         }
2858
2859         osc_oap_to_pending(oap);
2860         loi_list_maint(cli, loi);
2861
2862         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2863                   cmd);
2864
2865         osc_check_rpcs(env, cli);
2866         client_obd_list_unlock(&cli->cl_loi_list_lock);
2867
2868         RETURN(0);
2869 }
2870
/* aka (~was & now & flag), but this is more clear :)
 * Fix: arguments are now fully parenthesized so expansion is safe when
 * callers pass expressions containing lower-precedence operators. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2873
2874 int osc_set_async_flags_base(struct client_obd *cli,
2875                              struct lov_oinfo *loi, struct osc_async_page *oap,
2876                              obd_flag async_flags)
2877 {
2878         struct loi_oap_pages *lop;
2879         ENTRY;
2880
2881         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2882                 RETURN(-EIO);
2883
2884         if (oap->oap_cmd & OBD_BRW_WRITE) {
2885                 lop = &loi->loi_write_lop;
2886         } else {
2887                 lop = &loi->loi_read_lop;
2888         }
2889
2890         if (list_empty(&oap->oap_pending_item))
2891                 RETURN(-EINVAL);
2892
2893         if ((oap->oap_async_flags & async_flags) == async_flags)
2894                 RETURN(0);
2895
2896         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2897                 oap->oap_async_flags |= ASYNC_READY;
2898
2899         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2900             list_empty(&oap->oap_rpc_item)) {
2901                 if (oap->oap_async_flags & ASYNC_HP)
2902                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2903                 else
2904                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2905                 oap->oap_async_flags |= ASYNC_URGENT;
2906                 loi_list_maint(cli, loi);
2907         }
2908
2909         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2910                         oap->oap_async_flags);
2911         RETURN(0);
2912 }
2913
/* Remove a previously queued async page from all osc book-keeping
 * (urgent and pending lists, cache accounting).  Fails with -EBUSY
 * when the page already belongs to an in-flight RPC. */
2914 int osc_teardown_async_page(struct obd_export *exp,
2915                             struct lov_stripe_md *lsm,
2916                             struct lov_oinfo *loi, void *cookie)
2917 {
2918         struct client_obd *cli = &exp->exp_obd->u.cli;
2919         struct loi_oap_pages *lop;
2920         struct osc_async_page *oap;
2921         int rc = 0;
2922         ENTRY;
2923
2924         oap = oap_from_cookie(cookie);
2925         if (IS_ERR(oap))
2926                 RETURN(PTR_ERR(oap));
2927
2928         if (loi == NULL)
2929                 loi = lsm->lsm_oinfo[0];
2930
2931         if (oap->oap_cmd & OBD_BRW_WRITE) {
2932                 lop = &loi->loi_write_lop;
2933         } else {
2934                 lop = &loi->loi_read_lop;
2935         }
2936
2937         client_obd_list_lock(&cli->cl_loi_list_lock);
2938
         /* pages that are part of a built RPC cannot be torn down */
2939         if (!list_empty(&oap->oap_rpc_item))
2940                 GOTO(out, rc = -EBUSY);
2941
         /* release any grant/dirty accounting and wake blocked writers */
2942         osc_exit_cache(cli, oap, 0);
2943         osc_wake_cache_waiters(cli);
2944
2945         if (!list_empty(&oap->oap_urgent_item)) {
2946                 list_del_init(&oap->oap_urgent_item);
2947                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2948         }
2949         if (!list_empty(&oap->oap_pending_item)) {
2950                 list_del_init(&oap->oap_pending_item);
2951                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2952         }
2953         loi_list_maint(cli, loi);
2954         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2955 out:
2956         client_obd_list_unlock(&cli->cl_loi_list_lock);
2957         RETURN(rc);
2958 }
2959
2960 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2961                                          struct ldlm_enqueue_info *einfo,
2962                                          int flags)
2963 {
2964         void *data = einfo->ei_cbdata;
2965
2966         LASSERT(lock != NULL);
2967         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2968         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2969         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2970         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2971
2972         lock_res_and_lock(lock);
2973         spin_lock(&osc_ast_guard);
2974         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2975         lock->l_ast_data = data;
2976         spin_unlock(&osc_ast_guard);
2977         unlock_res_and_lock(lock);
2978 }
2979
2980 static void osc_set_data_with_check(struct lustre_handle *lockh,
2981                                     struct ldlm_enqueue_info *einfo,
2982                                     int flags)
2983 {
2984         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2985
2986         if (lock != NULL) {
2987                 osc_set_lock_data_with_check(lock, einfo, flags);
2988                 LDLM_LOCK_PUT(lock);
2989         } else
2990                 CERROR("lockh %p, data %p - client evicted?\n",
2991                        lockh, einfo->ei_cbdata);
2992 }
2993
2994 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2995                              ldlm_iterator_t replace, void *data)
2996 {
2997         struct ldlm_res_id res_id;
2998         struct obd_device *obd = class_exp2obd(exp);
2999
3000         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3001         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3002         return 0;
3003 }
3004
/* Post-process the result of a lock enqueue and invoke the caller's upcall.
 *
 * For an intent enqueue the server may "abort" the lock (ELDLM_LOCK_ABORTED)
 * and deliver the real status in the DLM reply's lock_policy_res1; that value
 * replaces @rc.  When the enqueue succeeded -- or was an aborted intent,
 * which still carries a valid LVB -- LDLM_FL_LVB_READY is set in *@flags so
 * the cached LVB may be trusted by lock matching.
 *
 * Returns whatever the upcall returns. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
                            obd_enqueue_update_f upcall, void *cookie,
                            int *flags, int rc)
{
        int intent = *flags & LDLM_FL_HAS_INTENT;
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* The server's real verdict overrides the abort. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        /* Aborted-intent replies still deliver a usable LVB (glimpse). */
        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                *flags |= LDLM_FL_LVB_READY;
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
        }

        /* Call the update callback. */
        rc = (*upcall)(cookie, rc);
        RETURN(rc);
}
3035
/* Reply interpreter for asynchronous lock enqueues (see osc_enqueue_base()).
 *
 * Completes the client-side enqueue via ldlm_cli_enqueue_fini(), runs the
 * upcall through osc_enqueue_fini(), and then drops the lock references.
 * The reference choreography below is deliberate and order-sensitive:
 * an extra reference is taken up front so any blocking AST triggered by a
 * failed enqueue cannot race ahead of the upcall. */
static int osc_enqueue_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        struct ldlm_lock *lock;
        struct lustre_handle handle;
        __u32 mode;

        /* Make a local copy of a lock handle and a mode, because aa->oa_*
         * might be freed anytime after lock upcall has been called. */
        lustre_handle_copy(&handle, aa->oa_lockh);
        mode = aa->oa_ei->ei_mode;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(&handle);

        /* Take an additional reference so that a blocking AST that
         * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
         * to arrive after an upcall has been executed by
         * osc_enqueue_fini(). */
        ldlm_lock_addref(&handle, mode);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   mode, aa->oa_flags, aa->oa_lvb,
                                   sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
                                   &handle, rc);
        /* Complete osc stuff. */
        rc = osc_enqueue_fini(req, aa->oa_lvb,
                              aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
        /* Release the lock for async request. */
        if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
                /*
                 * Releases a reference taken by ldlm_cli_enqueue(), if it is
                 * not already released by
                 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
                 */
                ldlm_lock_decref(&handle, mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_lockh, req, aa);
        /* Drop the extra reference taken above, then the handle2lock ref. */
        ldlm_lock_decref(&handle, mode);
        LDLM_LOCK_PUT(lock);
        return rc;
}
3082
/* Update per-stripe state (@loi) after an enqueue on @lov_lockhp completed.
 *
 * On a granted lock (ELDLM_OK) the LVB is copied into the stripe info and
 * the known-minimum-size (kms) is extended up to the end of the new lock,
 * after which the lock is made matchable.  On an aborted intent (glimpse)
 * only the LVB is recorded; kms is left untouched because no lock covers
 * the data. */
void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                        struct lov_oinfo *loi, int flags,
                        struct ost_lvb *lvb, __u32 mode, int rc)
{
        if (rc == ELDLM_OK) {
                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
                __u64 tmp;

                LASSERT(lock != NULL);
                loi->loi_lvb = *lvb;
                tmp = loi->loi_lvb.lvb_size;
                /* Extend KMS up to the end of this lock and no further
                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
                if (tmp > lock->l_policy_data.l_extent.end)
                        tmp = lock->l_policy_data.l_extent.end + 1;
                if (tmp >= loi->loi_kms) {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
                        loi_kms_set(loi, tmp);
                } else {
                        LDLM_DEBUG(lock, "lock acquired, setting rss="
                                   LPU64"; leaving kms="LPU64", end="LPU64,
                                   loi->loi_lvb.lvb_size, loi->loi_kms,
                                   lock->l_policy_data.l_extent.end);
                }
                ldlm_lock_allow_match(lock);
                LDLM_LOCK_PUT(lock);
        } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
                loi->loi_lvb = *lvb;
                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
                /* NOTE(review): dead store -- rc is passed by value and the
                 * function returns void, so this assignment has no effect.
                 * Kept for byte-identity; consider removing or returning rc. */
                rc = ELDLM_OK;
        }
}
EXPORT_SYMBOL(osc_update_enqueue);
3118
/* Sentinel value: callers pass PTLRPCD_SET as the @rqset argument of
 * osc_enqueue_base() to request that the RPC be handed to the ptlrpcd
 * daemon instead of being added to a real request set.  Never dereferenced;
 * compared by address only. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3120
3121 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3122  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3123  * other synchronous requests, however keeping some locks and trying to obtain
3124  * others may take a considerable amount of time in a case of ost failure; and
3125  * when other sync requests do not get released lock from a client, the client
3126  * is excluded from the cluster -- such scenarious make the life difficult, so
3127  * release locks just after they are obtained. */
3128 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3129                      int *flags, ldlm_policy_data_t *policy,
3130                      struct ost_lvb *lvb, int kms_valid,
3131                      obd_enqueue_update_f upcall, void *cookie,
3132                      struct ldlm_enqueue_info *einfo,
3133                      struct lustre_handle *lockh,
3134                      struct ptlrpc_request_set *rqset, int async)
3135 {
3136         struct obd_device *obd = exp->exp_obd;
3137         struct ptlrpc_request *req = NULL;
3138         int intent = *flags & LDLM_FL_HAS_INTENT;
3139         ldlm_mode_t mode;
3140         int rc;
3141         ENTRY;
3142
3143         /* Filesystem lock extents are extended to page boundaries so that
3144          * dealing with the page cache is a little smoother.  */
3145         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3146         policy->l_extent.end |= ~CFS_PAGE_MASK;
3147
3148         /*
3149          * kms is not valid when either object is completely fresh (so that no
3150          * locks are cached), or object was evicted. In the latter case cached
3151          * lock cannot be used, because it would prime inode state with
3152          * potentially stale LVB.
3153          */
3154         if (!kms_valid)
3155                 goto no_match;
3156
3157         /* Next, search for already existing extent locks that will cover us */
3158         /* If we're trying to read, we also search for an existing PW lock.  The
3159          * VFS and page cache already protect us locally, so lots of readers/
3160          * writers can share a single PW lock.
3161          *
3162          * There are problems with conversion deadlocks, so instead of
3163          * converting a read lock to a write lock, we'll just enqueue a new
3164          * one.
3165          *
3166          * At some point we should cancel the read lock instead of making them
3167          * send us a blocking callback, but there are problems with canceling
3168          * locks out from other users right now, too. */
3169         mode = einfo->ei_mode;
3170         if (einfo->ei_mode == LCK_PR)
3171                 mode |= LCK_PW;
3172         mode = ldlm_lock_match(obd->obd_namespace,
3173                                *flags | LDLM_FL_LVB_READY, res_id,
3174                                einfo->ei_type, policy, mode, lockh, 0);
3175         if (mode) {
3176                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3177
3178                 if (matched->l_ast_data == NULL ||
3179                     matched->l_ast_data == einfo->ei_cbdata) {
3180                         /* addref the lock only if not async requests and PW
3181                          * lock is matched whereas we asked for PR. */
3182                         if (!rqset && einfo->ei_mode != mode)
3183                                 ldlm_lock_addref(lockh, LCK_PR);
3184                         osc_set_lock_data_with_check(matched, einfo, *flags);
3185                         if (intent) {
3186                                 /* I would like to be able to ASSERT here that
3187                                  * rss <= kms, but I can't, for reasons which
3188                                  * are explained in lov_enqueue() */
3189                         }
3190
3191                         /* We already have a lock, and it's referenced */
3192                         (*upcall)(cookie, ELDLM_OK);
3193
3194                         /* For async requests, decref the lock. */
3195                         if (einfo->ei_mode != mode)
3196                                 ldlm_lock_decref(lockh, LCK_PW);
3197                         else if (rqset)
3198                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3199                         LDLM_LOCK_PUT(matched);
3200                         RETURN(ELDLM_OK);
3201                 } else
3202                         ldlm_lock_decref(lockh, mode);
3203                 LDLM_LOCK_PUT(matched);
3204         }
3205
3206  no_match:
3207         if (intent) {
3208                 CFS_LIST_HEAD(cancels);
3209                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3210                                            &RQF_LDLM_ENQUEUE_LVB);
3211                 if (req == NULL)
3212                         RETURN(-ENOMEM);
3213
3214                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3215                 if (rc)
3216                         RETURN(rc);
3217
3218                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3219                                      sizeof *lvb);
3220                 ptlrpc_request_set_replen(req);
3221         }
3222
3223         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3224         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3225
3226         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3227                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3228         if (rqset) {
3229                 if (!rc) {
3230                         struct osc_enqueue_args *aa;
3231                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3232                         aa = ptlrpc_req_async_args(req);
3233                         aa->oa_ei = einfo;
3234                         aa->oa_exp = exp;
3235                         aa->oa_flags  = flags;
3236                         aa->oa_upcall = upcall;
3237                         aa->oa_cookie = cookie;
3238                         aa->oa_lvb    = lvb;
3239                         aa->oa_lockh  = lockh;
3240
3241                         req->rq_interpret_reply =
3242                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3243                         if (rqset == PTLRPCD_SET)
3244                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3245                         else
3246                                 ptlrpc_set_add_req(rqset, req);
3247                 } else if (intent) {
3248                         ptlrpc_req_finished(req);
3249                 }
3250                 RETURN(rc);
3251         }
3252
3253         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3254         if (intent)
3255                 ptlrpc_req_finished(req);
3256
3257         RETURN(rc);
3258 }
3259
3260 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3261                        struct ldlm_enqueue_info *einfo,
3262                        struct ptlrpc_request_set *rqset)
3263 {
3264         struct ldlm_res_id res_id;
3265         int rc;
3266         ENTRY;
3267
3268         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3269                            oinfo->oi_md->lsm_object_gr, &res_id);
3270
3271         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3272                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3273                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3274                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3275                               rqset, rqset != NULL);
3276         RETURN(rc);
3277 }
3278
3279 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3280                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3281                    int *flags, void *data, struct lustre_handle *lockh,
3282                    int unref)
3283 {
3284         struct obd_device *obd = exp->exp_obd;
3285         int lflags = *flags;
3286         ldlm_mode_t rc;
3287         ENTRY;
3288
3289         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3290                 RETURN(-EIO);
3291
3292         /* Filesystem lock extents are extended to page boundaries so that
3293          * dealing with the page cache is a little smoother */
3294         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3295         policy->l_extent.end |= ~CFS_PAGE_MASK;
3296
3297         /* Next, search for already existing extent locks that will cover us */
3298         /* If we're trying to read, we also search for an existing PW lock.  The
3299          * VFS and page cache already protect us locally, so lots of readers/
3300          * writers can share a single PW lock. */
3301         rc = mode;
3302         if (mode == LCK_PR)
3303                 rc |= LCK_PW;
3304         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3305                              res_id, type, policy, rc, lockh, unref);
3306         if (rc) {
3307                 if (data != NULL)
3308                         osc_set_data_with_check(lockh, data, lflags);
3309                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3310                         ldlm_lock_addref(lockh, LCK_PR);
3311                         ldlm_lock_decref(lockh, LCK_PW);
3312                 }
3313                 RETURN(rc);
3314         }
3315         RETURN(rc);
3316 }
3317
3318 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3319 {
3320         ENTRY;
3321
3322         if (unlikely(mode == LCK_GROUP))
3323                 ldlm_lock_decref_and_cancel(lockh, mode);
3324         else
3325                 ldlm_lock_decref(lockh, mode);
3326
3327         RETURN(0);
3328 }
3329
3330 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3331                       __u32 mode, struct lustre_handle *lockh)
3332 {
3333         ENTRY;
3334         RETURN(osc_cancel_base(lockh, mode));
3335 }
3336
3337 static int osc_cancel_unused(struct obd_export *exp,
3338                              struct lov_stripe_md *lsm, int flags,
3339                              void *opaque)
3340 {
3341         struct obd_device *obd = class_exp2obd(exp);
3342         struct ldlm_res_id res_id, *resp = NULL;
3343
3344         if (lsm != NULL) {
3345                 resp = osc_build_res_name(lsm->lsm_object_id,
3346                                           lsm->lsm_object_gr, &res_id);
3347         }
3348
3349         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3350 }
3351
/* Reply interpreter for osc_statfs_async(): copy the server's obd_statfs
 * into the caller's buffer and invoke the completion callback.  The
 * callback runs unconditionally -- on RPC failure or a malformed reply it
 * receives the error code instead. */
static int osc_statfs_interpret(const struct lu_env *env,
                                struct ptlrpc_request *req,
                                struct osc_async_args *aa, int rc)
{
        struct obd_statfs *msfs;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_osfs = *msfs;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}
3372
/* Issue an asynchronous OST_STATFS RPC; the reply is delivered to
 * @oinfo->oi_osfs by osc_statfs_interpret() and @oinfo->oi_cb_up is called
 * on completion.  @max_age is currently unused on the wire (see comment
 * below).  Returns 0 once the request is queued on @rqset, -ENOMEM or a
 * pack error otherwise. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * to avoid deadlocking on a down server. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3414
/* Synchronous OST_STATFS: fill *@osfs from the target.  @max_age is not
 * transmitted (see comment below).  Returns 0 on success, -ENODEV if the
 * import is already torn down, or an RPC/protocol error. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /* The request may also come from lprocfs, so take a reference on
         * the import under cl_sem to synchronize against
         * client_disconnect_export() tearing it down (Bug 15684). */
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The request holds its own import reference from here on. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs-originated requests must not wait for recovery,
                 * to avoid deadlocking on a down server. */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3477
/* Retrieve object striping information.
 *
 * @lump is a user-space lov_user_md (v1 or v3) whose lmm_stripe_count
 * indicates how many OST entries fit in the user buffer.  lmm_magic must
 * be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3; since an OSC manages a single
 * object, exactly one stripe slot is filled and lmm_stripe_count is
 * reported back as 1.
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                /* The user buffer has room for object entries: allocate a
                 * kernel copy sized for the user's stripe count and fill the
                 * first object slot.  The v1/v3 split matters because the
                 * objects array sits at a different offset in each layout. */
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Header-only reply: reuse the on-stack header. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3540
3541
/* OSC ioctl dispatcher.  @karg is the kernel-side copy of the ioctl data,
 * @uarg the original user pointer (needed by handlers that do their own
 * copy_{from,to}_user).  A module reference is held for the duration of
 * the call so the OSC cannot be unloaded mid-ioctl. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                /* NOTE(review): message lacks a trailing '\n'. */
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Synthesize a single-target lov_desc so LOV-level tools
                 * also work when pointed directly at an OSC. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd() returns the md size on success. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        case OBD_IOC_PING_TARGET:
                err = ptlrpc_obd_ping(obd);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3628
/* obd_get_info() handler for the OSC.
 *
 * Supported keys:
 *   KEY_LOCK_TO_STRIPE -- an OSC manages one object, so the stripe is 0;
 *   KEY_LAST_ID        -- synchronous OST_GET_INFO RPC returning the last
 *                         allocated object id (an obd_id in *@val);
 *   KEY_FIEMAP         -- synchronous OST_GET_INFO RPC mapping file extents;
 *                         @val carries the fiemap request in and the reply
 *                         out, *@vallen bytes both ways.
 * Any other key yields -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* The fiemap value buffer travels in both directions. */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3724
3725 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3726                                           struct ptlrpc_request *req,
3727                                           void *aa, int rc)
3728 {
3729         struct llog_ctxt *ctxt;
3730         struct obd_import *imp = req->rq_import;
3731         ENTRY;
3732
3733         if (rc != 0)
3734                 RETURN(rc);
3735
3736         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3737         if (ctxt) {
3738                 if (rc == 0)
3739                         rc = llog_initiator_connect(ctxt);
3740                 else
3741                         CERROR("cannot establish connection for "
3742                                "ctxt %p: %d\n", ctxt, rc);
3743         }
3744
3745         llog_ctxt_put(ctxt);
3746         spin_lock(&imp->imp_lock);
3747         imp->imp_server_timeout = 1;
3748         imp->imp_pingable = 1;
3749         spin_unlock(&imp->imp_lock);
3750         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3751
3752         RETURN(rc);
3753 }
3754
/* Asynchronously set a named key/value pair on this OSC or on the OST it
 * talks to.
 *
 * A handful of keys are handled purely locally (KEY_NEXT_ID, KEY_UNLINKED,
 * KEY_INIT_RECOV, KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX); all
 * others are packed into an OST_SET_INFO RPC and forwarded to the OST.
 *
 * \param exp     export to the target OST
 * \param keylen  length of \a key in bytes
 * \param key     key name (compared with KEY_IS())
 * \param vallen  length of \a val in bytes
 * \param val     value buffer; interpretation depends on \a key
 * \param set     request set the forwarded RPC is added to; may be NULL
 *                only for KEY_GRANT_SHRINK, which goes via ptlrpcd
 *
 * \retval 0 on success (local keys and successfully queued RPCs),
 *         negative errno otherwise
 */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        /* Fault-injection point for shutdown testing. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        /* MDS tells us the next object id to precreate from. */
        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        /* Objects were unlinked: clear the no-space flag so precreation
         * may resume. */
        if (KEY_IS(KEY_UNLINKED)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* Toggle initial-recovery behaviour on the import. */
        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        /* Enable/disable bulk data checksumming for this client. */
        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        /* Security policy changed: let the sptlrpc layer adapt. */
        if (KEY_IS(KEY_SPTLRPC_CONF)) {
                sptlrpc_conf_client_adapt(obd);
                RETURN(0);
        }

        /* Flush this client's security contexts on the import. */
        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Every forwarded key except GRANT_SHRINK requires a request set
         * to queue the RPC on. */
        if (!set && !KEY_IS(KEY_GRANT_SHRINK))
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */

        if (KEY_IS(KEY_GRANT_SHRINK))  
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); 
        else 
                req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        
        if (req == NULL)
                RETURN(-ENOMEM);

        /* Size the request buffers for the caller-supplied key/value. */
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* Copy key and value into the packed request. */
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                /* MDS connection announcement: remember the object group
                 * for precreation and hook the llog-connect interpreter. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        } else if (KEY_IS(KEY_GRANT_SHRINK)) {
                /* Grant shrink: stash a copy of the obdo for the reply
                 * interpreter; freed there (ownership passes to the RPC). */
                struct osc_grant_args *aa;
                struct obdo *oa;

                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                OBD_ALLOC_PTR(oa);
                if (!oa) {
                        ptlrpc_req_finished(req);
                        RETURN(-ENOMEM);
                }
                *oa = ((struct ost_body *)val)->oa;
                aa->aa_oa = oa;
                req->rq_interpret_reply = osc_shrink_grant_interpret;
        }
        
        ptlrpc_request_set_replen(req);
        if (!KEY_IS(KEY_GRANT_SHRINK)) {
                /* Queue on the caller's set and kick it once. */
                LASSERT(set != NULL);
                ptlrpc_set_add_req(set, req);
                ptlrpc_check_set(NULL, set);
        } else 
                /* Grant shrink is driven by ptlrpcd, not the caller. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        
        RETURN(0);
}
3884
3885
3886 static struct llog_operations osc_size_repl_logops = {
3887         lop_cancel: llog_obd_repl_cancel
3888 };
3889
/* Origin-side llog operations; filled in lazily on first osc_llog_init()
 * call (see below) so they pick up llog_lvfs_ops as their base. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts this OSC uses: the MDS->OST original
 * context (object unlink records) and the size-replication context.
 *
 * \param obd    this OSC device
 * \param olg    llog group; must be obd's own group (asserted)
 * \param tgt    the target device the logs are attached to
 * \param count  number of log ids
 * \param catid  catalog log id for the original context
 * \param uuid   unused here
 *
 * \retval 0 on success, negative errno on setup failure (with both
 *         contexts cleaned up)
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time initialization of the shared origin logops, guarded by
         * the device lock so concurrent setups don't race; the lop_setup
         * pointer doubles as the "already initialized" flag. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO(out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc) {
                /* Second setup failed: unwind the first context so we do
                 * not leak it. */
                struct llog_ctxt *ctxt =
                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
                if (ctxt)
                        llog_cleanup(ctxt);
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
        }
        /* GOTO rather than falling through so the exit is traced. */
        GOTO(out, rc);
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        return rc;
}
3935
3936 static int osc_llog_finish(struct obd_device *obd, int count)
3937 {
3938         struct llog_ctxt *ctxt;
3939         int rc = 0, rc2 = 0;
3940         ENTRY;
3941
3942         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3943         if (ctxt)
3944                 rc = llog_cleanup(ctxt);
3945
3946         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3947         if (ctxt)
3948                 rc2 = llog_cleanup(ctxt);
3949         if (!rc)
3950                 rc = rc2;
3951
3952         RETURN(rc);
3953 }
3954
3955 static int osc_reconnect(const struct lu_env *env,
3956                          struct obd_export *exp, struct obd_device *obd,
3957                          struct obd_uuid *cluuid,
3958                          struct obd_connect_data *data,
3959                          void *localdata)
3960 {
3961         struct client_obd *cli = &obd->u.cli;
3962
3963         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3964                 long lost_grant;
3965
3966                 client_obd_list_lock(&cli->cl_loi_list_lock);
3967                 data->ocd_grant = cli->cl_avail_grant ?:
3968                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3969                 lost_grant = cli->cl_lost_grant;
3970                 cli->cl_lost_grant = 0;
3971                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3972
3973                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3974                        "cl_lost_grant: %ld\n", data->ocd_grant,
3975                        cli->cl_avail_grant, lost_grant);
3976                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3977                        " ocd_grant: %d\n", data->ocd_connect_flags,
3978                        data->ocd_version, data->ocd_grant);
3979         }
3980
3981         RETURN(0);
3982 }
3983
/* Disconnect this OSC from its OST.
 *
 * On the last connection, flush remaining size-replication llog cancels
 * to the target, then run the generic client disconnect.  Only after the
 * import is destroyed is the client removed from the grant-shrink list
 * (see the BUG18662 note below for why the order matters).
 *
 * \param exp  export being disconnected
 *
 * \retval result of client_disconnect_export()
 */
static int osc_disconnect(struct obd_export *exp)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct llog_ctxt  *ctxt;
        int rc;

        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
        if (ctxt) {
                if (obd->u.cli.cl_conn_count == 1) {
                        /* Flush any remaining cancel messages out to the
                         * target */
                        llog_sync(ctxt, exp);
                }
                llog_ctxt_put(ctxt);
        } else {
                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
                       obd);
        }

        rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but it
         * causes the following problem if setup (connect) and cleanup
         * (disconnect) are tangled together.
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import 
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list                 
         *                                      cleanup_osc
         * Bang! pinger trigger the shrink.
         * So the osc should be disconnected from the shrink list, after we
         * are sure the import has been destroyed. BUG18662 
         */
        if (obd->u.cli.cl_import == NULL)
                osc_del_shrink_grant(&obd->u.cli);
        return rc;
}
4025
/* React to import state-change events for this OSC's connection.
 *
 * Dispatches on the event type: disconnects reset grants and flag object
 * precreation as recovering; invalidation drains pending RPCs and wipes
 * local locks; activation clears the no-space flag; OCD (re)negotiation
 * re-initializes grants and the request portal.  Most events are also
 * forwarded to the obd observer.
 *
 * \param obd    this OSC device (must own \a imp, asserted)
 * \param imp    the import the event occurred on
 * \param event  which obd_import_event happened
 *
 * \retval 0 on success, negative errno from observer notification or
 *         environment allocation
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* The server's grant is meaningless across a disconnect:
                 * drop what we thought we had. */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;
                struct lu_env         *env;
                int                    refcheck;

                env = cl_env_get(&refcheck);
                if (!IS_ERR(env)) {
                        /* Reset grants */
                        cli = &obd->u.cli;
                        client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go to failing rpcs due to the invalid
                         * import */
                        osc_check_rpcs(env, cli);
                        client_obd_list_unlock(&cli->cl_loi_list_lock);

                        /* Discard all local DLM locks for this namespace. */
                        ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                        cl_env_put(env, &refcheck);
                } else
                        rc = PTR_ERR(env);
                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4109
4110 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4111 {
4112         int rc;
4113         ENTRY;
4114
4115         ENTRY;
4116         rc = ptlrpcd_addref();
4117         if (rc)
4118                 RETURN(rc);
4119
4120         rc = client_obd_setup(obd, lcfg);
4121         if (rc) {
4122                 ptlrpcd_decref();
4123         } else {
4124                 struct lprocfs_static_vars lvars = { 0 };
4125                 struct client_obd *cli = &obd->u.cli;
4126
4127                 lprocfs_osc_init_vars(&lvars);
4128                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4129                         lproc_osc_attach_seqstat(obd);
4130                         sptlrpc_lprocfs_cliobd_attach(obd);
4131                         ptlrpc_lprocfs_register_obd(obd);
4132                 }
4133
4134                 oscc_init(obd);
4135                 /* We need to allocate a few requests more, because
4136                    brw_interpret tries to create new requests before freeing
4137                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4138                    reserved, but I afraid that might be too much wasted RAM
4139                    in fact, so 2 is just my guess and still should work. */
4140                 cli->cl_import->imp_rq_pool =
4141                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4142                                             OST_MAXREQSIZE,
4143                                             ptlrpc_add_rqs_to_pool);
4144                 
4145                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4146                 sema_init(&cli->cl_grant_sem, 1);
4147         }
4148
4149         RETURN(rc);
4150 }
4151
/* Staged pre-cleanup of an OSC device.
 *
 * OBD_CLEANUP_EARLY deactivates the import and stops pinging it;
 * OBD_CLEANUP_EXPORTS invalidates and destroys any remaining client
 * import (freeing its request pool) and shuts down the llog contexts.
 *
 * \param obd    the OSC device being cleaned up
 * \param stage  which cleanup phase is running
 *
 * \retval 0 on success, negative errno from obd_llog_finish()
 */
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
        int rc = 0;
        ENTRY;

        switch (stage) {
        case OBD_CLEANUP_EARLY: {
                struct obd_import *imp;
                imp = obd->u.cli.cl_import;
                CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                ptlrpc_deactivate_import(imp);
                spin_lock(&imp->imp_lock);
                imp->imp_pingable = 0;
                spin_unlock(&imp->imp_lock);
                break;
        }
        case OBD_CLEANUP_EXPORTS: {
                /* If we set up but never connected, the
                   client import will not have been cleaned. */
                if (obd->u.cli.cl_import) {
                        struct obd_import *imp;
                        down_write(&obd->u.cli.cl_sem);
                        imp = obd->u.cli.cl_import;
                        /* NOTE(review): this message prints whenever an
                         * import still exists at this stage, not only in
                         * the never-connected case — confirm intent. */
                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
                               obd->obd_name);
                        ptlrpc_invalidate_import(imp);
                        if (imp->imp_rq_pool) {
                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
                                imp->imp_rq_pool = NULL;
                        }
                        class_destroy_import(imp);
                        up_write(&obd->u.cli.cl_sem);
                        obd->u.cli.cl_import = NULL;
                }
                rc = obd_llog_finish(obd, 0);
                if (rc != 0)
                        CERROR("failed to cleanup llogging subsystems\n");
                break;
                }
        }
        RETURN(rc);
}
4195
/* Final cleanup of an OSC device.
 *
 * Unregisters procfs entries, marks the object creator as exiting (so no
 * new precreates start), releases the quota cache, runs the generic
 * client cleanup, and drops the ptlrpcd reference taken in osc_setup().
 * The steps mirror osc_setup() in reverse; their order matters.
 *
 * \param obd  the OSC device being torn down
 *
 * \retval result of client_obd_cleanup()
 */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* Stop object precreation: clear recovering, set exiting. */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        rc = client_obd_cleanup(obd);

        /* balance the ptlrpcd_addref() from osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
4218
4219 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4220 {
4221         struct lprocfs_static_vars lvars = { 0 };
4222         int rc = 0;
4223
4224         lprocfs_osc_init_vars(&lvars);
4225
4226         switch (lcfg->lcfg_command) {
4227         default:
4228                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4229                                               lcfg, obd);
4230                 if (rc > 0)
4231                         rc = 0;
4232                 break;
4233         }
4234
4235         return(rc);
4236 }
4237
/* obd_ops wrapper: forward a raw config buffer to the common handler.
 * \a len is unused; \a buf carries the struct lustre_cfg. */
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
        return osc_process_config_base(obd, buf);
}
4242
/* Method table wiring the OSC into the generic obd layer.  Connection
 * management mostly reuses the generic client_* helpers; everything else
 * is OSC-specific. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client helpers where possible) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statistics */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        /* striping metadata pack/unpack */
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* I/O */
        .o_brw                  = osc_brw,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM locking */
        .o_enqueue              = osc_enqueue,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* control and configuration */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
};
4279
4280 extern struct lu_kmem_descr  osc_caches[];
4281 extern spinlock_t            osc_ast_guard;
4282 extern struct lock_class_key osc_ast_guard_class;
4283
4284 int __init osc_init(void)
4285 {
4286         struct lprocfs_static_vars lvars = { 0 };
4287         int rc;
4288         ENTRY;
4289
4290         /* print an address of _any_ initialized kernel symbol from this
4291          * module, to allow debugging with gdb that doesn't support data
4292          * symbols from modules.*/
4293         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4294
4295         rc = lu_kmem_init(osc_caches);
4296
4297         lprocfs_osc_init_vars(&lvars);
4298
4299         request_module("lquota");
4300         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4301         lquota_init(quota_interface);
4302         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4303
4304         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4305                                  LUSTRE_OSC_NAME, &osc_device_type);
4306         if (rc) {
4307                 if (quota_interface)
4308                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4309                 lu_kmem_fini(osc_caches);
4310                 RETURN(rc);
4311         }
4312
4313         spin_lock_init(&osc_ast_guard);
4314         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4315
4316         RETURN(rc);
4317 }
4318
#ifdef __KERNEL__
/* Module exit: undo osc_init() in reverse order — device type, quota
 * interface, obd type registration, then the kmem caches. */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* Register module entry/exit points with the libcfs wrapper. */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif