Whamcloud - gitweb
lu_ref support for ldlm_lock and ldlm_resource. See lu_ref patch.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirrors osc_packmd():
 *   - @lsmp == NULL:                 size query, return the lsm size;
 *   - @*lsmp set and @lmm == NULL:   free the previously unpacked lsm;
 *   - @*lsmp == NULL:                allocate a fresh lsm (plus its single
 *                                    lov_oinfo) before filling from @lmm.
 * Returns the lsm size on success, negative errno on failure. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Validate the wire/disk buffer before using it. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* OSC always deals with a single stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free request: release the oinfo first, then the lsm. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Unwind the partial allocation. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
213                                   lustre_swab_ost_body);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         *oinfo->oi_oa = body->oa;
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
315                                         oinfo->oi_oa->o_gr > 0);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         *oinfo->oi_oa = body->oa;
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(struct ptlrpc_request *req,
349                                  struct osc_async_args *aa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         *aa->aa_oi->oi_oa = body->oa;
362 out:
363         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
364         RETURN(rc);
365 }
366
/* Asynchronous OST_SETATTR.  With @rqset == NULL the request is handed to
 * ptlrpcd fire-and-forget; otherwise it is added to @rqset with
 * osc_setattr_interpret() as the completion callback. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Pass along the caller's llog cookie, if one was supplied. */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
        }

        /* do the MDS-to-OST setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
412
/* Create an object on the OST.  If @*ea is NULL a temporary lsm is
 * allocated here (and freed again on failure); on success the new
 * object's id/group are stored in the lsm, which is returned through
 * @ea.  Transaction number and unlink-log cookie are copied into @oti
 * when provided.  Returns 0 on success, negative errno on failure. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller passed no lsm: allocate one we own until success. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it here (*ea is still unset
         * on the failure paths that reach this). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
497
498 static int osc_punch_interpret(struct ptlrpc_request *req,
499                                struct osc_async_args *aa, int rc)
500 {
501         struct ost_body *body;
502         ENTRY;
503
504         if (rc != 0)
505                 GOTO(out, rc);
506
507         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
508         if (body == NULL)
509                 GOTO(out, rc = -EPROTO);
510
511         *aa->aa_oi->oi_oa = body->oa;
512 out:
513         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
514         RETURN(rc);
515 }
516
/* Asynchronous OST_PUNCH over the extent in oinfo->oi_policy.l_extent.
 * The request is added to @rqset; the result reaches the caller through
 * osc_punch_interpret() -> oinfo->oi_cb_up(). */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);


        /* Hook up the completion callback and its argument. */
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
563
/* Synchronous OST_SYNC of the byte range [@start, @end] of the object
 * described by @oa.  The size/blocks fields of the wire obdo are
 * overloaded to carry the range; the updated attributes are copied back
 * into @oa on success. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
615
616 /* Find and cancel locally locks matched by @mode in the resource found by
617  * @objid. Found locks are added into @cancel list. Returns the amount of
618  * locks added to @cancels list. */
619 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
620                                    struct list_head *cancels, ldlm_mode_t mode,
621                                    int lock_flags)
622 {
623         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
624         struct ldlm_res_id res_id;
625         struct ldlm_resource *res;
626         int count;
627         ENTRY;
628
629         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
630         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
631         if (res == NULL)
632                 RETURN(0);
633
634         LDLM_RESOURCE_ADDREF(res);
635         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
636                                            lock_flags, 0, NULL);
637         LDLM_RESOURCE_DELREF(res);
638         ldlm_resource_putref(res);
639         RETURN(count);
640 }
641
642 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
643                                  int rc)
644 {
645         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
646
647         atomic_dec(&cli->cl_destroy_in_flight);
648         cfs_waitq_signal(&cli->cl_destroy_waitq);
649         return 0;
650 }
651
/* Throttle destroy RPCs to cl_max_rpcs_in_flight.  Optimistically takes
 * a slot and returns 1 when the destroy may be sent now; otherwise gives
 * the slot back and returns 0.  If the counter changed between the two
 * atomic operations (a slot was freed concurrently), re-signal the
 * destroy waitqueue so no waiter is lost. */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
669
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel conflicting local PW locks (discarding their data) and
         * piggyback them on the destroy as early lock cancels. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        /* Carry the MDS llog cookie so the OST can cancel the unlink
         * record once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
739
/* Fill the dirty/grant accounting fields of @oa (OBD_MD_FLBLOCKS |
 * OBD_MD_FLGRANT) so the OST learns how much dirty cache and grant this
 * client holds; also reports (and resets) grant lost since the last RPC. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have filled these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                /* Room we could still dirty: bounded below by what a full
                 * pipeline of RPCs could carry. */
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
774
/* caller must hold loi_list_lock.
 * Account one new dirty cache page against the client's available grant. */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* Mark the page so osc_release_write_grant() knows to undo this. */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
}
787
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.  @sent == 0 means the page was
 * never written out, so its entire page of grant is counted as lost. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server filesystem block size; fall back to 4096 when unknown. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Pages that never consumed grant have nothing to give back. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
826
/* Total BRW RPCs (reads plus writes) this client currently has in flight. */
static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
831
/* caller must hold loi_list_lock.
 * Walk cl_cache_waiters, waking each waiter for whom a page of grant can
 * now be consumed, or telling it to fall back to sync IO (-EDQUOT) when no
 * grant is available and no write RPC is in flight to return any.  Stops
 * early while dirty limits are still exceeded. */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
873
874 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
875 {
876         client_obd_list_lock(&cli->cl_loi_list_lock);
877         cli->cl_avail_grant = ocd->ocd_grant;
878         client_obd_list_unlock(&cli->cl_loi_list_lock);
879
880         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
881                cli->cl_avail_grant, cli->cl_lost_grant);
882         LASSERT(cli->cl_avail_grant >= 0);
883 }
884
885 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
886 {
887         client_obd_list_lock(&cli->cl_loi_list_lock);
888         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
889         if (body->oa.o_valid & OBD_MD_FLGRANT)
890                 cli->cl_avail_grant += body->oa.o_grant;
891         /* waiters are woken in brw_interpret */
892         client_obd_list_unlock(&cli->cl_loi_list_lock);
893 }
894
895 /* We assume that the reason this OSC got a short read is because it read
896  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
897  * via the LOV, and it _knows_ it's reading inside the file, it's just that
898  * this stripe never got written at or beyond this stripe offset yet. */
899 static void handle_short_read(int nob_read, obd_count page_count,
900                               struct brw_page **pga)
901 {
902         char *ptr;
903         int i = 0;
904
905         /* skip bytes read OK */
906         while (nob_read > 0) {
907                 LASSERT (page_count > 0);
908
909                 if (pga[i]->count > nob_read) {
910                         /* EOF inside this page */
911                         ptr = cfs_kmap(pga[i]->pg) +
912                                 (pga[i]->off & ~CFS_PAGE_MASK);
913                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
914                         cfs_kunmap(pga[i]->pg);
915                         page_count--;
916                         i++;
917                         break;
918                 }
919
920                 nob_read -= pga[i]->count;
921                 page_count--;
922                 i++;
923         }
924
925         /* zero remaining pages */
926         while (page_count-- > 0) {
927                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
928                 memset(ptr, 0, pga[i]->count);
929                 cfs_kunmap(pga[i]->pg);
930                 i++;
931         }
932 }
933
934 static int check_write_rcs(struct ptlrpc_request *req,
935                            int requested_nob, int niocount,
936                            obd_count page_count, struct brw_page **pga)
937 {
938         int    *remote_rcs, i;
939
940         /* return error if any niobuf was in error */
941         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
942                                         sizeof(*remote_rcs) * niocount, NULL);
943         if (remote_rcs == NULL) {
944                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
945                 return(-EPROTO);
946         }
947         if (lustre_msg_swabbed(req->rq_repmsg))
948                 for (i = 0; i < niocount; i++)
949                         __swab32s(&remote_rcs[i]);
950
951         for (i = 0; i < niocount; i++) {
952                 if (remote_rcs[i] < 0)
953                         return(remote_rcs[i]);
954
955                 if (remote_rcs[i] != 0) {
956                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
957                                 i, remote_rcs[i], req);
958                         return(-EPROTO);
959                 }
960         }
961
962         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
963                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
964                        req->rq_bulk->bd_nob_transferred, requested_nob);
965                 return(-EPROTO);
966         }
967
968         return (0);
969 }
970
971 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
972 {
973         if (p1->flag != p2->flag) {
974                 unsigned mask = ~OBD_BRW_FROM_GRANT;
975
976                 /* warn if we try to combine flags that we don't know to be
977                  * safe to combine */
978                 if ((p1->flag & mask) != (p2->flag & mask))
979                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
980                                "same brw?\n", p1->flag, p2->flag);
981                 return 0;
982         }
983
984         return (p1->off + p1->count == p2->off);
985 }
986
987 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
988                                    struct brw_page **pga, int opc,
989                                    cksum_type_t cksum_type)
990 {
991         __u32 cksum;
992         int i = 0;
993
994         LASSERT (pg_count > 0);
995         cksum = init_checksum(cksum_type);
996         while (nob > 0 && pg_count > 0) {
997                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
998                 int off = pga[i]->off & ~CFS_PAGE_MASK;
999                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1000
1001                 /* corrupt the data before we compute the checksum, to
1002                  * simulate an OST->client data error */
1003                 if (i == 0 && opc == OST_READ &&
1004                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1005                         memcpy(ptr + off, "bad1", min(4, nob));
1006                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1007                 cfs_kunmap(pga[i]->pg);
1008                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1009                                off, cksum);
1010
1011                 nob -= pga[i]->count;
1012                 pg_count--;
1013                 i++;
1014         }
1015         /* For sending we only compute the wrong checksum instead
1016          * of corrupting the data so it is still correct on a redo */
1017         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1018                 cksum++;
1019
1020         return cksum;
1021 }
1022
/*
 * Build (but do not send) a bulk read/write RPC covering @page_count
 * pages described by @pga.  Contiguous pages with identical flags are
 * coalesced into single remote niobufs.  On success *reqp is set to the
 * prepared request and 0 is returned; on failure *reqp is NOT set and a
 * negative errno is returned.  -ENOMEM here is treated as recoverable
 * by callers (they retry); the async args stashed in the request keep
 * references to @oa and @pga, so those must outlive the request.
 */
1023 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1024                                 struct lov_stripe_md *lsm, obd_count page_count,
1025                                 struct brw_page **pga,
1026                                 struct ptlrpc_request **reqp,
1027                                 struct obd_capa *ocapa)
1028 {
1029         struct ptlrpc_request   *req;
1030         struct ptlrpc_bulk_desc *desc;
1031         struct ost_body         *body;
1032         struct obd_ioobj        *ioobj;
1033         struct niobuf_remote    *niobuf;
1034         int niocount, i, requested_nob, opc, rc;
1035         struct osc_brw_async_args *aa;
1036         struct req_capsule      *pill;
1037         struct brw_page *pg_prev;
1038
1039         ENTRY;
1040         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1041                 RETURN(-ENOMEM); /* Recoverable */
1042         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1043                 RETURN(-EINVAL); /* Fatal */
1044
             /* writes come from the preallocated pool so cached dirty data
              * can always be flushed even under memory pressure */
1045         if ((cmd & OBD_BRW_WRITE) != 0) {
1046                 opc = OST_WRITE;
1047                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1048                                                 cli->cl_import->imp_rq_pool,
1049                                                 &RQF_OST_BRW);
1050         } else {
1051                 opc = OST_READ;
1052                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1053         }
1054
1055         if (req == NULL)
1056                 RETURN(-ENOMEM);
1057
             /* count the niobufs needed after merging adjacent pages */
1058         for (niocount = i = 1; i < page_count; i++) {
1059                 if (!can_merge_pages(pga[i - 1], pga[i]))
1060                         niocount++;
1061         }
1062
1063         pill = &req->rq_pill;
1064         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1065                              niocount * sizeof(*niobuf));
1066         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1067
1068         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1069         if (rc) {
1070                 ptlrpc_request_free(req);
1071                 RETURN(rc);
1072         }
1073         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1074         ptlrpc_at_set_req_timeout(req);
1075
1076         if (opc == OST_WRITE)
1077                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1078                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1079         else
1080                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1081                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1082
1083         if (desc == NULL)
1084                 GOTO(out, rc = -ENOMEM);
1085         /* NB request now owns desc and will free it when it gets freed */
1086
1087         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1088         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1089         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1090         LASSERT(body && ioobj && niobuf);
1091
1092         body->oa = *oa;
1093
1094         obdo_to_ioobj(oa, ioobj);
1095         ioobj->ioo_bufcnt = niocount;
1096         osc_pack_capa(req, body, ocapa);
1097         LASSERT (page_count > 0);
1098         pg_prev = pga[0];
             /* fill the bulk descriptor and niobuf array; pages must arrive
              * here sorted by ascending file offset (asserted below) */
1099         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1100                 struct brw_page *pg = pga[i];
1101
1102                 LASSERT(pg->count > 0);
1103                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1104                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1105                          pg->off, pg->count);
1106 #ifdef __linux__
1107                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1108                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1109                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1110                          i, page_count,
1111                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1112                          pg_prev->pg, page_private(pg_prev->pg),
1113                          pg_prev->pg->index, pg_prev->off);
1114 #else
1115                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1116                          "i %d p_c %u\n", i, page_count);
1117 #endif
1118                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1119                         (pg->flag & OBD_BRW_SRVLOCK));
1120
1121                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1122                                       pg->count);
1123                 requested_nob += pg->count;
1124
                     /* mergeable page: extend previous niobuf instead of
                      * starting a new one */
1125                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1126                         niobuf--;
1127                         niobuf->len += pg->count;
1128                 } else {
1129                         niobuf->offset = pg->off;
1130                         niobuf->len    = pg->count;
1131                         niobuf->flags  = pg->flag;
1132                 }
1133                 pg_prev = pg;
1134         }
1135
1136         LASSERTF((void *)(niobuf - niocount) ==
1137                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1138                                niocount * sizeof(*niobuf)),
1139                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1140                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1141                 (void *)(niobuf - niocount));
1142
1143         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1144
1145         /* size[REQ_REC_OFF] still sizeof (*body) */
1146         if (opc == OST_WRITE) {
                     /* checksum only when enabled and the sec flavor isn't
                      * already hashing the bulk for us */
1147                 if (unlikely(cli->cl_checksum) &&
1148                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1149                         /* store cl_cksum_type in a local variable since
1150                          * it can be changed via lprocfs */
1151                         cksum_type_t cksum_type = cli->cl_cksum_type;
1152
1153                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1154                                 oa->o_flags = body->oa.o_flags = 0;
1155                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1156                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1157                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1158                                                              page_count, pga,
1159                                                              OST_WRITE,
1160                                                              cksum_type);
1161                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1162                                body->oa.o_cksum);
1163                         /* save this in 'oa', too, for later checking */
1164                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1165                         oa->o_flags |= cksum_type_pack(cksum_type);
1166                 } else {
1167                         /* clear out the checksum flag, in case this is a
1168                          * resend but cl_checksum is no longer set. b=11238 */
1169                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1170                 }
1171                 oa->o_cksum = body->oa.o_cksum;
1172                 /* 1 RC per niobuf */
1173                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1174                                      sizeof(__u32) * niocount);
1175         } else {
1176                 if (unlikely(cli->cl_checksum) &&
1177                     req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
1178                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1179                                 body->oa.o_flags = 0;
1180                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1181                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1182                 }
1183                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1184                 /* 1 RC for the whole I/O */
1185         }
1186         ptlrpc_request_set_replen(req);
1187
             /* stash everything reply handling will need in the async args */
1188         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1189         aa = ptlrpc_req_async_args(req);
1190         aa->aa_oa = oa;
1191         aa->aa_requested_nob = requested_nob;
1192         aa->aa_nio_count = niocount;
1193         aa->aa_page_count = page_count;
1194         aa->aa_resends = 0;
1195         aa->aa_ppga = pga;
1196         aa->aa_cli = cli;
1197         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1198
1199         *reqp = req;
1200         RETURN(0);
1201
1202  out:
1203         ptlrpc_req_finished(req);
1204         RETURN(rc);
1205 }
1206
1207 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1208                                 __u32 client_cksum, __u32 server_cksum, int nob,
1209                                 obd_count page_count, struct brw_page **pga,
1210                                 cksum_type_t client_cksum_type)
1211 {
1212         __u32 new_cksum;
1213         char *msg;
1214         cksum_type_t cksum_type;
1215
1216         if (server_cksum == client_cksum) {
1217                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1218                 return 0;
1219         }
1220
1221         if (oa->o_valid & OBD_MD_FLFLAGS)
1222                 cksum_type = cksum_type_unpack(oa->o_flags);
1223         else
1224                 cksum_type = OBD_CKSUM_CRC32;
1225
1226         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1227                                       cksum_type);
1228
1229         if (cksum_type != client_cksum_type)
1230                 msg = "the server did not use the checksum type specified in "
1231                       "the original request - likely a protocol problem";
1232         else if (new_cksum == server_cksum)
1233                 msg = "changed on the client after we checksummed it - "
1234                       "likely false positive due to mmap IO (bug 11742)";
1235         else if (new_cksum == client_cksum)
1236                 msg = "changed in transit before arrival at OST";
1237         else
1238                 msg = "changed in transit AND doesn't match the original - "
1239                       "likely false positive due to mmap IO (bug 11742)";
1240
1241         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1242                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1243                            "["LPU64"-"LPU64"]\n",
1244                            msg, libcfs_nid2str(peer->nid),
1245                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1246                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1247                                                         (__u64)0,
1248                            oa->o_id,
1249                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1250                            pga[0]->off,
1251                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1252         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1253                "client csum now %x\n", client_cksum, client_cksum_type,
1254                server_cksum, cksum_type, new_cksum);
1255         return 1;
1256 }
1257
1258 /* Note rc enters this function as number of bytes transferred */
1259 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1260 {
1261         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1262         const lnet_process_id_t *peer =
1263                         &req->rq_import->imp_connection->c_peer;
1264         struct client_obd *cli = aa->aa_cli;
1265         struct ost_body *body;
1266         __u32 client_cksum = 0;
1267         ENTRY;
1268
1269         if (rc < 0 && rc != -EDQUOT)
1270                 RETURN(rc);
1271
1272         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1273         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1274                                   lustre_swab_ost_body);
1275         if (body == NULL) {
1276                 CDEBUG(D_INFO, "Can't unpack body\n");
1277                 RETURN(-EPROTO);
1278         }
1279
1280         /* set/clear over quota flag for a uid/gid */
1281         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1282             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1283                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1284                              body->oa.o_gid, body->oa.o_valid,
1285                              body->oa.o_flags);
1286
1287         if (rc < 0)
1288                 RETURN(rc);
1289
1290         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1291                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1292
1293         osc_update_grant(cli, body);
1294
1295         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1296                 if (rc > 0) {
1297                         CERROR("Unexpected +ve rc %d\n", rc);
1298                         RETURN(-EPROTO);
1299                 }
1300                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1301
1302                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1303                     check_write_checksum(&body->oa, peer, client_cksum,
1304                                          body->oa.o_cksum, aa->aa_requested_nob,
1305                                          aa->aa_page_count, aa->aa_ppga,
1306                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1307                         RETURN(-EAGAIN);
1308
1309                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1310                         RETURN(-EAGAIN);
1311
1312                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1313                                      aa->aa_page_count, aa->aa_ppga);
1314                 GOTO(out, rc);
1315         }
1316
1317         /* The rest of this function executes only for OST_READs */
1318         if (rc > aa->aa_requested_nob) {
1319                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1320                        aa->aa_requested_nob);
1321                 RETURN(-EPROTO);
1322         }
1323
1324         if (rc != req->rq_bulk->bd_nob_transferred) {
1325                 CERROR ("Unexpected rc %d (%d transferred)\n",
1326                         rc, req->rq_bulk->bd_nob_transferred);
1327                 return (-EPROTO);
1328         }
1329
1330         if (rc < aa->aa_requested_nob)
1331                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1332
1333         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1334                                          aa->aa_ppga))
1335                 GOTO(out, rc = -EAGAIN);
1336
1337         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1338                 static int cksum_counter;
1339                 __u32      server_cksum = body->oa.o_cksum;
1340                 char      *via;
1341                 char      *router;
1342                 cksum_type_t cksum_type;
1343
1344                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1345                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1346                 else
1347                         cksum_type = OBD_CKSUM_CRC32;
1348                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1349                                                  aa->aa_ppga, OST_READ,
1350                                                  cksum_type);
1351
1352                 if (peer->nid == req->rq_bulk->bd_sender) {
1353                         via = router = "";
1354                 } else {
1355                         via = " via ";
1356                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1357                 }
1358
1359                 if (server_cksum == ~0 && rc > 0) {
1360                         CERROR("Protocol error: server %s set the 'checksum' "
1361                                "bit, but didn't send a checksum.  Not fatal, "
1362                                "but please notify on http://bugzilla.lustre.org/\n",
1363                                libcfs_nid2str(peer->nid));
1364                 } else if (server_cksum != client_cksum) {
1365                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1366                                            "%s%s%s inum "LPU64"/"LPU64" object "
1367                                            LPU64"/"LPU64" extent "
1368                                            "["LPU64"-"LPU64"]\n",
1369                                            req->rq_import->imp_obd->obd_name,
1370                                            libcfs_nid2str(peer->nid),
1371                                            via, router,
1372                                            body->oa.o_valid & OBD_MD_FLFID ?
1373                                                 body->oa.o_fid : (__u64)0,
1374                                            body->oa.o_valid & OBD_MD_FLFID ?
1375                                                 body->oa.o_generation :(__u64)0,
1376                                            body->oa.o_id,
1377                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1378                                                 body->oa.o_gr : (__u64)0,
1379                                            aa->aa_ppga[0]->off,
1380                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1381                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1382                                                                         1);
1383                         CERROR("client %x, server %x, cksum_type %x\n",
1384                                client_cksum, server_cksum, cksum_type);
1385                         cksum_counter = 0;
1386                         aa->aa_oa->o_cksum = client_cksum;
1387                         rc = -EAGAIN;
1388                 } else {
1389                         cksum_counter++;
1390                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1391                         rc = 0;
1392                 }
1393         } else if (unlikely(client_cksum)) {
1394                 static int cksum_missed;
1395
1396                 cksum_missed++;
1397                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1398                         CERROR("Checksum %u requested from %s but not sent\n",
1399                                cksum_missed, libcfs_nid2str(peer->nid));
1400         } else {
1401                 rc = 0;
1402         }
1403 out:
1404         if (rc >= 0)
1405                 *aa->aa_oa = body->oa;
1406
1407         RETURN(rc);
1408 }
1409
1410 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1411                             struct lov_stripe_md *lsm,
1412                             obd_count page_count, struct brw_page **pga,
1413                             struct obd_capa *ocapa)
1414 {
1415         struct ptlrpc_request *req;
1416         int                    rc;
1417         cfs_waitq_t            waitq;
1418         int                    resends = 0;
1419         struct l_wait_info     lwi;
1420
1421         ENTRY;
1422
1423         cfs_waitq_init(&waitq);
1424
1425 restart_bulk:
1426         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1427                                   page_count, pga, &req, ocapa);
1428         if (rc != 0)
1429                 return (rc);
1430
1431         rc = ptlrpc_queue_wait(req);
1432
1433         if (rc == -ETIMEDOUT && req->rq_resend) {
1434                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1435                 ptlrpc_req_finished(req);
1436                 goto restart_bulk;
1437         }
1438
1439         rc = osc_brw_fini_request(req, rc);
1440
1441         ptlrpc_req_finished(req);
1442         if (osc_recoverable_error(rc)) {
1443                 resends++;
1444                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1445                         CERROR("too many resend retries, returning error\n");
1446                         RETURN(-EIO);
1447                 }
1448
1449                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1450                 l_wait_event(waitq, 0, &lwi);
1451
1452                 goto restart_bulk;
1453         }
1454
1455         RETURN (rc);
1456 }
1457
/*
 * Rebuild a failed-but-recoverable BRW RPC and queue the replacement on
 * the original request's set.  The new request takes over the page array
 * and the oap list from the old one, and each oap's request reference is
 * switched to the new request.  Returns 0 on success, -EIO when the
 * resend budget is exhausted, -EINTR if a waiter on any oap was
 * interrupted, or the error from rebuilding the request.
 */
1458 int osc_brw_redo_request(struct ptlrpc_request *request,
1459                          struct osc_brw_async_args *aa)
1460 {
1461         struct ptlrpc_request *new_req;
1462         struct ptlrpc_request_set *set = request->rq_set;
1463         struct osc_brw_async_args *new_aa;
1464         struct osc_async_page *oap;
1465         int rc = 0;
1466         ENTRY;
1467
1468         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1469                 CERROR("too many resend retries, returning error\n");
1470                 RETURN(-EIO);
1471         }
1472
1473         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1474 /*
1475         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1476         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
1477                 ocapa = lustre_unpack_capa(request->rq_reqmsg,
1478                                            REQ_REC_OFF + 3);
1479 */
1480         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1481                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1482                                   aa->aa_cli, aa->aa_oa,
1483                                   NULL /* lsm unused by osc currently */,
1484                                   aa->aa_page_count, aa->aa_ppga,
1485                                   &new_req, NULL /* ocapa */);
1486         if (rc)
1487                 RETURN(rc);
1488
1489         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1490
             /* abort the redo if any waiter on these pages was interrupted;
              * every oap on the list must still point at the old request */
1491         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1492                 if (oap->oap_request != NULL) {
1493                         LASSERTF(request == oap->oap_request,
1494                                  "request %p != oap_request %p\n",
1495                                  request, oap->oap_request);
1496                         if (oap->oap_interrupted) {
1497                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1498                                 ptlrpc_req_finished(new_req);
1499                                 RETURN(-EINTR);
1500                         }
1501                 }
1502         }
1503         /* New request takes over pga and oaps from old request.
1504          * Note that copying a list_head doesn't work, need to move it... */
1505         aa->aa_resends++;
1506         new_req->rq_interpret_reply = request->rq_interpret_reply;
1507         new_req->rq_async_args = request->rq_async_args;
             /* delay the resend by aa_resends seconds */
1508         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1509
1510         new_aa = ptlrpc_req_async_args(new_req);
1511
1512         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1513         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1514         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1515
             /* swap each oap's request reference over to the new request */
1516         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1517                 if (oap->oap_request) {
1518                         ptlrpc_req_finished(oap->oap_request);
1519                         oap->oap_request = ptlrpc_request_addref(new_req);
1520                 }
1521         }
1522
1523         /* use ptlrpc_set_add_req is safe because interpret functions work
1524          * in check_set context. only one way exist with access to request
1525          * from different thread got -EINTR - this way protected with
1526          * cl_loi_list_lock */
1527         ptlrpc_set_add_req(set, new_req);
1528
1529         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1530
1531         DEBUG_REQ(D_INFO, new_req, "new request");
1532         RETURN(0);
1533 }
1534
1535 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1536                           struct lov_stripe_md *lsm, obd_count page_count,
1537                           struct brw_page **pga, struct ptlrpc_request_set *set,
1538                           struct obd_capa *ocapa)
1539 {
1540         struct ptlrpc_request     *req;
1541         struct client_obd         *cli = &exp->exp_obd->u.cli;
1542         int                        rc, i;
1543         struct osc_brw_async_args *aa;
1544         ENTRY;
1545
1546         /* Consume write credits even if doing a sync write -
1547          * otherwise we may run out of space on OST due to grant. */
1548         if (cmd == OBD_BRW_WRITE) {
1549                 spin_lock(&cli->cl_loi_list_lock);
1550                 for (i = 0; i < page_count; i++) {
1551                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1552                                 osc_consume_write_grant(cli, pga[i]);
1553                 }
1554                 spin_unlock(&cli->cl_loi_list_lock);
1555         }
1556
1557         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1558                                   &req, ocapa);
1559
1560         aa = ptlrpc_req_async_args(req);
1561         if (cmd == OBD_BRW_READ) {
1562                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1563                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1564         } else {
1565                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1566                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1567                                  cli->cl_w_in_flight);
1568         }
1569         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
1570
1571         LASSERT(list_empty(&aa->aa_oaps));
1572         if (rc == 0) {
1573                 req->rq_interpret_reply = brw_interpret;
1574                 ptlrpc_set_add_req(set, req);
1575                 client_obd_list_lock(&cli->cl_loi_list_lock);
1576                 if (cmd == OBD_BRW_READ)
1577                         cli->cl_r_in_flight++;
1578                 else
1579                         cli->cl_w_in_flight++;
1580                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1581                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1582         } else if (cmd == OBD_BRW_WRITE) {
1583                 client_obd_list_lock(&cli->cl_loi_list_lock);
1584                 for (i = 0; i < page_count; i++)
1585                         osc_release_write_grant(cli, pga[i], 0);
1586                 osc_wake_cache_waiters(cli);
1587                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1588         }
1589         RETURN (rc);
1590 }
1591
1592 /*
1593  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1594  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1595  * fine for our small page arrays and doesn't require allocation.  its an
1596  * insertion sort that swaps elements that are strides apart, shrinking the
1597  * stride down until its '1' and the array is sorted.
1598  */
1599 static void sort_brw_pages(struct brw_page **array, int num)
1600 {
1601         int stride, i, j;
1602         struct brw_page *tmp;
1603
1604         if (num == 1)
1605                 return;
1606         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1607                 ;
1608
1609         do {
1610                 stride /= 3;
1611                 for (i = stride ; i < num ; i++) {
1612                         tmp = array[i];
1613                         j = i;
1614                         while (j >= stride && array[j - stride]->off > tmp->off) {
1615                                 array[j] = array[j - stride];
1616                                 j -= stride;
1617                         }
1618                         array[j] = tmp;
1619                 }
1620         } while (stride > 1);
1621 }
1622
1623 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1624 {
1625         int count = 1;
1626         int offset;
1627         int i = 0;
1628
1629         LASSERT (pages > 0);
1630         offset = pg[i]->off & ~CFS_PAGE_MASK;
1631
1632         for (;;) {
1633                 pages--;
1634                 if (pages == 0)         /* that's all */
1635                         return count;
1636
1637                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1638                         return count;   /* doesn't end on page boundary */
1639
1640                 i++;
1641                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1642                 if (offset != 0)        /* doesn't start on page boundary */
1643                         return count;
1644
1645                 count++;
1646         }
1647 }
1648
1649 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1650 {
1651         struct brw_page **ppga;
1652         int i;
1653
1654         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1655         if (ppga == NULL)
1656                 return NULL;
1657
1658         for (i = 0; i < count; i++)
1659                 ppga[i] = pga + i;
1660         return ppga;
1661 }
1662
/* Free a pointer array built by osc_build_ppga().  @count must be the
 * original element count passed to osc_build_ppga() (the allocation size
 * is recomputed from it); the brw_pages themselves are not touched. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1668
1669 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1670                    obd_count page_count, struct brw_page *pga,
1671                    struct obd_trans_info *oti)
1672 {
1673         struct obdo *saved_oa = NULL;
1674         struct brw_page **ppga, **orig;
1675         struct obd_import *imp = class_exp2cliimp(exp);
1676         struct client_obd *cli = &imp->imp_obd->u.cli;
1677         int rc, page_count_orig;
1678         ENTRY;
1679
1680         if (cmd & OBD_BRW_CHECK) {
1681                 /* The caller just wants to know if there's a chance that this
1682                  * I/O can succeed */
1683
1684                 if (imp == NULL || imp->imp_invalid)
1685                         RETURN(-EIO);
1686                 RETURN(0);
1687         }
1688
1689         /* test_brw with a failed create can trip this, maybe others. */
1690         LASSERT(cli->cl_max_pages_per_rpc);
1691
1692         rc = 0;
1693
1694         orig = ppga = osc_build_ppga(pga, page_count);
1695         if (ppga == NULL)
1696                 RETURN(-ENOMEM);
1697         page_count_orig = page_count;
1698
1699         sort_brw_pages(ppga, page_count);
1700         while (page_count) {
1701                 obd_count pages_per_brw;
1702
1703                 if (page_count > cli->cl_max_pages_per_rpc)
1704                         pages_per_brw = cli->cl_max_pages_per_rpc;
1705                 else
1706                         pages_per_brw = page_count;
1707
1708                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1709
1710                 if (saved_oa != NULL) {
1711                         /* restore previously saved oa */
1712                         *oinfo->oi_oa = *saved_oa;
1713                 } else if (page_count > pages_per_brw) {
1714                         /* save a copy of oa (brw will clobber it) */
1715                         OBDO_ALLOC(saved_oa);
1716                         if (saved_oa == NULL)
1717                                 GOTO(out, rc = -ENOMEM);
1718                         *saved_oa = *oinfo->oi_oa;
1719                 }
1720
1721                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1722                                       pages_per_brw, ppga, oinfo->oi_capa);
1723
1724                 if (rc != 0)
1725                         break;
1726
1727                 page_count -= pages_per_brw;
1728                 ppga += pages_per_brw;
1729         }
1730
1731 out:
1732         osc_release_ppga(orig, page_count_orig);
1733
1734         if (saved_oa != NULL)
1735                 OBDO_FREE(saved_oa);
1736
1737         RETURN(rc);
1738 }
1739
/* Asynchronous BRW entry point: wrap @pga in a pointer array, sort it by
 * offset, and queue one async RPC per unfragmented chunk onto @set via
 * async_internal().
 *
 * Ownership: async_internal() takes over whichever pointer array it is
 * handed (brw_interpret() releases it later), so every chunk except a
 * single whole-array RPC gets a private copy; @orig is freed here only
 * if it was never handed off.  Returns 0 or a negative errno. */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink further so the chunk transfers in one RDMA */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* a private copy was never handed off; free it here */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1807
1808 static void osc_check_rpcs(struct client_obd *cli);
1809
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held.
 *
 * @sent is forwarded to osc_release_write_grant(); presumably non-zero
 * when the page actually went out in an RPC -- confirm against the grant
 * accounting in osc_release_write_grant(). */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1818
1819
1820 /* This maintains the lists of pending pages to read/write for a given object
1821  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1822  * to quickly find objects that are ready to send an RPC. */
1823 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1824                          int cmd)
1825 {
1826         int optimal;
1827         ENTRY;
1828
1829         if (lop->lop_num_pending == 0)
1830                 RETURN(0);
1831
1832         /* if we have an invalid import we want to drain the queued pages
1833          * by forcing them through rpcs that immediately fail and complete
1834          * the pages.  recovery relies on this to empty the queued pages
1835          * before canceling the locks and evicting down the llite pages */
1836         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1837                 RETURN(1);
1838
1839         /* stream rpcs in queue order as long as as there is an urgent page
1840          * queued.  this is our cheap solution for good batching in the case
1841          * where writepage marks some random page in the middle of the file
1842          * as urgent because of, say, memory pressure */
1843         if (!list_empty(&lop->lop_urgent)) {
1844                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1845                 RETURN(1);
1846         }
1847         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1848         optimal = cli->cl_max_pages_per_rpc;
1849         if (cmd & OBD_BRW_WRITE) {
1850                 /* trigger a write rpc stream as long as there are dirtiers
1851                  * waiting for space.  as they're waiting, they're not going to
1852                  * create more pages to coallesce with what's waiting.. */
1853                 if (!list_empty(&cli->cl_cache_waiters)) {
1854                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1855                         RETURN(1);
1856                 }
1857                 /* +16 to avoid triggering rpcs that would want to include pages
1858                  * that are being queued but which can't be made ready until
1859                  * the queuer finishes with the page. this is a wart for
1860                  * llite::commit_write() */
1861                 optimal += 16;
1862         }
1863         if (lop->lop_num_pending >= optimal)
1864                 RETURN(1);
1865
1866         RETURN(0);
1867 }
1868
/* Make @item's membership of @list match @should_be_on: link it at the
 * tail if it should be on but isn't, unlink it if it is but shouldn't be,
 * and do nothing when the state already matches. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1877
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly
 *
 * Membership rules: the ready list holds lois whose read or write lop
 * would make an rpc right now (lop_makes_rpc()); the write/read lists
 * hold lois with any pending pages of that kind. */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1892
1893 static void lop_update_pending(struct client_obd *cli,
1894                                struct loi_oap_pages *lop, int cmd, int delta)
1895 {
1896         lop->lop_num_pending += delta;
1897         if (cmd & OBD_BRW_WRITE)
1898                 cli->cl_pending_w_pages += delta;
1899         else
1900                 cli->cl_pending_r_pages += delta;
1901 }
1902
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        /* all state below is covered by the loi list lock */
        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: dequeue, fix the pending accounting and
                 * list membership, and complete the group member with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1948
1949 /* this is trying to propogate async writeback errors back up to the
1950  * application.  As an async write fails we record the error code for later if
1951  * the app does an fsync.  As long as errors persist we force future rpcs to be
1952  * sync so that the app can get a sync error and break the cycle of queueing
1953  * pages for which writeback will fail. */
1954 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1955                            int rc)
1956 {
1957         if (rc) {
1958                 if (!ar->ar_rc)
1959                         ar->ar_rc = rc;
1960
1961                 ar->ar_force_sync = 1;
1962                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1963                 return;
1964
1965         }
1966
1967         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1968                 ar->ar_force_sync = 0;
1969 }
1970
1971 static void osc_oap_to_pending(struct osc_async_page *oap)
1972 {
1973         struct loi_oap_pages *lop;
1974
1975         if (oap->oap_cmd & OBD_BRW_WRITE)
1976                 lop = &oap->oap_loi->loi_write_lop;
1977         else
1978                 lop = &oap->oap_loi->loi_read_lop;
1979
1980         if (oap->oap_async_flags & ASYNC_URGENT)
1981                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1982         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1983         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1984 }
1985
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request
 *
 * Per-page completion for @oap with result @rc.  Drops the oap's request
 * reference, feeds the rc/xid into the async-error tracking for writes,
 * copies LVB-relevant fields out of @oa on success, and then completes
 * either through the group-io path (oig) or the caller's ap_completion
 * hook.  @sent is forwarded to osc_exit_cache(). */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* drop our reference on the rpc this page rode in, remembering its
         * xid for the force-sync bookkeeping below */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        if (oap->oap_cmd & OBD_BRW_WRITE) {
                /* record write errors both per-client and per-object */
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        if (oap->oap_oig) {
                /* group io: leave the cache accounting and signal the group */
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2040
/* Interpret callback for async BRW requests.  Finalizes the request,
 * retries recoverable errors via osc_brw_redo_request(), then (under the
 * loi list lock) drops the in-flight counter, completes every attached
 * oap (osc_send_oap_rpc path) or releases grant (async_internal path),
 * wakes cache waiters and kicks off more rpcs.  Finally frees the page
 * pointer array.  Returns the final rc for the request. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* recoverable errors are resent; if the redo was queued successfully
         * the new request's interpret call will do the work below instead */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2088
/* Build a BRW request from the @page_count oaps on @rpc_list.
 *
 * Collects the brw_pages out of the oaps (taking the caller ops, caller
 * data and ldlm lock from the first one), fills and stamps the obdo,
 * prepares the rpc and splices the oaps onto the request's async args.
 * On success @rpc_list is left empty; on failure it is untouched and an
 * ERR_PTR is returned.  Called without the loi list lock held. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        struct ldlm_lock *lock = NULL;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* gather the pages; ops/caller_data/lock are taken from the first
         * oap and assumed uniform across the rpc */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                        lock = oap->oap_ldlm_lock;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);
        if (lock) {
                /* let the server match this I/O to the client's dlm lock */
                oa->o_handle = lock->l_remote_handle;
                oa->o_valid |= OBD_MD_FLHANDLE;
        }

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request's async args for brw_interpret */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        /* on the error paths req holds an ERR_PTR; free what we allocated */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2170
2171 /* the loi lock is held across this function but it's allowed to release
2172  * and reacquire it during its work */
2173 /**
2174  * prepare pages for ASYNC io and put pages in send queue.
2175  *
2176  * \param cli -
2177  * \param loi -
2178  * \param cmd - OBD_BRW_* macroses
2179  * \param lop - pending pages
2180  *
2181  * \return zero if pages successfully add to send queue.
2182  * \return not zere if error occurring.
2183  */
2184 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2185                             int cmd, struct loi_oap_pages *lop)
2186 {
2187         struct ptlrpc_request *req;
2188         obd_count page_count = 0;
2189         struct osc_async_page *oap = NULL, *tmp;
2190         struct osc_brw_async_args *aa;
2191         struct obd_async_page_ops *ops;
2192         CFS_LIST_HEAD(rpc_list);
2193         unsigned int ending_offset;
2194         unsigned  starting_offset = 0;
2195         int srvlock = 0;
2196         ENTRY;
2197
2198         /* first we find the pages we're allowed to work with */
2199         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2200                                  oap_pending_item) {
2201                 ops = oap->oap_caller_ops;
2202
2203                 LASSERT(oap->oap_magic == OAP_MAGIC);
2204
2205                 if (page_count != 0 &&
2206                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2207                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2208                                " oap %p, page %p, srvlock %u\n",
2209                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2210                         break;
2211                 }
2212                 /* in llite being 'ready' equates to the page being locked
2213                  * until completion unlocks it.  commit_write submits a page
2214                  * as not ready because its unlock will happen unconditionally
2215                  * as the call returns.  if we race with commit_write giving
2216                  * us that page we dont' want to create a hole in the page
2217                  * stream, so we stop and leave the rpc to be fired by
2218                  * another dirtier or kupdated interval (the not ready page
2219                  * will still be on the dirty list).  we could call in
2220                  * at the end of ll_file_write to process the queue again. */
2221                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2222                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2223                         if (rc < 0)
2224                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2225                                                 "instead of ready\n", oap,
2226                                                 oap->oap_page, rc);
2227                         switch (rc) {
2228                         case -EAGAIN:
2229                                 /* llite is telling us that the page is still
2230                                  * in commit_write and that we should try
2231                                  * and put it in an rpc again later.  we
2232                                  * break out of the loop so we don't create
2233                                  * a hole in the sequence of pages in the rpc
2234                                  * stream.*/
2235                                 oap = NULL;
2236                                 break;
2237                         case -EINTR:
2238                                 /* the io isn't needed.. tell the checks
2239                                  * below to complete the rpc with EINTR */
2240                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2241                                 oap->oap_count = -EINTR;
2242                                 break;
2243                         case 0:
2244                                 oap->oap_async_flags |= ASYNC_READY;
2245                                 break;
2246                         default:
2247                                 LASSERTF(0, "oap %p page %p returned %d "
2248                                             "from make_ready\n", oap,
2249                                             oap->oap_page, rc);
2250                                 break;
2251                         }
2252                 }
2253                 if (oap == NULL)
2254                         break;
2255                 /*
2256                  * Page submitted for IO has to be locked. Either by
2257                  * ->ap_make_ready() or by higher layers.
2258                  */
2259 #if defined(__KERNEL__) && defined(__linux__)
2260                  if(!(PageLocked(oap->oap_page) &&
2261                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2262                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2263                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2264                         LBUG();
2265                 }
2266 #endif
2267                 /* If there is a gap at the start of this page, it can't merge
2268                  * with any previous page, so we'll hand the network a
2269                  * "fragmented" page array that it can't transfer in 1 RDMA */
2270                 if (page_count != 0 && oap->oap_page_off != 0)
2271                         break;
2272
2273                 /* take the page out of our book-keeping */
2274                 list_del_init(&oap->oap_pending_item);
2275                 lop_update_pending(cli, lop, cmd, -1);
2276                 list_del_init(&oap->oap_urgent_item);
2277
2278                 if (page_count == 0)
2279                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2280                                           (PTLRPC_MAX_BRW_SIZE - 1);
2281
2282                 /* ask the caller for the size of the io as the rpc leaves. */
2283                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2284                         oap->oap_count =
2285                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2286                 if (oap->oap_count <= 0) {
2287                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2288                                oap->oap_count);
2289                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2290                         continue;
2291                 }
2292
2293                 /* now put the page back in our accounting */
2294                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2295                 if (page_count == 0)
2296                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2297                 if (++page_count >= cli->cl_max_pages_per_rpc)
2298                         break;
2299
2300                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2301                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2302                  * have the same alignment as the initial writes that allocated
2303                  * extents on the server. */
2304                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2305                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2306                 if (ending_offset == 0)
2307                         break;
2308
2309                 /* If there is a gap at the end of this page, it can't merge
2310                  * with any subsequent pages, so we'll hand the network a
2311                  * "fragmented" page array that it can't transfer in 1 RDMA */
2312                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2313                         break;
2314         }
2315
2316         osc_wake_cache_waiters(cli);
2317
2318         if (page_count == 0)
2319                 RETURN(0);
2320
2321         loi_list_maint(cli, loi);
2322
2323         client_obd_list_unlock(&cli->cl_loi_list_lock);
2324
2325         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2326         if (IS_ERR(req)) {
2327                 /* this should happen rarely and is pretty bad, it makes the
2328                  * pending list not follow the dirty order */
2329                 client_obd_list_lock(&cli->cl_loi_list_lock);
2330                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2331                         list_del_init(&oap->oap_rpc_item);
2332
2333                         /* queued sync pages can be torn down while the pages
2334                          * were between the pending list and the rpc */
2335                         if (oap->oap_interrupted) {
2336                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2337                                 osc_ap_completion(cli, NULL, oap, 0,
2338                                                   oap->oap_count);
2339                                 continue;
2340                         }
2341                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2342                 }
2343                 loi_list_maint(cli, loi);
2344                 RETURN(PTR_ERR(req));
2345         }
2346
2347         aa = ptlrpc_req_async_args(req);
2348
2349         if (cmd == OBD_BRW_READ) {
2350                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2351                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2352                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2353                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2354         } else {
2355                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2356                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2357                                  cli->cl_w_in_flight);
2358                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2359                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2360         }
2361         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2362
2363         client_obd_list_lock(&cli->cl_loi_list_lock);
2364
2365         if (cmd == OBD_BRW_READ)
2366                 cli->cl_r_in_flight++;
2367         else
2368                 cli->cl_w_in_flight++;
2369
2370         /* queued sync pages can be torn down while the pages
2371          * were between the pending list and the rpc */
2372         tmp = NULL;
2373         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2374                 /* only one oap gets a request reference */
2375                 if (tmp == NULL)
2376                         tmp = oap;
2377                 if (oap->oap_interrupted && !req->rq_intr) {
2378                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2379                                oap, req);
2380                         ptlrpc_mark_interrupted(req);
2381                 }
2382         }
2383         if (tmp != NULL)
2384                 tmp->oap_request = ptlrpc_request_addref(req);
2385
2386         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2387                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2388
2389         req->rq_interpret_reply = brw_interpret;
2390         ptlrpcd_add_req(req);
2391         RETURN(1);
2392 }
2393
/* Log the rpc-readiness state of a lov_oinfo: whether it sits on the
 * client's ready list, plus the pending counts and urgent-list status of
 * both its write and read page queues.  Extra args are appended to the
 * CDEBUG format arguments.
 *
 * Fix: the original macro ended with a trailing '\' after its last line,
 * which silently spliced the following source line into the macro body. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2402
2403 /* This is called by osc_check_rpcs() to find which objects have pages that
2404  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2405 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2406 {
2407         ENTRY;
2408         /* first return all objects which we already know to have
2409          * pages ready to be stuffed into rpcs */
2410         if (!list_empty(&cli->cl_loi_ready_list))
2411                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2412                                   struct lov_oinfo, loi_cli_item));
2413
2414         /* then if we have cache waiters, return all objects with queued
2415          * writes.  This is especially important when many small files
2416          * have filled up the cache and not been fired into rpcs because
2417          * they don't pass the nr_pending/object threshhold */
2418         if (!list_empty(&cli->cl_cache_waiters) &&
2419             !list_empty(&cli->cl_loi_write_list))
2420                 RETURN(list_entry(cli->cl_loi_write_list.next,
2421                                   struct lov_oinfo, loi_write_item));
2422
2423         /* then return all queued objects when we have an invalid import
2424          * so that they get flushed */
2425         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2426                 if (!list_empty(&cli->cl_loi_write_list))
2427                         RETURN(list_entry(cli->cl_loi_write_list.next,
2428                                           struct lov_oinfo, loi_write_item));
2429                 if (!list_empty(&cli->cl_loi_read_list))
2430                         RETURN(list_entry(cli->cl_loi_read_list.next,
2431                                           struct lov_oinfo, loi_read_item));
2432         }
2433         RETURN(NULL);
2434 }
2435
2436 /* called with the loi list lock held */
2437 static void osc_check_rpcs(struct client_obd *cli)
2438 {
2439         struct lov_oinfo *loi;
2440         int rc = 0, race_counter = 0;
2441         ENTRY;
2442
2443         while ((loi = osc_next_loi(cli)) != NULL) {
2444                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2445
2446                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2447                         break;
2448
2449                 /* attempt some read/write balancing by alternating between
2450                  * reads and writes in an object.  The makes_rpc checks here
2451                  * would be redundant if we were getting read/write work items
2452                  * instead of objects.  we don't want send_oap_rpc to drain a
2453                  * partial read pending queue when we're given this object to
2454                  * do io on writes while there are cache waiters */
2455                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2456                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2457                                               &loi->loi_write_lop);
2458                         if (rc < 0)
2459                                 break;
2460                         if (rc > 0)
2461                                 race_counter = 0;
2462                         else
2463                                 race_counter++;
2464                 }
2465                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2466                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2467                                               &loi->loi_read_lop);
2468                         if (rc < 0)
2469                                 break;
2470                         if (rc > 0)
2471                                 race_counter = 0;
2472                         else
2473                                 race_counter++;
2474                 }
2475
2476                 /* attempt some inter-object balancing by issueing rpcs
2477                  * for each object in turn */
2478                 if (!list_empty(&loi->loi_cli_item))
2479                         list_del_init(&loi->loi_cli_item);
2480                 if (!list_empty(&loi->loi_write_item))
2481                         list_del_init(&loi->loi_write_item);
2482                 if (!list_empty(&loi->loi_read_item))
2483                         list_del_init(&loi->loi_read_item);
2484
2485                 loi_list_maint(cli, loi);
2486
2487                 /* send_oap_rpc fails with 0 when make_ready tells it to
2488                  * back off.  llite's make_ready does this when it tries
2489                  * to lock a page queued for write that is already locked.
2490                  * we want to try sending rpcs from many objects, but we
2491                  * don't want to spin failing with 0.  */
2492                 if (race_counter == 10)
2493                         break;
2494         }
2495         EXIT;
2496 }
2497
2498 /* we're trying to queue a page in the osc so we're subject to the
2499  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2500  * If the osc's queued pages are already at that limit, then we want to sleep
2501  * until there is space in the osc's queue for us.  We also may be waiting for
2502  * write credits from the OST if there are RPCs in flight that may return some
2503  * before we fall back to sync writes.
2504  *
 * We need this to know whether our allocation was granted in the presence of
 * signals */
2506 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2507 {
2508         int rc;
2509         ENTRY;
2510         client_obd_list_lock(&cli->cl_loi_list_lock);
2511         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2512         client_obd_list_unlock(&cli->cl_loi_list_lock);
2513         RETURN(rc);
2514 };
2515
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Returns 0 once the page's grant/cache accounting has been charged,
 * -EDQUOT when the caller must fall back to sync io, or -EINTR / the
 * waiter's rc after sleeping for space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                loi_list_maint(cli, loi);
                /* kick rpcs so completions can eventually release grant */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list means we were never granted
                 * (woken by signal or no rpcs left in flight) */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2572
2573 /**
2574  * Checks if requested extent lock is compatible with a lock under the page.
2575  *
2576  * Checks if the lock under \a page is compatible with a read or write lock
2577  * (specified by \a rw) for an extent [\a start , \a end].
2578  *
2579  * \param exp osc export
2580  * \param lsm striping information for the file
2581  * \param res osc_async_page placeholder
2582  * \param rw OBD_BRW_READ if requested for reading,
2583  *           OBD_BRW_WRITE if requested for writing
2584  * \param start start of the requested extent
2585  * \param end end of the requested extent
2586  * \param cookie transparent parameter for passing locking context
2587  *
2588  * \post result == 1, *cookie == context, appropriate lock is referenced or
2589  * \post result == 0
2590  *
2591  * \retval 1 owned lock is reused for the request
2592  * \retval 0 no lock reused for the request
2593  *
2594  * \see osc_release_short_lock
2595  */
2596 static int osc_reget_short_lock(struct obd_export *exp,
2597                                 struct lov_stripe_md *lsm,
2598                                 void **res, int rw,
2599                                 obd_off start, obd_off end,
2600                                 void **cookie)
2601 {
2602         struct osc_async_page *oap = *res;
2603         int rc;
2604
2605         ENTRY;
2606
2607         spin_lock(&oap->oap_lock);
2608         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2609                                   start, end, cookie);
2610         spin_unlock(&oap->oap_lock);
2611
2612         RETURN(rc);
2613 }
2614
2615 /**
2616  * Releases a reference to a lock taken in a "fast" way.
2617  *
2618  * Releases a read or a write (specified by \a rw) lock
2619  * referenced by \a cookie.
2620  *
2621  * \param exp osc export
2622  * \param lsm striping information for the file
2623  * \param end end of the locked extent
2624  * \param rw OBD_BRW_READ if requested for reading,
2625  *           OBD_BRW_WRITE if requested for writing
2626  * \param cookie transparent parameter for passing locking context
2627  *
2628  * \post appropriate lock is dereferenced
2629  *
2630  * \see osc_reget_short_lock
2631  */
static int osc_release_short_lock(struct obd_export *exp,
                                  struct lov_stripe_md *lsm, obd_off end,
                                  void *cookie, int rw)
{
        ENTRY;
        /* exp, lsm and end are unused here; the lock reference lives
         * entirely in cookie, taken by ldlm_lock_fast_match() via
         * osc_reget_short_lock() */
        ldlm_lock_fast_release(cookie, rw);
        /* no error could have happened at this layer */
        RETURN(0);
}
2641
2642 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2643                         struct lov_oinfo *loi, cfs_page_t *page,
2644                         obd_off offset, struct obd_async_page_ops *ops,
2645                         void *data, void **res, int nocache,
2646                         struct lustre_handle *lockh)
2647 {
2648         struct osc_async_page *oap;
2649         struct ldlm_res_id oid;
2650         int rc = 0;
2651         ENTRY;
2652
2653         if (!page)
2654                 return size_round(sizeof(*oap));
2655
2656         oap = *res;
2657         oap->oap_magic = OAP_MAGIC;
2658         oap->oap_cli = &exp->exp_obd->u.cli;
2659         oap->oap_loi = loi;
2660
2661         oap->oap_caller_ops = ops;
2662         oap->oap_caller_data = data;
2663
2664         oap->oap_page = page;
2665         oap->oap_obj_off = offset;
2666
2667         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2668         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2669         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2670         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2671
2672         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2673
2674         spin_lock_init(&oap->oap_lock);
2675
2676         /* If the page was marked as notcacheable - don't add to any locks */
2677         if (!nocache) {
2678                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2679                 /* This is the only place where we can call cache_add_extent
2680                    without oap_lock, because this page is locked now, and
2681                    the lock we are adding it to is referenced, so cannot lose
2682                    any pages either. */
2683                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2684                 if (rc)
2685                         RETURN(rc);
2686         }
2687
2688         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2689         RETURN(0);
2690 }
2691
2692 struct osc_async_page *oap_from_cookie(void *cookie)
2693 {
2694         struct osc_async_page *oap = cookie;
2695         if (oap->oap_magic != OAP_MAGIC)
2696                 return ERR_PTR(-EINVAL);
2697         return oap;
2698 };
2699
/* Queue a page for asynchronous i/o on @cmd (read or write).  The page
 * must not already be queued; writes are charged against the dirty-page
 * cache and grant via osc_enter_cache() (which may sleep), and quota is
 * checked first when enabled.  On success the page lands on the object's
 * pending list and rpc generation is kicked. */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* membership of any list means the page is already in flight */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* fill an obdo just to learn the owner's uid/gid */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while waiting for
                 * cache space or write grant */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2774
/* True iff @flag is being newly set: clear in @was, set in @now.
 * aka (~(was) & (now) & (flag)), but this is more clear :)
 * Fix: parenthesize each macro argument so expression arguments with
 * lower-precedence operators expand correctly (CERT PRE01-C). */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2777
/* Raise async flags on an already-queued page.  Only ASYNC_READY and
 * ASYNC_URGENT transitions are handled; newly-urgent pages not yet in an
 * rpc are moved onto the lop's urgent list.  Always kicks rpc generation
 * before returning. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue the page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* the page must be queued (pending) to have flags changed */
        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing to do if all requested flags are already set */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                /* pages already in an rpc are past urgent-list handling */
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2840
/* Queue a page on the object's group-io pending list; it stays there
 * until osc_trigger_group_io() moves the whole group to the regular
 * pending lists.  For ASYNC_GROUP_SYNC pages the oap is also registered
 * with the io group for completion tracking. */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* membership of any list means the page is already in flight */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2895
2896 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2897                                  struct loi_oap_pages *lop, int cmd)
2898 {
2899         struct list_head *pos, *tmp;
2900         struct osc_async_page *oap;
2901
2902         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2903                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2904                 list_del(&oap->oap_pending_item);
2905                 osc_oap_to_pending(oap);
2906         }
2907         loi_list_maint(cli, loi);
2908 }
2909
/* Release the pages queued by osc_queue_group_io(): move both the write
 * and read group lists onto the regular pending lists and kick rpc
 * generation.  Always returns 0. */
static int osc_trigger_group_io(struct obd_export *exp,
                                struct lov_stripe_md *lsm,
                                struct lov_oinfo *loi,
                                struct obd_io_group *oig)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        ENTRY;

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
        osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2931
/* Remove a queued page from the osc's book-keeping: release its cache
 * grant, take it off the urgent/pending lists, and detach it from the
 * extent cache.  Fails with -EBUSY if the page is already part of an
 * rpc in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue the page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* pages in an rpc cannot be torn down until the rpc completes */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return this page's grant/cache accounting and wake anyone
         * sleeping in osc_enter_cache() for the space */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2979
/* DLM blocking/cancel callback for osc extent locks.
 *
 * LDLM_CB_BLOCKING: another client wants a conflicting lock, so cancel
 * ours.  LDLM_CB_CANCELING: the lock is going away; drop it from the
 * extent cache and run any registered cancel callback.  Always returns 0.
 */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* small non-NULL values are corrupt ast data, not real pointers */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                /* forward the cancel to the layer above (e.g. llite) */
                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
3021 EXPORT_SYMBOL(osc_extent_blocking_cb);
3022
/* Attach @data (an inode on Linux) as the lock's ast data, asserting
 * that any existing, different ast data belongs to an inode being freed.
 * A NULL lock from the handle means the client was likely evicted. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* replacing live ast data would orphan the old inode's
                 * pages under this lock - only a dying inode is ok */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
3052
/* Apply @replace to every lock on the object's resource, typically to
 * swap or clear the locks' ast data.  Always returns 0. */
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                             ldlm_iterator_t replace, void *data)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);

        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
        ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
        return 0;
}
3063
/* Post-processing shared by sync and async enqueue paths: extract the
 * server's intent verdict, log the returned LVB, cache the granted lock,
 * and finally invoke the caller's update callback.
 *
 * Returns the value produced by oinfo->oi_cb_up(). */
static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
                            struct obd_info *oinfo, int intent, int rc)
{
        ENTRY;

        if (intent) {
                /* The request was created before ldlm_cli_enqueue call. */
                if (rc == ELDLM_LOCK_ABORTED) {
                        struct ldlm_reply *rep;
                        rep = req_capsule_server_get(&req->rq_pill,
                                                     &RMF_DLM_REP);

                        LASSERT(rep != NULL);
                        /* The server put its intent disposition result in
                         * lock_policy_res1; use it as the return code. */
                        if (rep->lock_policy_res1)
                                rc = rep->lock_policy_res1;
                }
        }

        if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
                /* LVB (size/blocks/mtime) was filled in by the enqueue. */
                CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
                       oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
        }

        /* Only successfully granted locks go into the client lock cache. */
        if (!rc)
                cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);

        /* Call the update callback. */
        rc = oinfo->oi_cb_up(oinfo, rc);
        RETURN(rc);
}
3096
/* Interpret callback for an asynchronous enqueue: finish the LDLM enqueue,
 * run the common osc post-processing, then immediately drop the lock
 * reference (see the comment above osc_enqueue() - async locks are released
 * as soon as they are obtained). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        /* NOTE(review): this NULL check comes after the handle has already
         * been used above; presumably the reference held by ldlm_cli_enqueue
         * makes a NULL result impossible and the assert only documents that
         * - confirm against ldlm_cli_enqueue_fini(). */
        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* Drop the reference taken by ldlm_handle2lock() above. */
        LDLM_LOCK_PUT(lock);
        return rc;
}
3129
3130 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3131  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3132  * other synchronous requests, however keeping some locks and trying to obtain
3133  * others may take a considerable amount of time in a case of ost failure; and
3134  * when other sync requests do not get released lock from a client, the client
3135  * is excluded from the cluster -- such scenarious make the life difficult, so
3136  * release locks just after they are obtained. */
3137 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3138                        struct ldlm_enqueue_info *einfo,
3139                        struct ptlrpc_request_set *rqset)
3140 {
3141         struct ldlm_res_id res_id;
3142         struct obd_device *obd = exp->exp_obd;
3143         struct ptlrpc_request *req = NULL;
3144         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3145         ldlm_mode_t mode;
3146         int rc;
3147         ENTRY;
3148
3149
3150         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3151                            oinfo->oi_md->lsm_object_gr, &res_id);
3152         /* Filesystem lock extents are extended to page boundaries so that
3153          * dealing with the page cache is a little smoother.  */
3154         oinfo->oi_policy.l_extent.start -=
3155                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3156         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3157
3158         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3159                 goto no_match;
3160
3161         /* Next, search for already existing extent locks that will cover us */
3162         /* If we're trying to read, we also search for an existing PW lock.  The
3163          * VFS and page cache already protect us locally, so lots of readers/
3164          * writers can share a single PW lock.
3165          *
3166          * There are problems with conversion deadlocks, so instead of
3167          * converting a read lock to a write lock, we'll just enqueue a new
3168          * one.
3169          *
3170          * At some point we should cancel the read lock instead of making them
3171          * send us a blocking callback, but there are problems with canceling
3172          * locks out from other users right now, too. */
3173         mode = einfo->ei_mode;
3174         if (einfo->ei_mode == LCK_PR)
3175                 mode |= LCK_PW;
3176         mode = ldlm_lock_match(obd->obd_namespace,
3177                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3178                                einfo->ei_type, &oinfo->oi_policy, mode,
3179                                oinfo->oi_lockh);
3180         if (mode) {
3181                 /* addref the lock only if not async requests and PW lock is
3182                  * matched whereas we asked for PR. */
3183                 if (!rqset && einfo->ei_mode != mode)
3184                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3185                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3186                                         oinfo->oi_flags);
3187                 if (intent) {
3188                         /* I would like to be able to ASSERT here that rss <=
3189                          * kms, but I can't, for reasons which are explained in
3190                          * lov_enqueue() */
3191                 }
3192
3193                 /* We already have a lock, and it's referenced */
3194                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3195
3196                 /* For async requests, decref the lock. */
3197                 if (einfo->ei_mode != mode)
3198                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3199                 else if (rqset)
3200                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3201
3202                 RETURN(ELDLM_OK);
3203         }
3204
3205  no_match:
3206         if (intent) {
3207                 CFS_LIST_HEAD(cancels);
3208                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3209                                            &RQF_LDLM_ENQUEUE_LVB);
3210                 if (req == NULL)
3211                         RETURN(-ENOMEM);
3212
3213                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3214                 if (rc)
3215                         RETURN(rc);
3216
3217                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3218                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3219                 ptlrpc_request_set_replen(req);
3220         }
3221
3222         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3223         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3224
3225         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3226                               &oinfo->oi_policy, &oinfo->oi_flags,
3227                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3228                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3229                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3230                               rqset ? 1 : 0);
3231         if (rqset) {
3232                 if (!rc) {
3233                         struct osc_enqueue_args *aa;
3234                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3235                         aa = ptlrpc_req_async_args(req);
3236                         aa->oa_oi = oinfo;
3237                         aa->oa_ei = einfo;
3238                         aa->oa_exp = exp;
3239
3240                         req->rq_interpret_reply = osc_enqueue_interpret;
3241                         ptlrpc_set_add_req(rqset, req);
3242                 } else if (intent) {
3243                         ptlrpc_req_finished(req);
3244                 }
3245                 RETURN(rc);
3246         }
3247
3248         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3249         if (intent)
3250                 ptlrpc_req_finished(req);
3251
3252         RETURN(rc);
3253 }
3254
3255 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3256                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3257                      int *flags, void *data, struct lustre_handle *lockh)
3258 {
3259         struct ldlm_res_id res_id;
3260         struct obd_device *obd = exp->exp_obd;
3261         int lflags = *flags;
3262         ldlm_mode_t rc;
3263         ENTRY;
3264
3265         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3266
3267         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3268                 RETURN(-EIO);
3269
3270         /* Filesystem lock extents are extended to page boundaries so that
3271          * dealing with the page cache is a little smoother */
3272         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3273         policy->l_extent.end |= ~CFS_PAGE_MASK;
3274
3275         /* Next, search for already existing extent locks that will cover us */
3276         /* If we're trying to read, we also search for an existing PW lock.  The
3277          * VFS and page cache already protect us locally, so lots of readers/
3278          * writers can share a single PW lock. */
3279         rc = mode;
3280         if (mode == LCK_PR)
3281                 rc |= LCK_PW;
3282         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3283                              &res_id, type, policy, rc, lockh);
3284         if (rc) {
3285                 osc_set_data_with_check(lockh, data, lflags);
3286                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3287                         ldlm_lock_addref(lockh, LCK_PR);
3288                         ldlm_lock_decref(lockh, LCK_PW);
3289                 }
3290                 RETURN(rc);
3291         }
3292         RETURN(rc);
3293 }
3294
3295 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3296                       __u32 mode, struct lustre_handle *lockh)
3297 {
3298         ENTRY;
3299
3300         if (unlikely(mode == LCK_GROUP))
3301                 ldlm_lock_decref_and_cancel(lockh, mode);
3302         else
3303                 ldlm_lock_decref(lockh, mode);
3304
3305         RETURN(0);
3306 }
3307
3308 static int osc_cancel_unused(struct obd_export *exp,
3309                              struct lov_stripe_md *lsm, int flags,
3310                              void *opaque)
3311 {
3312         struct obd_device *obd = class_exp2obd(exp);
3313         struct ldlm_res_id res_id, *resp = NULL;
3314
3315         if (lsm != NULL) {
3316                 resp = osc_build_res_name(lsm->lsm_object_id,
3317                                           lsm->lsm_object_gr, &res_id);
3318         }
3319
3320         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3321 }
3322
3323 static int osc_statfs_interpret(const struct lu_env *env,
3324                                 struct ptlrpc_request *req,
3325                                 struct osc_async_args *aa, int rc)
3326 {
3327         struct obd_statfs *msfs;
3328         ENTRY;
3329
3330         if (rc != 0)
3331                 GOTO(out, rc);
3332
3333         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3334         if (msfs == NULL) {
3335                 GOTO(out, rc = -EPROTO);
3336         }
3337
3338         *aa->aa_oi->oi_osfs = *msfs;
3339 out:
3340         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3341         RETURN(rc);
3342 }
3343
/* Asynchronous statfs: pack an OST_STATFS RPC and add it to @rqset; the
 * reply is delivered via osc_statfs_interpret(), which copies the result
 * and invokes oinfo->oi_cb_up().
 *
 * Returns 0 on success or a negative errno if the request could not be
 * allocated or packed.  NOTE(review): @max_age is accepted but not sent on
 * the wire - see the comment below. */
static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                            __u64 max_age, struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        /* Same portal as the synchronous osc_statfs() path. */
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        req->rq_interpret_reply = osc_statfs_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        /* Stash the obd_info so the interpret callback can reach it. */
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(rqset, req);
        RETURN(0);
}
3385
/* Synchronous statfs: send an OST_STATFS RPC and copy the reply into @osfs.
 *
 * Returns 0 on success, -ENODEV when the import is already gone, -ENOMEM on
 * allocation failure, -EPROTO on a malformed reply, or the RPC error.
 * NOTE(review): @max_age is accepted but not sent on the wire - see the
 * comment below. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed for the allocation above. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
3448
/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
        struct lov_user_md_v3 lum, *lumk;
        struct lov_user_ost_data_v1 *lmm_objects;
        int rc = 0, lum_size;
        ENTRY;

        if (!lsm)
                RETURN(-ENODATA);

        /* we only need the header part from user space to get lmm_magic and
         * lmm_stripe_count, (the header part is common to v1 and v3) */
        lum_size = sizeof(struct lov_user_md_v1);
        if (copy_from_user(&lum, lump, lum_size))
                RETURN(-EFAULT);

        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                RETURN(-EINVAL);

        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

        /* we can use lov_mds_md_size() to compute lum_size
         * because lov_user_md_vX and lov_mds_md_vX have the same size */
        if (lum.lmm_stripe_count > 0) {
                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                OBD_ALLOC(lumk, lum_size);
                if (!lumk)
                        RETURN(-ENOMEM);

                /* v1 and v3 carry lmm_objects at different offsets. */
                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
                else
                        lmm_objects = &(lumk->lmm_objects[0]);
                /* Only the first slot is filled - an OSC object has exactly
                 * one stripe (see header comment). */
                lmm_objects->l_object_id = lsm->lsm_object_id;
        } else {
                /* Caller left no room for objects; reply with header only. */
                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                lumk = &lum;
        }

        lumk->lmm_object_id = lsm->lsm_object_id;
        lumk->lmm_object_gr = lsm->lsm_object_gr;
        lumk->lmm_stripe_count = 1;

        if (copy_to_user(lump, lumk, lum_size))
                rc = -EFAULT;

        /* Free only if we allocated above; the header-only path used the
         * on-stack copy. */
        if (lumk != &lum)
                OBD_FREE(lumk, lum_size);

        RETURN(rc);
}
3511
3512
/* Handle OSC-level ioctls.  @karg is the in-kernel copy of the ioctl data,
 * @uarg the original user-space pointer (used by handlers that do their own
 * copying).  A module reference is held for the duration of the call.
 *
 * Returns 0 on success, -ENOTTY for unknown commands, or a handler-specific
 * negative errno. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                /* This handler pulls the full payload from user space itself
                 * rather than using @karg. */
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* Caller's inline buffers must fit a lov_desc and a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                /* An OSC presents itself as a single-target "LOV" with one
                 * active target and default single striping. */
                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        /* positive return is the md size; callers want 0. */
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3596
/* Handle obd_get_info() keys for the OSC: KEY_LOCK_TO_STRIPE is answered
 * locally (an OSC object is always stripe 0), while KEY_LAST_ID and
 * KEY_FIEMAP are forwarded to the OST as synchronous OST_GET_INFO RPCs.
 *
 * Returns 0 on success, -EINVAL for unknown keys, -EFAULT for missing
 * output buffers, or a negative RPC/protocol error. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                /* An OSC object maps to exactly one stripe: index 0. */
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                /* The key string itself is the request payload. */
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                /* The fiemap value buffer travels in both directions:
                 * the request carries the input mapping parameters and the
                 * reply the filled-in extents, both @*vallen bytes. */
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3692
3693 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3694                                           void *aa, int rc)
3695 {
3696         struct llog_ctxt *ctxt;
3697         struct obd_import *imp = req->rq_import;
3698         ENTRY;
3699
3700         if (rc != 0)
3701                 RETURN(rc);
3702
3703         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3704         if (ctxt) {
3705                 if (rc == 0)
3706                         rc = llog_initiator_connect(ctxt);
3707                 else
3708                         CERROR("cannot establish connection for "
3709                                "ctxt %p: %d\n", ctxt, rc);
3710         }
3711
3712         llog_ctxt_put(ctxt);
3713         spin_lock(&imp->imp_lock);
3714         imp->imp_server_timeout = 1;
3715         imp->imp_pingable = 1;
3716         spin_unlock(&imp->imp_lock);
3717         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3718
3719         RETURN(rc);
3720 }
3721
/* Handle obd_set_info_async() keys for the OSC.  Locally-handled keys
 * (NEXT_ID, UNLINKED, INIT_RECOV, CHECKSUM, FLUSH_CTX) are applied
 * immediately and return without an RPC; any other key is packed into an
 * OST_SET_INFO request and added to @set (required for those keys).
 *
 * Returns 0 on success or a negative errno. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                /* Object creator resumes allocating after the given id. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* Space was released; clear the creator's no-space flag. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        /* Remaining keys go over the wire; an RPC set is mandatory. */
        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                /* MDS connection carries the object group to create in. */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        /* Kick the set so the request starts moving immediately. */
        ptlrpc_check_set(set);

        RETURN(0);
}
3825
3826
3827 static struct llog_operations osc_size_repl_logops = {
3828         lop_cancel: llog_obd_repl_cancel
3829 };
3830
/* Origin-side llog operations; filled in lazily (once) by osc_llog_init()
 * from llog_lvfs_ops with the origin-specific handlers overridden. */
static struct llog_operations osc_mds_ost_orig_logops;

/* Set up the two llog contexts this OSC uses:
 * LLOG_MDS_OST_ORIG_CTXT (originator of MDS->OST records) and
 * LLOG_SIZE_REPL_CTXT (replication of size-change cancels).
 *
 * \param obd   this OSC device; olg must be its own obd_olg
 * \param tgt   device the logs live on
 * \param count number of logs, passed through to llog_setup()
 * \param catid catalog log id used for the origin context
 * \param uuid  unused here
 *
 * \retval 0 on success, negative errno from llog_setup() on failure
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time initialization of the shared origin ops table; the
         * device lock serializes concurrent first callers.  The lop_setup
         * check makes repeat calls cheap no-ops. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        /* NOTE(review): if this second setup fails, the origin context set
         * up above is not torn down here -- presumably osc_llog_finish()
         * handles it later; confirm against the caller. */
        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3870
3871 static int osc_llog_finish(struct obd_device *obd, int count)
3872 {
3873         struct llog_ctxt *ctxt;
3874         int rc = 0, rc2 = 0;
3875         ENTRY;
3876
3877         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3878         if (ctxt)
3879                 rc = llog_cleanup(ctxt);
3880
3881         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3882         if (ctxt)
3883                 rc2 = llog_cleanup(ctxt);
3884         if (!rc)
3885                 rc = rc2;
3886
3887         RETURN(rc);
3888 }
3889
3890 static int osc_reconnect(const struct lu_env *env,
3891                          struct obd_export *exp, struct obd_device *obd,
3892                          struct obd_uuid *cluuid,
3893                          struct obd_connect_data *data)
3894 {
3895         struct client_obd *cli = &obd->u.cli;
3896
3897         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3898                 long lost_grant;
3899
3900                 client_obd_list_lock(&cli->cl_loi_list_lock);
3901                 data->ocd_grant = cli->cl_avail_grant ?:
3902                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3903                 lost_grant = cli->cl_lost_grant;
3904                 cli->cl_lost_grant = 0;
3905                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3906
3907                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3908                        "cl_lost_grant: %ld\n", data->ocd_grant,
3909                        cli->cl_avail_grant, lost_grant);
3910                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3911                        " ocd_grant: %d\n", data->ocd_connect_flags,
3912                        data->ocd_version, data->ocd_grant);
3913         }
3914
3915         RETURN(0);
3916 }
3917
3918 static int osc_disconnect(struct obd_export *exp)
3919 {
3920         struct obd_device *obd = class_exp2obd(exp);
3921         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3922         int rc;
3923
3924         if (obd->u.cli.cl_conn_count == 1)
3925                 /* flush any remaining cancel messages out to the target */
3926                 llog_sync(ctxt, exp);
3927
3928         llog_ctxt_put(ctxt);
3929
3930         rc = client_disconnect_export(exp);
3931         return rc;
3932 }
3933
/* React to import state changes for this OSC's connection to its OST.
 *
 * \param obd   the OSC device owning the import
 * \param imp   the import that changed state (must belong to obd)
 * \param event which transition occurred
 *
 * \retval 0 or the observer-notification result; LBUG()s on an unknown
 *         event value
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* drop all grant on disconnect; it is renegotiated on
                 * reconnect (see osc_reconnect()) */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* discard all client-side locks; the server's are gone */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* allow object precreation again now that the
                         * connection is back */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4010
4011 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4012 {
4013         int rc;
4014         ENTRY;
4015
4016         ENTRY;
4017         rc = ptlrpcd_addref();
4018         if (rc)
4019                 RETURN(rc);
4020
4021         rc = client_obd_setup(obd, lcfg);
4022         if (rc) {
4023                 ptlrpcd_decref();
4024         } else {
4025                 struct lprocfs_static_vars lvars = { 0 };
4026                 struct client_obd *cli = &obd->u.cli;
4027
4028                 lprocfs_osc_init_vars(&lvars);
4029                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4030                         lproc_osc_attach_seqstat(obd);
4031                         sptlrpc_lprocfs_cliobd_attach(obd);
4032                         ptlrpc_lprocfs_register_obd(obd);
4033                 }
4034
4035                 oscc_init(obd);
4036                 /* We need to allocate a few requests more, because
4037                    brw_interpret tries to create new requests before freeing
4038                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4039                    reserved, but I afraid that might be too much wasted RAM
4040                    in fact, so 2 is just my guess and still should work. */
4041                 cli->cl_import->imp_rq_pool =
4042                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4043                                             OST_MAXREQSIZE,
4044                                             ptlrpc_add_rqs_to_pool);
4045                 cli->cl_cache = cache_create(obd);
4046                 if (!cli->cl_cache) {
4047                         osc_cleanup(obd);
4048                         rc = -ENOMEM;
4049                 }
4050         }
4051
4052         RETURN(rc);
4053 }
4054
4055 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4056 {
4057         int rc = 0;
4058         ENTRY;
4059
4060         switch (stage) {
4061         case OBD_CLEANUP_EARLY: {
4062                 struct obd_import *imp;
4063                 imp = obd->u.cli.cl_import;
4064                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4065                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4066                 ptlrpc_deactivate_import(imp);
4067                 spin_lock(&imp->imp_lock);
4068                 imp->imp_pingable = 0;
4069                 spin_unlock(&imp->imp_lock);
4070                 break;
4071         }
4072         case OBD_CLEANUP_EXPORTS: {
4073                 /* If we set up but never connected, the
4074                    client import will not have been cleaned. */
4075                 if (obd->u.cli.cl_import) {
4076                         struct obd_import *imp;
4077                         imp = obd->u.cli.cl_import;
4078                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4079                                obd->obd_name);
4080                         ptlrpc_invalidate_import(imp);
4081                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
4082                         class_destroy_import(imp);
4083                         obd->u.cli.cl_import = NULL;
4084                 }
4085                 rc = obd_llog_finish(obd, 0);
4086                 if (rc != 0)
4087                         CERROR("failed to cleanup llogging subsystems\n");
4088                 break;
4089                 }
4090         }
4091         RETURN(rc);
4092 }
4093
4094 int osc_cleanup(struct obd_device *obd)
4095 {
4096         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4097         int rc;
4098
4099         ENTRY;
4100         ptlrpc_lprocfs_unregister_obd(obd);
4101         lprocfs_obd_cleanup(obd);
4102
4103         spin_lock(&oscc->oscc_lock);
4104         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4105         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4106         spin_unlock(&oscc->oscc_lock);
4107
4108         /* free memory of osc quota cache */
4109         lquota_cleanup(quota_interface, obd);
4110
4111         cache_destroy(obd->u.cli.cl_cache);
4112         rc = client_obd_cleanup(obd);
4113
4114         ptlrpcd_decref();
4115         RETURN(rc);
4116 }
4117
4118 static int osc_register_page_removal_cb(struct obd_export *exp,
4119                                         obd_page_removal_cb_t func,
4120                                         obd_pin_extent_cb pin_cb)
4121 {
4122         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4123                                            pin_cb);
4124 }
4125
4126 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4127                                           obd_page_removal_cb_t func)
4128 {
4129         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4130 }
4131
4132 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4133                                        obd_lock_cancel_cb cb)
4134 {
4135         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4136
4137         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4138         return 0;
4139 }
4140
4141 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4142                                          obd_lock_cancel_cb cb)
4143 {
4144         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4145                 CERROR("Unregistering cancel cb %p, while only %p was "
4146                        "registered\n", cb,
4147                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4148                 RETURN(-EINVAL);
4149         }
4150
4151         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4152         return 0;
4153 }
4154
4155 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4156 {
4157         struct lustre_cfg *lcfg = buf;
4158         struct lprocfs_static_vars lvars = { 0 };
4159         int rc = 0;
4160
4161         lprocfs_osc_init_vars(&lvars);
4162
4163         switch (lcfg->lcfg_command) {
4164         case LCFG_SPTLRPC_CONF:
4165                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4166                 break;
4167         default:
4168                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4169                                               lcfg, obd);
4170                 break;
4171         }
4172
4173         return(rc);
4174 }
4175
/* OSC method table registered with class_register_type().  Generic
 * connection management is delegated to the client_* helpers; the
 * OSC-specific I/O, lock, llog and config paths use the osc_* handlers
 * defined in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        /* device lifecycle */
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection management (generic client implementations) */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        /* statfs and striping metadata */
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        /* object lifecycle and attributes */
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk read/write and async page I/O */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        /* control, info and event plumbing */
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        /* cache and lock-cancel callback registration */
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4226 int __init osc_init(void)
4227 {
4228         struct lprocfs_static_vars lvars = { 0 };
4229         int rc;
4230         ENTRY;
4231
4232         lprocfs_osc_init_vars(&lvars);
4233
4234         request_module("lquota");
4235         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4236         lquota_init(quota_interface);
4237         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4238
4239         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4240                                  LUSTRE_OSC_NAME, NULL);
4241         if (rc) {
4242                 if (quota_interface)
4243                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4244                 RETURN(rc);
4245         }
4246
4247         RETURN(rc);
4248 }
4249
4250 #ifdef __KERNEL__
/* Module unload: shut down quota support, drop the quota symbol
 * reference taken in osc_init(), then unregister the OSC obd type. */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
4259
4260 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4261 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4262 MODULE_LICENSE("GPL");
4263
4264 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4265 #endif